docker/swarmkit

View on GitHub
manager/dispatcher/assignments.go

Summary

Maintainability
A
3 hrs
Test Coverage
package dispatcher

import (
    "fmt"

    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/api/equality"
    "github.com/moby/swarmkit/v2/api/validation"
    "github.com/moby/swarmkit/v2/identity"
    "github.com/moby/swarmkit/v2/log"
    "github.com/moby/swarmkit/v2/manager/drivers"
    "github.com/moby/swarmkit/v2/manager/state/store"
    "github.com/sirupsen/logrus"
)

type typeAndID struct {
    id      string
    objType api.ResourceType
}

type assignmentSet struct {
    nodeID   string
    dp       *drivers.DriverProvider
    tasksMap map[string]*api.Task
    // volumesMap keeps track of the VolumePublishStatus of the given volumes.
    // this tells us both which volumes are assigned to the node, and what the
    // last known VolumePublishStatus was, so we can understand if we need to
    // send an update.
    volumesMap map[string]*api.VolumePublishStatus
    // tasksUsingDependency tracks both tasks and volumes using a given
    // dependency. this works because the ID generated for swarm comes from a
    // large enough space that it is reliably astronomically unlikely that IDs
    // will ever collide.
    tasksUsingDependency map[typeAndID]map[string]struct{}
    changes              map[typeAndID]*api.AssignmentChange
    log                  *logrus.Entry
}

func newAssignmentSet(nodeID string, log *logrus.Entry, dp *drivers.DriverProvider) *assignmentSet {
    return &assignmentSet{
        nodeID:               nodeID,
        dp:                   dp,
        changes:              make(map[typeAndID]*api.AssignmentChange),
        tasksMap:             make(map[string]*api.Task),
        volumesMap:           make(map[string]*api.VolumePublishStatus),
        tasksUsingDependency: make(map[typeAndID]map[string]struct{}),
        log:                  log,
    }
}

func assignSecret(a *assignmentSet, readTx store.ReadTx, mapKey typeAndID, t *api.Task) {
    if _, exists := a.tasksUsingDependency[mapKey]; !exists {
        a.tasksUsingDependency[mapKey] = make(map[string]struct{})
    }
    secret, doNotReuse, err := a.secret(readTx, t, mapKey.id)
    if err != nil {
        a.log.WithFields(log.Fields{
            "resource.type": "secret",
            "secret.id":     mapKey.id,
            "error":         err,
        }).Debug("failed to fetch secret")
        return
    }
    // If the secret should not be reused for other tasks, give it a unique ID
    // for the task to allow different values for different tasks.
    if doNotReuse {
        // Give the secret a new ID and mark it as internal
        originalSecretID := secret.ID
        taskSpecificID := identity.CombineTwoIDs(originalSecretID, t.ID)
        secret.ID = taskSpecificID
        secret.Internal = true
        // Create a new mapKey with the new ID and insert it into the
        // dependencies map for the task.  This will make the changes map
        // contain an entry with the new ID rather than the original one.
        mapKey = typeAndID{objType: mapKey.objType, id: secret.ID}
        a.tasksUsingDependency[mapKey] = make(map[string]struct{})
        a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
    }
    a.changes[mapKey] = &api.AssignmentChange{
        Assignment: &api.Assignment{
            Item: &api.Assignment_Secret{
                Secret: secret,
            },
        },
        Action: api.AssignmentChange_AssignmentActionUpdate,
    }
}

func assignConfig(a *assignmentSet, readTx store.ReadTx, mapKey typeAndID) {
    a.tasksUsingDependency[mapKey] = make(map[string]struct{})
    config := store.GetConfig(readTx, mapKey.id)
    if config == nil {
        a.log.WithFields(log.Fields{
            "resource.type": "config",
            "config.id":     mapKey.id,
        }).Debug("config not found")
        return
    }
    a.changes[mapKey] = &api.AssignmentChange{
        Assignment: &api.Assignment{
            Item: &api.Assignment_Config{
                Config: config,
            },
        },
        Action: api.AssignmentChange_AssignmentActionUpdate,
    }
}

func (a *assignmentSet) addTaskDependencies(readTx store.ReadTx, t *api.Task) {
    // first, we go through all ResourceReferences, which give us the necessary
    // information about which secrets and configs are in use.
    for _, resourceRef := range t.Spec.ResourceReferences {
        mapKey := typeAndID{objType: resourceRef.ResourceType, id: resourceRef.ResourceID}
        // if there are no tasks using this dependency yet, then we can assign
        // it.
        if len(a.tasksUsingDependency[mapKey]) == 0 {
            switch resourceRef.ResourceType {
            case api.ResourceType_SECRET:
                assignSecret(a, readTx, mapKey, t)
            case api.ResourceType_CONFIG:
                assignConfig(a, readTx, mapKey)
            default:
                a.log.WithField(
                    "resource.type", resourceRef.ResourceType,
                ).Debug("invalid resource type for a task dependency, skipping")
                continue
            }
        }
        // otherwise, we don't need to add a new assignment. we just need to
        // track the fact that another task is now using this dependency.
        a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
    }

    var secrets []*api.SecretReference
    container := t.Spec.GetContainer()
    if container != nil {
        secrets = container.Secrets
    }

    for _, secretRef := range secrets {
        secretID := secretRef.SecretID
        mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secretID}

        // This checks for the presence of each task in the dependency map for the
        // secret. This is currently only done for secrets since the other types of
        // dependencies do not support driver plugins. Arguably, the same task would
        // not have the same secret as a dependency more than once, but this check
        // makes sure the task only gets the secret assigned once.
        if _, exists := a.tasksUsingDependency[mapKey][t.ID]; !exists {
            assignSecret(a, readTx, mapKey, t)
        }
        a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
    }

    var configs []*api.ConfigReference
    if container != nil {
        configs = container.Configs
    }
    for _, configRef := range configs {
        configID := configRef.ConfigID
        mapKey := typeAndID{objType: api.ResourceType_CONFIG, id: configID}

        if len(a.tasksUsingDependency[mapKey]) == 0 {
            assignConfig(a, readTx, mapKey)
        }
        a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
    }
}

func (a *assignmentSet) releaseDependency(mapKey typeAndID, assignment *api.Assignment, taskID string) bool {
    delete(a.tasksUsingDependency[mapKey], taskID)
    if len(a.tasksUsingDependency[mapKey]) != 0 {
        return false
    }
    // No tasks are using the dependency anymore
    delete(a.tasksUsingDependency, mapKey)
    a.changes[mapKey] = &api.AssignmentChange{
        Assignment: assignment,
        Action:     api.AssignmentChange_AssignmentActionRemove,
    }
    return true
}

// releaseTaskDependencies needs a store transaction because volumes have
// associated Secrets which need to be released.
func (a *assignmentSet) releaseTaskDependencies(readTx store.ReadTx, t *api.Task) bool {
    var modified bool

    for _, resourceRef := range t.Spec.ResourceReferences {
        var assignment *api.Assignment
        switch resourceRef.ResourceType {
        case api.ResourceType_SECRET:
            assignment = &api.Assignment{
                Item: &api.Assignment_Secret{
                    Secret: &api.Secret{ID: resourceRef.ResourceID},
                },
            }
        case api.ResourceType_CONFIG:
            assignment = &api.Assignment{
                Item: &api.Assignment_Config{
                    Config: &api.Config{ID: resourceRef.ResourceID},
                },
            }
        default:
            a.log.WithField(
                "resource.type", resourceRef.ResourceType,
            ).Debug("invalid resource type for a task dependency, skipping")
            continue
        }

        mapKey := typeAndID{objType: resourceRef.ResourceType, id: resourceRef.ResourceID}
        if a.releaseDependency(mapKey, assignment, t.ID) {
            modified = true
        }
    }

    container := t.Spec.GetContainer()

    var secrets []*api.SecretReference
    if container != nil {
        secrets = container.Secrets
    }

    for _, secretRef := range secrets {
        secretID := secretRef.SecretID
        mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secretID}
        assignment := &api.Assignment{
            Item: &api.Assignment_Secret{
                Secret: &api.Secret{ID: secretID},
            },
        }
        if a.releaseDependency(mapKey, assignment, t.ID) {
            modified = true
        }
    }

    var configs []*api.ConfigReference
    if container != nil {
        configs = container.Configs
    }

    for _, configRef := range configs {
        configID := configRef.ConfigID
        mapKey := typeAndID{objType: api.ResourceType_CONFIG, id: configID}
        assignment := &api.Assignment{
            Item: &api.Assignment_Config{
                Config: &api.Config{ID: configID},
            },
        }
        if a.releaseDependency(mapKey, assignment, t.ID) {
            modified = true
        }
    }

    return modified
}

func (a *assignmentSet) addOrUpdateTask(readTx store.ReadTx, t *api.Task) bool {
    // We only care about tasks that are ASSIGNED or higher.
    if t.Status.State < api.TaskStateAssigned {
        return false
    }

    if oldTask, exists := a.tasksMap[t.ID]; exists {
        // States ASSIGNED and below are set by the orchestrator/scheduler,
        // not the agent, so tasks in these states need to be sent to the
        // agent even if nothing else has changed.
        if equality.TasksEqualStable(oldTask, t) && t.Status.State > api.TaskStateAssigned {
            // this update should not trigger a task change for the agent
            a.tasksMap[t.ID] = t
            // If this task got updated to a final state, let's release
            // the dependencies that are being used by the task
            if t.Status.State > api.TaskStateRunning {
                // If releasing the dependencies caused us to
                // remove something from the assignment set,
                // mark one modification.
                return a.releaseTaskDependencies(readTx, t)
            }
            return false
        }
    } else if t.Status.State <= api.TaskStateRunning {
        // If this task wasn't part of the assignment set before, and it's <= RUNNING
        // add the dependencies it references to the assignment.
        // Task states > RUNNING are worker reported only, are never created in
        // a > RUNNING state.
        a.addTaskDependencies(readTx, t)
    }
    a.tasksMap[t.ID] = t
    a.changes[typeAndID{objType: api.ResourceType_TASK, id: t.ID}] = &api.AssignmentChange{
        Assignment: &api.Assignment{
            Item: &api.Assignment_Task{
                Task: t,
            },
        },
        Action: api.AssignmentChange_AssignmentActionUpdate,
    }
    return true
}

// addOrUpdateVolume tracks a Volume assigned to a node.
func (a *assignmentSet) addOrUpdateVolume(readTx store.ReadTx, v *api.Volume) bool {
    var publishStatus *api.VolumePublishStatus
    for _, status := range v.PublishStatus {
        if status.NodeID == a.nodeID {
            publishStatus = status
            break
        }
    }

    // if there is no publishStatus for this Volume on this Node, or if the
    // Volume has not yet been published to this node, then we do not need to
    // track this assignment.
    if publishStatus == nil || publishStatus.State < api.VolumePublishStatus_PUBLISHED {
        return false
    }

    // check if we are already tracking this volume, and what its old status
    // is. if the states are identical, then we don't have any update to make.
    if oldStatus, ok := a.volumesMap[v.ID]; ok && oldStatus.State == publishStatus.State {
        return false
    }

    // if the volume has already been confirmed as unpublished, we can stop
    // tracking it and remove its dependencies.
    if publishStatus.State > api.VolumePublishStatus_PENDING_NODE_UNPUBLISH {
        return a.removeVolume(readTx, v)
    }

    for _, secret := range v.Spec.Secrets {
        mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secret.Secret}
        if len(a.tasksUsingDependency[mapKey]) == 0 {
            // we can call assignSecret with task being nil, but it does mean
            // that any secret that uses a driver will not work. we'll call
            // that a limitation of volumes for now.
            assignSecret(a, readTx, mapKey, nil)
        }
        a.tasksUsingDependency[mapKey][v.ID] = struct{}{}
    }

    // volumes are sent to nodes as VolumeAssignments. This is because a node
    // needs node-specific information (the PublishContext from
    // ControllerPublishVolume).
    assignment := &api.VolumeAssignment{
        ID:             v.ID,
        VolumeID:       v.VolumeInfo.VolumeID,
        Driver:         v.Spec.Driver,
        VolumeContext:  v.VolumeInfo.VolumeContext,
        PublishContext: publishStatus.PublishContext,
        AccessMode:     v.Spec.AccessMode,
        Secrets:        v.Spec.Secrets,
    }

    volumeKey := typeAndID{objType: api.ResourceType_VOLUME, id: v.ID}
    // assignmentChange is the whole assignment without the action, which we
    // will set next
    assignmentChange := &api.AssignmentChange{
        Assignment: &api.Assignment{
            Item: &api.Assignment_Volume{
                Volume: assignment,
            },
        },
    }

    // if we're in state PENDING_NODE_UNPUBLISH, we actually need to send a
    // remove message. we do this every time, even if the node never got the
    // first add assignment. This is because the node might not know that it
    // has a volume published; for example, the node may be restarting, and
    // the in-memory store does not have knowledge of the volume.
    if publishStatus.State == api.VolumePublishStatus_PENDING_NODE_UNPUBLISH {
        assignmentChange.Action = api.AssignmentChange_AssignmentActionRemove
    } else {
        assignmentChange.Action = api.AssignmentChange_AssignmentActionUpdate
    }
    a.changes[volumeKey] = assignmentChange
    a.volumesMap[v.ID] = publishStatus
    return true
}

func (a *assignmentSet) removeVolume(readTx store.ReadTx, v *api.Volume) bool {
    if _, exists := a.volumesMap[v.ID]; !exists {
        return false
    }

    modified := false

    // if the volume does exists, we can release its secrets
    for _, secret := range v.Spec.Secrets {
        mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secret.Secret}
        assignment := &api.Assignment{
            Item: &api.Assignment_Secret{
                Secret: &api.Secret{ID: secret.Secret},
            },
        }
        if a.releaseDependency(mapKey, assignment, v.ID) {
            modified = true
        }
    }

    // we don't need to add a removal message. the removal of the
    // VolumeAssignment will have already happened.
    delete(a.volumesMap, v.ID)

    return modified
}

func (a *assignmentSet) removeTask(readTx store.ReadTx, t *api.Task) bool {
    if _, exists := a.tasksMap[t.ID]; !exists {
        return false
    }

    a.changes[typeAndID{objType: api.ResourceType_TASK, id: t.ID}] = &api.AssignmentChange{
        Assignment: &api.Assignment{
            Item: &api.Assignment_Task{
                Task: &api.Task{ID: t.ID},
            },
        },
        Action: api.AssignmentChange_AssignmentActionRemove,
    }

    delete(a.tasksMap, t.ID)

    // Release the dependencies being used by this task.
    // Ignoring the return here. We will always mark this as a
    // modification, since a task is being removed.
    a.releaseTaskDependencies(readTx, t)
    return true
}

func (a *assignmentSet) message() api.AssignmentsMessage {
    var message api.AssignmentsMessage
    for _, change := range a.changes {
        message.Changes = append(message.Changes, change)
    }

    // The the set of changes is reinitialized to prepare for formation
    // of the next message.
    a.changes = make(map[typeAndID]*api.AssignmentChange)

    return message
}

// secret populates the secret value from raft store. For external secrets, the value is populated
// from the secret driver. The function returns: a secret object; an indication of whether the value
// is to be reused across tasks; and an error if the secret is not found in the store, if the secret
// driver responds with one or if the payload does not pass validation.
func (a *assignmentSet) secret(readTx store.ReadTx, task *api.Task, secretID string) (*api.Secret, bool, error) {
    secret := store.GetSecret(readTx, secretID)
    if secret == nil {
        return nil, false, fmt.Errorf("secret not found")
    }
    if secret.Spec.Driver == nil {
        return secret, false, nil
    }
    d, err := a.dp.NewSecretDriver(secret.Spec.Driver)
    if err != nil {
        return nil, false, err
    }
    value, doNotReuse, err := d.Get(&secret.Spec, task)
    if err != nil {
        return nil, false, err
    }
    if err := validation.ValidateSecretPayload(value); err != nil {
        return nil, false, err
    }
    // Assign the secret
    secret.Spec.Data = value
    return secret, doNotReuse, nil
}