docker/swarmkit

agent/csi/volumes.go

package csi

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/moby/swarmkit/v2/agent/csi/plugin"
    "github.com/moby/swarmkit/v2/agent/exec"
    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/log"
    mobyplugin "github.com/moby/swarmkit/v2/node/plugin"
    "github.com/moby/swarmkit/v2/volumequeue"
)

const csiCallTimeout = 15 * time.Second

// volumeState keeps track of the state of a volume on this node.
type volumeState struct {
    // volume is the actual VolumeAssignment for this volume
    volume *api.VolumeAssignment
    // remove is true if the volume is to be removed, or false if it should be
    // active.
    remove bool
    // removeCallback is called when the volume is successfully removed.
    removeCallback func(id string)
}

// volumes keeps track of all volumes currently available to the agent,
// mapped by volume ID.
type volumes struct {
    // mu guards access to the volumes map.
    mu sync.RWMutex

    // volumes is a mapping of volume ID to volumeState
    volumes map[string]volumeState

    // plugins is the plugin Manager, which translates volume operations into
    // CSI RPCs.
    plugins plugin.Manager

    // pendingVolumes is a VolumeQueue which manages which volumes are
    // processed and when.
    pendingVolumes *volumequeue.VolumeQueue
}

// NewManager returns a VolumesManager that tracks the volumes assigned to
// this node and retries failed publish and unpublish operations in the
// background.
func NewManager(pg mobyplugin.Getter, secrets exec.SecretGetter) exec.VolumesManager {
    r := &volumes{
        volumes:        map[string]volumeState{},
        plugins:        plugin.NewManager(pg, secrets),
        pendingVolumes: volumequeue.NewVolumeQueue(),
    }
    go r.retryVolumes()

    return r
}
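
// A minimal usage sketch (illustrative only; pluginGetter, secretGetter, and
// assignment are placeholder names: the first two come from node setup and
// the last is an api.VolumeAssignment received from the dispatcher):
//
//    vm := NewManager(pluginGetter, secretGetter)
//    vm.Add(assignment)                 // enqueue the volume for publishing
//    path, err := vm.Get(assignment.ID) // succeeds once the volume is published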

// retryVolumes runs in a goroutine to retry failing volumes.
func (r *volumes) retryVolumes() {
    ctx := log.WithModule(context.Background(), "node/agent/csi")
    for {
        vid, attempt := r.pendingVolumes.Wait()

        dctx := log.WithFields(ctx, log.Fields{
            "volume.id": vid,
            "attempt":   fmt.Sprintf("%d", attempt),
        })

        // this case occurs when the Stop method has been called on
        // pendingVolumes, and means that we should pack up and exit.
        if vid == "" && attempt == 0 {
            break
        }
        r.tryVolume(dctx, vid, attempt)
    }
}

// tryVolume synchronously tries one volume. It puts the volume back into the
// queue if the attempt fails.
func (r *volumes) tryVolume(ctx context.Context, id string, attempt uint) {
    r.mu.RLock()
    vs, ok := r.volumes[id]
    r.mu.RUnlock()

    if !ok {
        return
    }

    // create a sub-context with a timeout. because we can only process one
    // volume at a time, if we rely on the server-side or default timeout, we
    // may be waiting a very long time for a particular volume to fail.
    //
    // TODO(dperny): there is almost certainly a more intelligent way to do
    // this. For example, we could:
    //
    //   * Change code such that we can service volumes managed by different
    //     plugins at the same time.
    //   * Take longer timeouts when we don't have any other volumes in the
    //     queue
    //   * Have interruptible attempts, so that if we're taking longer
    //     timeouts, we can abort them to service new volumes.
    //
    // These are too complicated to be worth the engineering effort at this
    // time.

    timeoutCtx, cancel := context.WithTimeout(ctx, csiCallTimeout)
    // always call the cancel function returned by WithTimeout to release its
    // resources.
    defer cancel()

    if !vs.remove {
        if err := r.publishVolume(timeoutCtx, vs.volume); err != nil {
            log.G(timeoutCtx).WithError(err).Info("publishing volume failed")
            r.pendingVolumes.Enqueue(id, attempt+1)
        }
    } else {
        if err := r.unpublishVolume(timeoutCtx, vs.volume); err != nil {
            log.G(timeoutCtx).WithError(err).Info("upublishing volume failed")
            r.pendingVolumes.Enqueue(id, attempt+1)
        } else {
            // if unpublishing was successful, then call the callback
            vs.removeCallback(id)
        }
    }
}

// Get returns the published path for the provided volume ID. If the volume
// does not exist, is being removed, or has not yet been published, it returns
// an empty string and an error.
func (r *volumes) Get(volumeID string) (string, error) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if vs, ok := r.volumes[volumeID]; ok {
        if vs.remove {
            // TODO(dperny): use a structured error
            return "", fmt.Errorf("volume being removed")
        }

        if p, err := r.plugins.Get(vs.volume.Driver.Name); err == nil {
            path := p.GetPublishedPath(volumeID)
            if path != "" {
                return path, nil
            }
            // don't put this line here, it spams like crazy.
            // log.L.WithField("method", "(*volumes).Get").Debugf("Path not published for volume:%v", volumeID)
        } else {
            return "", err
        }

    }
    return "", fmt.Errorf("%w: published path is unavailable", exec.ErrDependencyNotReady)
}
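
// Because the error above wraps exec.ErrDependencyNotReady, callers can use
// errors.Is to tell "not published yet, retry later" apart from a hard
// failure (a caller-side sketch; vm and the retry policy are up to the
// caller):
//
//    path, err := vm.Get(volumeID)
//    if errors.Is(err, exec.ErrDependencyNotReady) {
//        // the volume has not been published yet; try again later
//    }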

// Add adds one or more volumes to the volume map.
func (r *volumes) Add(volumes ...api.VolumeAssignment) {
    r.mu.Lock()
    defer r.mu.Unlock()

    for _, volume := range volumes {
        // if we get an Add operation, then we will always restart the retries.
        v := volume.Copy()
        r.volumes[volume.ID] = volumeState{
            volume: v,
        }
        // enqueue the volume so that we process it
        r.pendingVolumes.Enqueue(volume.ID, 0)
        log.L.WithField("method", "(*volumes).Add").Debugf("Add Volume: %v", volume.VolumeID)
    }
}

// Remove removes one or more volumes from this manager. callback is called
// whenever the removal is successful.
func (r *volumes) Remove(volumes []api.VolumeAssignment, callback func(id string)) {
    r.mu.Lock()
    defer r.mu.Unlock()

    for _, volume := range volumes {
        // if we get a Remove call, then we always restart the retries and
        // attempt removal.
        v := volume.Copy()
        r.volumes[volume.ID] = volumeState{
            volume:         v,
            remove:         true,
            removeCallback: callback,
        }
        r.pendingVolumes.Enqueue(volume.ID, 0)
    }
}

// publishVolume makes the volume available on this node by calling the
// plugin's NodeStageVolume and then NodePublishVolume RPCs.
func (r *volumes) publishVolume(ctx context.Context, assignment *api.VolumeAssignment) error {
    log.G(ctx).Info("attempting to publish volume")
    p, err := r.plugins.Get(assignment.Driver.Name)
    if err != nil {
        return err
    }

    // even though this may have succeeded already, the call to NodeStageVolume
    // is idempotent, so we can retry it every time.
    if err := p.NodeStageVolume(ctx, assignment); err != nil {
        return err
    }

    log.G(ctx).Debug("staging volume succeeded, attempting to publish volume")

    return p.NodePublishVolume(ctx, assignment)
}

// unpublishVolume reverses publishVolume: it calls the plugin's
// NodeUnpublishVolume and then NodeUnstageVolume RPCs to remove the volume
// from this node.
func (r *volumes) unpublishVolume(ctx context.Context, assignment *api.VolumeAssignment) error {
    log.G(ctx).Info("attempting to unpublish volume")
    p, err := r.plugins.Get(assignment.Driver.Name)
    if err != nil {
        return err
    }

    if err := p.NodeUnpublishVolume(ctx, assignment); err != nil {
        return err
    }

    return p.NodeUnstageVolume(ctx, assignment)
}

// Plugins returns the plugin manager, which exposes the CSI plugins known to
// this node.
func (r *volumes) Plugins() exec.VolumePluginManager {
    return r.plugins
}

// taskRestrictedVolumesProvider wraps a VolumeGetter and only allows access
// to the volume IDs referenced by a particular task.
type taskRestrictedVolumesProvider struct {
    volumes   exec.VolumeGetter
    volumeIDs map[string]struct{}
}

// Get returns the published path for volumeID, but only if the task has been
// granted access to that volume.
func (sp *taskRestrictedVolumesProvider) Get(volumeID string) (string, error) {
    if _, ok := sp.volumeIDs[volumeID]; !ok {
        return "", fmt.Errorf("task not authorized to access volume %s", volumeID)
    }

    return sp.volumes.Get(volumeID)
}

// Restrict provides a getter that only allows access to the volumes
// referenced by the task.
func Restrict(volumes exec.VolumeGetter, t *api.Task) exec.VolumeGetter {
    vids := map[string]struct{}{}

    for _, v := range t.Volumes {
        vids[v.ID] = struct{}{}
    }

    return &taskRestrictedVolumesProvider{volumes: volumes, volumeIDs: vids}
}
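
// An illustrative use of Restrict (names are placeholders; the surrounding
// wiring is assumed): given the node-level getter and a task, build a getter
// that refuses volume IDs the task does not reference:
//
//    restricted := Restrict(volumesManager, task)
//    path, err := restricted.Get(task.Volumes[0].ID) // allowed
//    _, err = restricted.Get("some-other-volume")    // "task not authorized ..."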