docker/swarmkit

View on GitHub
manager/scheduler/filter.go

Summary

Maintainability
A
1 hr
Test Coverage
package scheduler

import (
    "fmt"
    "strings"

    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/api/genericresource"
    "github.com/moby/swarmkit/v2/manager/constraint"
)

// Filter checks whether the given task can run on the given node.
// A filter works on one task at a time: SetTask assigns the task and reports
// whether the filter applies to it, and Check is then evaluated against
// individual nodes for that task.
type Filter interface {
    // SetTask returns true when the filter is enabled for a given task
    // and assigns the task to the filter. It returns false if the filter
    // isn't applicable to this task.  For instance, a constraints filter
    // would return `false` if the task doesn't contain any constraints.
    SetTask(*api.Task) bool

    // Check returns true if the task assigned by SetTask can be scheduled
    // into the given node. This function should not be called if SetTask
    // returned false.
    Check(*NodeInfo) bool

    // Explain returns a human-readable description of what a failure of
    // this filter means. nodes is a node count that most implementations
    // in this file interpolate into the message.
    Explain(nodes int) string
}

// ReadyFilter checks that the node is ready to schedule tasks.
// It carries no per-task state.
type ReadyFilter struct {
}

// SetTask returns true when the filter is enabled for a given task.
// ReadyFilter ignores its argument and is enabled for every task, so the
// result is always true.
func (f *ReadyFilter) SetTask(_ *api.Task) bool {
    return true
}

// Check reports whether the task can be scheduled into the given node: the
// node's status must be READY and its availability must be active.
func (f *ReadyFilter) Check(n *NodeInfo) bool {
    if n.Status.State != api.NodeStatus_READY {
        return false
    }
    return n.Spec.Availability == api.NodeAvailabilityActive
}

// Explain describes a failure of this filter for the given number of nodes,
// using singular wording when nodes is exactly 1.
func (f *ReadyFilter) Explain(nodes int) string {
    if nodes != 1 {
        return fmt.Sprintf("%d nodes not available for new tasks", nodes)
    }
    return "1 node not available for new tasks"
}

// ResourceFilter checks that the node has enough resources available to run
// the task.
type ResourceFilter struct {
    // reservations is the resource reservation of the task most recently
    // accepted by SetTask; Check compares it against a node's available
    // resources.
    reservations *api.Resources
}

// SetTask returns true when the filter is enabled for a given task, that is
// when the task reserves a non-zero amount of CPU, memory, or at least one
// generic resource. On success the reservation is stored for Check.
func (f *ResourceFilter) SetTask(t *api.Task) bool {
    if t.Spec.Resources == nil {
        return false
    }

    wanted := t.Spec.Resources.Reservations
    if wanted == nil {
        return false
    }

    // A reservation that asks for nothing leaves the filter disabled.
    nothingReserved := wanted.NanoCPUs == 0 && wanted.MemoryBytes == 0 && len(wanted.Generic) == 0
    if nothingReserved {
        return false
    }

    f.reservations = wanted
    return true
}

// Check reports whether the node's available CPU, memory and generic
// resources cover the reservation stored by SetTask. An error from the
// generic resource comparison is treated as "not enough".
func (f *ResourceFilter) Check(n *NodeInfo) bool {
    wanted := f.reservations

    if wanted.NanoCPUs > n.AvailableResources.NanoCPUs ||
        wanted.MemoryBytes > n.AvailableResources.MemoryBytes {
        return false
    }

    for _, generic := range wanted.Generic {
        if ok, err := genericresource.HasEnough(n.AvailableResources.Generic, generic); err != nil || !ok {
            return false
        }
    }

    return true
}

// Explain describes a failure of this filter for the given number of nodes,
// using singular wording when nodes is exactly 1.
func (f *ResourceFilter) Explain(nodes int) string {
    if nodes != 1 {
        return fmt.Sprintf("insufficient resources on %d nodes", nodes)
    }
    return "insufficient resources on 1 node"
}

// PluginFilter checks that the node has the plugins required by the task
// installed: volume plugins referenced by its mounts, network plugins for its
// attached networks, and its log driver plugin.
type PluginFilter struct {
    // t is the task most recently accepted by SetTask.
    t *api.Task
}

// referencesVolumePlugin reports whether the mount is a volume mount that
// names a driver other than the empty string or "local".
func referencesVolumePlugin(mount api.Mount) bool {
    if mount.Type != api.MountTypeVolume || mount.VolumeOptions == nil {
        return false
    }

    driver := mount.VolumeOptions.DriverConfig
    if driver == nil {
        return false
    }

    return driver.Name != "" && driver.Name != "local"
}

// SetTask returns true when the filter is enabled for a given task: the task
// has a container mount that references a volume plugin, is attached to at
// least one network, or specifies a log driver. On success the task is stored
// for Check.
func (f *PluginFilter) SetTask(t *api.Task) bool {
    usesVolumePlugin := false
    if c := t.Spec.GetContainer(); c != nil {
        for _, mount := range c.Mounts {
            if referencesVolumePlugin(mount) {
                usesVolumePlugin = true
                break
            }
        }
    }

    if !usesVolumePlugin && len(t.Networks) == 0 && t.Spec.LogDriver == nil {
        return false
    }

    f.t = t
    return true
}

// Check returns true if the task can be scheduled into the given node, i.e.
// every volume, network and log plugin the task needs is reported by the node.
// TODO(amitshukla): investigate storing Plugins as a map so it can be easily probed
func (f *PluginFilter) Check(n *NodeInfo) bool {
    // If the node is not running Engine, plugins are not supported, and the
    // filter lets the node through.
    if n.Description == nil || n.Description.Engine == nil {
        return true
    }

    // Plugins reported by the node.
    nodePlugins := n.Description.Engine.Plugins

    // Every volume plugin referenced by the task's mounts must be installed.
    if container := f.t.Spec.GetContainer(); container != nil {
        for _, mount := range container.Mounts {
            if !referencesVolumePlugin(mount) {
                continue
            }
            if _, exists := f.pluginExistsOnNode("Volume", mount.VolumeOptions.DriverConfig.Name, nodePlugins); !exists {
                return false
            }
        }
    }

    // Every named network driver required by the task must be installed.
    for _, tn := range f.t.Networks {
        if tn.Network == nil || tn.Network.DriverState == nil || tn.Network.DriverState.Name == "" {
            continue
        }
        if _, exists := f.pluginExistsOnNode("Network", tn.Network.DriverState.Name, nodePlugins); !exists {
            return false
        }
    }

    // It's possible that the LogDriver object does not carry a name, just some
    // configuration options. In that case, the plugin filter shouldn't fail to
    // schedule the task. The "none" driver is skipped as well.
    logDriver := f.t.Spec.LogDriver
    if logDriver == nil || logDriver.Name == "" || logDriver.Name == "none" {
        return true
    }

    // If there are no log driver types in the list at all, most likely this is
    // an older daemon that did not report this information. In this case don't
    // filter: only reject when "Log" plugins are listed but this one is absent.
    typeFound, exists := f.pluginExistsOnNode("Log", logDriver.Name, nodePlugins)
    return exists || !typeFound
}

// pluginExistsOnNode searches nodePlugins for a plugin with the given type and
// name. It returns two values: whether a plugin of pluginType was seen during
// the search, and whether the requested plugin was found. A node plugin
// matches when its name equals pluginName, or equals pluginName followed by
// ":latest".
func (f *PluginFilter) pluginExistsOnNode(pluginType string, pluginName string, nodePlugins []api.PluginDescription) (bool, bool) {
    sawType := false

    for i := range nodePlugins {
        candidate := &nodePlugins[i]
        if candidate.Type != pluginType {
            continue
        }
        sawType = true

        if candidate.Name == pluginName {
            return true, true
        }
        // This does not use the reference package to avoid the
        // overhead of parsing references as part of the scheduling
        // loop. This is okay only because plugin names are a very
        // strict subset of the reference grammar that is always
        // name:tag.
        if strings.HasSuffix(candidate.Name, ":latest") &&
            strings.TrimSuffix(candidate.Name, ":latest") == pluginName {
            return true, true
        }
    }

    return sawType, false
}

// Explain describes a failure of this filter for the given number of nodes,
// using singular wording when nodes is exactly 1.
func (f *PluginFilter) Explain(nodes int) string {
    if nodes != 1 {
        return fmt.Sprintf("missing plugin on %d nodes", nodes)
    }
    return "missing plugin on 1 node"
}

// ConstraintFilter selects only nodes that satisfy the task's placement
// constraints.
type ConstraintFilter struct {
    // constraints holds the parsed placement constraints of the task most
    // recently accepted by SetTask.
    constraints []constraint.Constraint
}

// SetTask returns true when the filter is enabled for a given task, that is
// when the task has placement constraints and they parse successfully. The
// parsed constraints are stored for Check.
func (f *ConstraintFilter) SetTask(t *api.Task) bool {
    placement := t.Spec.Placement
    if placement == nil || len(placement.Constraints) == 0 {
        return false
    }

    parsed, err := constraint.Parse(placement.Constraints)
    if err != nil {
        // Constraints have already been validated at controlapi. If parsing
        // nevertheless fails here, treat the constraint filter as disabled
        // for this task.
        return false
    }

    f.constraints = parsed
    return true
}

// Check returns true if the given node satisfies the task's constraints. The
// decision is delegated to constraint.NodeMatches, using the constraints
// parsed by SetTask.
func (f *ConstraintFilter) Check(n *NodeInfo) bool {
    return constraint.NodeMatches(f.constraints, n.Node)
}

// Explain describes a failure of this filter for the given number of nodes,
// using singular wording when nodes is exactly 1.
func (f *ConstraintFilter) Explain(nodes int) string {
    if nodes != 1 {
        return fmt.Sprintf("scheduling constraints not satisfied on %d nodes", nodes)
    }
    return "scheduling constraints not satisfied on 1 node"
}

// PlatformFilter selects only nodes that run the required platform.
type PlatformFilter struct {
    // supportedPlatforms is the Placement.Platforms slice of the task last
    // passed to SetTask with a non-nil placement. The slice is shared with
    // the task spec, not copied.
    supportedPlatforms []*api.Platform
}

// SetTask returns true when the filter is enabled for a given task, that is
// when the task's placement lists at least one platform. When the task has no
// placement, the previously stored platforms are left untouched.
func (f *PlatformFilter) SetTask(t *api.Task) bool {
    placement := t.Spec.Placement
    if placement == nil {
        return false
    }

    // Remember the platform list (the slice is shared with the task spec).
    f.supportedPlatforms = placement.Platforms
    return len(f.supportedPlatforms) > 0
}

// Check returns true if the task can be scheduled into the given node: either
// no platforms were requested, or the node reports a platform that matches one
// of the requested platforms.
func (f *PlatformFilter) Check(n *NodeInfo) bool {
    // An empty list means the platforms were not provided or there are no
    // platform restrictions.
    if len(f.supportedPlatforms) == 0 {
        return true
    }

    // Without a reported platform the node cannot be matched.
    if n.Description == nil || n.Description.Platform == nil {
        return false
    }

    nodePlatform := n.Description.Platform
    for _, wanted := range f.supportedPlatforms {
        if f.platformEqual(*wanted, *nodePlatform) {
            return true
        }
    }
    return false
}

// platformEqual reports whether nodePlatform satisfies imgPlatform. The
// architecture aliases "x86_64" and "aarch64" are normalized to "amd64" and
// "arm64" on both sides first. An empty Architecture or OS in imgPlatform
// matches any value on the node. Both arguments are passed by value, so the
// normalization does not affect the caller's data.
func (f *PlatformFilter) platformEqual(imgPlatform, nodePlatform api.Platform) bool {
    canonicalArch := func(arch string) string {
        switch arch {
        case "x86_64":
            return "amd64"
        case "aarch64":
            return "arm64"
        }
        return arch
    }

    imgPlatform.Architecture = canonicalArch(imgPlatform.Architecture)
    nodePlatform.Architecture = canonicalArch(nodePlatform.Architecture)

    archOK := imgPlatform.Architecture == "" || imgPlatform.Architecture == nodePlatform.Architecture
    osOK := imgPlatform.OS == "" || imgPlatform.OS == nodePlatform.OS
    return archOK && osOK
}

// Explain describes a failure of this filter for the given number of nodes,
// using singular wording when nodes is exactly 1.
func (f *PlatformFilter) Explain(nodes int) string {
    if nodes != 1 {
        return fmt.Sprintf("unsupported platform on %d nodes", nodes)
    }
    return "unsupported platform on 1 node"
}

// HostPortFilter checks that the host-mode ports published by the task are
// not already in use on the node.
type HostPortFilter struct {
    // t is the task most recently accepted by SetTask.
    t *api.Task
}

// SetTask returns true when the filter is enabled for a given task, that is
// when the task's endpoint has at least one port published in host mode with
// a non-zero published port. On success the task is stored for Check.
func (f *HostPortFilter) SetTask(t *api.Task) bool {
    if t.Endpoint == nil {
        return false
    }

    for _, port := range t.Endpoint.Ports {
        if port.PublishMode != api.PublishModeHost || port.PublishedPort == 0 {
            continue
        }
        f.t = t
        return true
    }

    return false
}

// Check returns true if the task can be scheduled into the given node, i.e.
// none of the task's host-mode published ports (protocol and port number) is
// already recorded as used on the node.
func (f *HostPortFilter) Check(n *NodeInfo) bool {
    for _, port := range f.t.Endpoint.Ports {
        // Only host-mode ports with an explicit published port are checked.
        if port.PublishMode != api.PublishModeHost || port.PublishedPort == 0 {
            continue
        }

        wanted := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
        if _, taken := n.usedHostPorts[wanted]; taken {
            return false
        }
    }

    return true
}

// Explain describes a failure of this filter for the given number of nodes,
// using singular wording when nodes is exactly 1.
func (f *HostPortFilter) Explain(nodes int) string {
    if nodes != 1 {
        return fmt.Sprintf("host-mode port already in use on %d nodes", nodes)
    }
    return "host-mode port already in use on 1 node"
}

// MaxReplicasFilter selects only nodes that do not exceed the max replicas
// per node limit of the task's service.
type MaxReplicasFilter struct {
    // t is the task most recently accepted by SetTask.
    t *api.Task
}

// SetTask returns true when the task's placement sets a max replicas per node
// limit greater than zero. On success the task is stored for Check.
func (f *MaxReplicasFilter) SetTask(t *api.Task) bool {
    placement := t.Spec.Placement
    if placement == nil || placement.MaxReplicas == 0 {
        return false
    }

    f.t = t
    return true
}

// Check returns true if the node's active task count for this task's service
// is below the MaxReplicas limit from the task's placement.
func (f *MaxReplicasFilter) Check(n *NodeInfo) bool {
    active := uint64(n.ActiveTasksCountByService[f.t.ServiceID])
    limit := f.t.Spec.Placement.MaxReplicas
    return active < limit
}

// Explain returns an explanation of a failure. Unlike the other filters in
// this file, the message is fixed: the nodes argument is not used.
func (f *MaxReplicasFilter) Explain(nodes int) string {
    return "max replicas per node limit exceed"
}

// VolumesFilter checks whether the cluster-type (CSI) volume mounts requested
// by a task are available on a node.
type VolumesFilter struct {
    // vs is the volume set consulted by Check. When it is nil, SetTask
    // always disables the filter.
    vs *volumeSet
    // t is the task most recently passed to SetTask.
    t  *api.Task

    // requestedVolumes is a set of volumes requested by the task, stored as
    // the task's mounts of type MountTypeCluster. This can include either
    // volume names or volume groups. Volume groups, as in the Mount.Source
    // field, are prefixed with "group:"
    requestedVolumes []*api.Mount
}

// SetTask assigns the task to the filter and returns true when the task's
// container spec has at least one cluster-type (CSI) mount. It returns false
// when the filter has no volume set, when t is nil, or when the task has no
// container spec. The cluster mounts found are recorded in requestedVolumes
// for Check.
func (f *VolumesFilter) SetTask(t *api.Task) bool {
    // if there is no volume Manager, skip this filter always
    if f.vs == nil {
        return false
    }
    f.t = t
    // reset requestedVolumes every time we set a task, so we don't
    // accidentally append to the last task's set of requested volumes.
    f.requestedVolumes = []*api.Mount{}

    // t should never be nil, but we should ensure that it is not just in case
    // we make mistakes in the future.
    if t == nil {
        return false
    }

    c := t.Spec.GetContainer()
    if c == nil {
        return false
    }

    for i := range c.Mounts {
        if c.Mounts[i].Type != api.MountTypeCluster {
            continue
        }
        // Store a pointer to a fresh copy of this mount. Taking the address
        // of a range loop variable is unsafe under pre-Go-1.22 semantics:
        // every stored pointer would alias the single loop variable and end
        // up referring to the last element of c.Mounts.
        mount := c.Mounts[i]
        f.requestedVolumes = append(f.requestedVolumes, &mount)
    }

    // The filter is enabled only if at least one CSI mount was found.
    return len(f.requestedVolumes) > 0
}

// Check returns true as soon as isVolumeAvailableOnNode yields a non-empty
// result for one of the requested mounts on the given node, and false if it
// does so for none of them.
//
// NOTE(review): this passes a node when ANY requested mount is available, not
// when ALL are. Confirm that is intended for tasks with several cluster
// mounts.
func (f *VolumesFilter) Check(nodeInfo *NodeInfo) bool {
    for i := range f.requestedVolumes {
        if available := f.vs.isVolumeAvailableOnNode(f.requestedVolumes[i], nodeInfo); available != "" {
            return true
        }
    }

    return false
}

// Explain describes a failure of this filter for the given number of nodes,
// using singular wording when nodes is exactly 1.
func (f *VolumesFilter) Explain(nodes int) string {
    if nodes != 1 {
        return fmt.Sprintf("cannot fulfill requested CSI volume mounts on %d nodes", nodes)
    }
    return "cannot fulfill requested CSI volume mounts on 1 node"
}