docker/swarmkit

View on GitHub
manager/scheduler/volumes.go

Summary

Maintainability
A
1 hr
Test Coverage
package scheduler

import (
    "fmt"
    "strings"

    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/manager/state/store"
)

// the scheduler package does double duty -- in addition to choosing nodes, it
// must also choose volumes. this is because volumes are fungible, and can be
// scheduled to several nodes, and used by several tasks. we should endeavor to
// spread tasks across volumes, like we spread nodes. on the positive side,
// unlike nodes, volumes are not heirarchical. that is, we don't need to
// spread across multiple levels of a tree, only a flat set.

// volumeSet is the set of all volumes currently managed
type volumeSet struct {
    // volumes is a mapping of volume IDs to volumeInfo
    volumes map[string]volumeInfo
    // byGroup is a mapping from a volume group name to a set of volumes in
    // that group
    byGroup map[string]map[string]struct{}
    // byName is a mapping of volume names to swarmkit volume IDs.
    byName map[string]string
}

// volumeUsage contains information about the usage of a Volume by a specific
// task.
type volumeUsage struct {
    nodeID   string
    readOnly bool
}

// volumeInfo contains scheduler information about a given volume
type volumeInfo struct {
    volume *api.Volume
    tasks  map[string]volumeUsage
    // nodes is a set of nodes a volume is in use on. it maps a node ID to a
    // reference count for how many tasks are using the volume on that node.
    nodes map[string]int
}

func newVolumeSet() *volumeSet {
    return &volumeSet{
        volumes: map[string]volumeInfo{},
        byGroup: map[string]map[string]struct{}{},
        byName:  map[string]string{},
    }
}

// getVolume returns the volume object for the given ID as stored in the
// volumeSet, or nil if none exists.
//
//nolint:unused // TODO(thaJeztah) this is currently unused: is it safe to remove?
func (vs *volumeSet) getVolume(id string) *api.Volume {
    return vs.volumes[id].volume
}

func (vs *volumeSet) addOrUpdateVolume(v *api.Volume) {
    if info, ok := vs.volumes[v.ID]; !ok {
        vs.volumes[v.ID] = volumeInfo{
            volume: v,
            nodes:  map[string]int{},
            tasks:  map[string]volumeUsage{},
        }
    } else {
        // if the volume already exists in the set, then only update the volume
        // object, not the tasks map.
        info.volume = v
    }

    if set, ok := vs.byGroup[v.Spec.Group]; ok {
        set[v.ID] = struct{}{}
    } else {
        vs.byGroup[v.Spec.Group] = map[string]struct{}{v.ID: {}}
    }
    vs.byName[v.Spec.Annotations.Name] = v.ID
}

//nolint:unused // only used in tests.
func (vs *volumeSet) removeVolume(volumeID string) {
    if info, ok := vs.volumes[volumeID]; ok {
        // if the volume exists in the set, look up its group ID and remove it
        // from the byGroup mapping as well
        group := info.volume.Spec.Group
        delete(vs.byGroup[group], volumeID)
        delete(vs.volumes, volumeID)
        delete(vs.byName, info.volume.Spec.Annotations.Name)
    }
}

// chooseTaskVolumes selects a set of VolumeAttachments for the task on the
// given node. it expects that the node was already validated to have the
// necessary volumes, but it will return an error if a full set of volumes is
// not available.
func (vs *volumeSet) chooseTaskVolumes(task *api.Task, nodeInfo *NodeInfo) ([]*api.VolumeAttachment, error) {
    volumes := []*api.VolumeAttachment{}

    // we'll reserve volumes in this loop, but release all of our reservations
    // before we finish. the caller will need to call reserveTaskVolumes after
    // calling this function
    // TODO(dperny): this is probably not optimal
    defer func() {
        for _, volume := range volumes {
            vs.releaseVolume(volume.ID, task.ID)
        }
    }()

    // TODO(dperny): handle non-container tasks
    c := task.Spec.GetContainer()
    if c == nil {
        return nil, nil
    }
    for _, mount := range task.Spec.GetContainer().Mounts {
        if mount.Type == api.MountTypeCluster {
            candidate := vs.isVolumeAvailableOnNode(&mount, nodeInfo)
            if candidate == "" {
                // TODO(dperny): return structured error types, instead of
                // error strings
                return nil, fmt.Errorf("cannot find volume to satisfy mount with source %v", mount.Source)
            }
            vs.reserveVolume(candidate, task.ID, nodeInfo.Node.ID, mount.ReadOnly)
            volumes = append(volumes, &api.VolumeAttachment{
                ID:     candidate,
                Source: mount.Source,
                Target: mount.Target,
            })
        }
    }

    return volumes, nil
}

// reserveTaskVolumes identifies all volumes currently in use on a task and
// marks them in the volumeSet as in use.
func (vs *volumeSet) reserveTaskVolumes(task *api.Task) {
    for _, va := range task.Volumes {
        // we shouldn't need to handle non-container tasks because those tasks
        // won't have any entries in task.Volumes.
        for _, mount := range task.Spec.GetContainer().Mounts {
            if mount.Source == va.Source && mount.Target == va.Target {
                vs.reserveVolume(va.ID, task.ID, task.NodeID, mount.ReadOnly)
            }
        }
    }
}

func (vs *volumeSet) reserveVolume(volumeID, taskID, nodeID string, readOnly bool) {
    info, ok := vs.volumes[volumeID]
    if !ok {
        // TODO(dperny): don't just return nothing.
        return
    }

    info.tasks[taskID] = volumeUsage{nodeID: nodeID, readOnly: readOnly}
    // increment the reference count for this node.
    info.nodes[nodeID] = info.nodes[nodeID] + 1
}

func (vs *volumeSet) releaseVolume(volumeID, taskID string) {
    info, ok := vs.volumes[volumeID]
    if !ok {
        // if the volume isn't in the set, no action to take.
        return
    }

    // decrement the reference count for this task's node
    usage, ok := info.tasks[taskID]
    if ok {
        // this is probably an unnecessarily high level of caution, but make
        // sure we don't go below zero on node count.
        if c := info.nodes[usage.nodeID]; c > 0 {
            info.nodes[usage.nodeID] = c - 1
        }
        delete(info.tasks, taskID)
    }
}

// freeVolumes finds volumes that are no longer in use on some nodes, and
// updates them to be unpublished from those nodes.
//
// TODO(dperny): this is messy and has a lot of overhead. it should be reworked
// to something more streamlined.
func (vs *volumeSet) freeVolumes(batch *store.Batch) error {
    for volumeID, info := range vs.volumes {
        if err := batch.Update(func(tx store.Tx) error {
            v := store.GetVolume(tx, volumeID)
            if v == nil {
                return nil
            }

            // when we are freeing a volume, we may update more than one of the
            // volume's PublishStatuses. this means we can't simply put the
            // Update call inside of the if statement; we need to know if we've
            // changed anything once we've checked *all* of the statuses.
            changed := false
            for _, status := range v.PublishStatus {
                if info.nodes[status.NodeID] == 0 && status.State == api.VolumePublishStatus_PUBLISHED {
                    status.State = api.VolumePublishStatus_PENDING_NODE_UNPUBLISH
                    changed = true
                }
            }
            if changed {
                if err := store.UpdateVolume(tx, v); err != nil {
                    return err
                }
            }
            return nil
        }); err != nil {
            return err
        }
    }
    return nil
}

// isVolumeAvailableOnNode checks if a volume satisfying the given mount is
// available on the given node.
//
// Returns the ID of the volume available, or an empty string if no such volume
// is found.
func (vs *volumeSet) isVolumeAvailableOnNode(mount *api.Mount, node *NodeInfo) string {
    source := mount.Source
    // first, discern whether we're looking for a group or a volume
    // try trimming off the "group:" prefix. if the resulting string is
    // different from the input string (meaning something has been trimmed),
    // then this volume is actually a volume group.
    if group := strings.TrimPrefix(source, "group:"); group != source {
        ids, ok := vs.byGroup[group]
        // if there are no volumes of this group specified, then no volume
        // meets the moutn criteria.
        if !ok {
            return ""
        }

        // iterate through all ids in the group, checking if any one meets the
        // spec.
        for id := range ids {
            if vs.checkVolume(id, node, mount.ReadOnly) {
                return id
            }
        }
        return ""
    }

    // if it's not a group, it's a name. resolve the volume name to its ID
    id, ok := vs.byName[source]
    if !ok || !vs.checkVolume(id, node, mount.ReadOnly) {
        return ""
    }
    return id
}

// checkVolume checks if an individual volume with the given ID can be placed
// on the given node.
func (vs *volumeSet) checkVolume(id string, info *NodeInfo, readOnly bool) bool {
    vi := vs.volumes[id]
    // first, check if the volume's availability is even Active. If not. no
    // reason to bother with anything further.
    if vi.volume != nil && vi.volume.Spec.Availability != api.VolumeAvailabilityActive {
        return false
    }

    // get the node topology for this volume
    var top *api.Topology
    // get the topology for this volume's driver on this node
    for _, info := range info.Description.CSIInfo {
        if info.PluginName == vi.volume.Spec.Driver.Name {
            top = info.AccessibleTopology
            break
        }
    }

    // check if the volume is available on this node. a volume's
    // availability on a node depends on its accessible topology, how it's
    // already being used, and how this task intends to use it.

    if vi.volume.Spec.AccessMode.Scope == api.VolumeScopeSingleNode {
        // if the volume is not in use on this node already, then it can't
        // be used here.
        for _, usage := range vi.tasks {
            if usage.nodeID != info.ID {
                return false
            }
        }
    }

    // even if the volume is currently on this node, or it has multi-node
    // access, the volume sharing needs to be compatible.
    switch vi.volume.Spec.AccessMode.Sharing {
    case api.VolumeSharingNone:
        // if the volume sharing is none, then the volume cannot be
        // used by another task
        if len(vi.tasks) > 0 {
            return false
        }
    case api.VolumeSharingOneWriter:
        // if the mount is not ReadOnly, and the volume has a writer, then
        // we this volume does not work.
        if !readOnly && hasWriter(vi) {
            return false
        }
    case api.VolumeSharingReadOnly:
        // if the volume sharing is read-only, then the Mount must also
        // be read-only
        if !readOnly {
            return false
        }
    }

    // then, do the quick check of whether this volume is in the topology.  if
    // the volume has an AccessibleTopology, and it does not lie within the
    // node's topology, then this volume won't fit.
    return IsInTopology(top, vi.volume.VolumeInfo.AccessibleTopology)
}

// hasWriter is a helper function that returns true if at least one task is
// using this volume not in read-only mode.
func hasWriter(info volumeInfo) bool {
    for _, usage := range info.tasks {
        if !usage.readOnly {
            return true
        }
    }
    return false
}