docker/swarmkit

View on GitHub
manager/orchestrator/jobs/global/reconciler.go

Summary

Maintainability
D
1 day
Test Coverage
package global

import (
    "context"

    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/manager/constraint"
    "github.com/moby/swarmkit/v2/manager/orchestrator"
    "github.com/moby/swarmkit/v2/manager/state/store"
)

// restartSupervisor is an interface representing the methods from the
// restart.SupervisorInterface that are actually needed by the reconciler. This
// more limited interface allows us to write a less ugly fake for unit testing.
type restartSupervisor interface {
    Restart(context.Context, store.Tx, *api.Cluster, *api.Service, api.Task) error
}

// Reconciler is an object that manages reconciliation of global jobs. It is
// blocking and non-asynchronous, for ease of testing. It implements the
// Reconciler interface from the orchestrator package above it, and the
// taskinit.InitHandler interface.
type Reconciler struct {
    store *store.MemoryStore

    restart restartSupervisor
}

// NewReconciler creates a new global job reconciler.
func NewReconciler(store *store.MemoryStore, restart restartSupervisor) *Reconciler {
    return &Reconciler{
        store:   store,
        restart: restart,
    }
}

// ReconcileService reconciles one global job service.
func (r *Reconciler) ReconcileService(id string) error {
    var (
        service *api.Service
        cluster *api.Cluster
        tasks   []*api.Task
        nodes   []*api.Node
        viewErr error
    )

    // we need to first get the latest iteration of the service, its tasks, and
    // the nodes in the cluster.
    r.store.View(func(tx store.ReadTx) {
        service = store.GetService(tx, id)
        if service == nil {
            return
        }

        // getting tasks with FindTasks should only return an error if we've
        // made a mistake coding; there's no user-input or even reasonable
        // system state that can cause it. If it returns an error, we'll just
        // panic and crash.
        tasks, viewErr = store.FindTasks(tx, store.ByServiceID(id))
        if viewErr != nil {
            return
        }

        // same as with FindTasks
        nodes, viewErr = store.FindNodes(tx, store.All)
        if viewErr != nil {
            return
        }

        clusters, _ := store.FindClusters(tx, store.All)
        if len(clusters) == 1 {
            cluster = clusters[0]
        } else if len(clusters) > 1 {
            panic("there should never be more than one cluster object")
        }
    })

    if viewErr != nil {
        return viewErr
    }

    // the service may be nil if the service has been deleted before we entered
    // the View.
    if service == nil {
        return nil
    }

    if service.JobStatus == nil {
        service.JobStatus = &api.JobStatus{}
    }

    // we need to compute the constraints on the service so we know which nodes
    // to schedule it on
    var constraints []constraint.Constraint
    if service.Spec.Task.Placement != nil && len(service.Spec.Task.Placement.Constraints) != 0 {
        // constraint.Parse does return an error, but we don't need to check
        // it, because it was already checked when the service was created or
        // updated.
        constraints, _ = constraint.Parse(service.Spec.Task.Placement.Constraints)
    }

    var candidateNodes []string
    var invalidNodes []string
    for _, node := range nodes {
        // instead of having a big ugly multi-line boolean expression in the
        // if-statement, we'll have several if-statements, and bail out of
        // this loop iteration with continue if the node is not acceptable
        if !constraint.NodeMatches(constraints, node) {
            continue
        }

        // if a node is invalid, we should remove any tasks that might be on it
        if orchestrator.InvalidNode(node) {
            invalidNodes = append(invalidNodes, node.ID)
            continue
        }

        if node.Spec.Availability != api.NodeAvailabilityActive {
            continue
        }
        if node.Status.State != api.NodeStatus_READY {
            continue
        }
        // you can append to a nil slice and get a non-nil slice, which is
        // pretty slick.
        candidateNodes = append(candidateNodes, node.ID)
    }

    // now, we have a list of all nodes that match constraints. it's time to
    // match running tasks to the nodes. we need to identify all nodes that
    // need new tasks, which is any node that doesn't have a task of this job
    // iteration. trade some space for some time by building a node ID to task
    // ID mapping, so that we're just doing 2x linear operation, instead of a
    // quadratic operation.
    nodeToTask := map[string]string{}
    // additionally, while we're iterating through tasks, if any of those tasks
    // are failed, we'll hand them to the restart supervisor to handle
    restartTasks := []string{}
    // and if there are any tasks belonging to old job iterations, set them to
    // be removed
    removeTasks := []string{}
    for _, task := range tasks {
        // match all tasks belonging to this job iteration which are in desired
        // state completed, including failed tasks. We only want to create
        // tasks for nodes on which there are no existing tasks.
        if task.JobIteration != nil {
            if task.JobIteration.Index == service.JobStatus.JobIteration.Index &&
                task.DesiredState <= api.TaskStateCompleted {
                // we already know the task is desired to be executing (because its
                // desired state is Completed). Check here to see if it's already
                // failed, so we can restart it
                if task.Status.State > api.TaskStateCompleted {
                    restartTasks = append(restartTasks, task.ID)
                }
                nodeToTask[task.NodeID] = task.ID
            }

            if task.JobIteration.Index != service.JobStatus.JobIteration.Index {
                if task.DesiredState != api.TaskStateRemove {
                    removeTasks = append(removeTasks, task.ID)
                }
            }
        }
    }

    return r.store.Batch(func(batch *store.Batch) error {
        // first, create any new tasks required.
        for _, node := range candidateNodes {
            // check if there is a task for this node ID. If not, then we need
            // to create one.
            if _, ok := nodeToTask[node]; !ok {
                if err := batch.Update(func(tx store.Tx) error {
                    // if the node does not already have a running or completed
                    // task, create a task for this node.
                    task := orchestrator.NewTask(cluster, service, 0, node)
                    task.JobIteration = &service.JobStatus.JobIteration
                    task.DesiredState = api.TaskStateCompleted
                    return store.CreateTask(tx, task)
                }); err != nil {
                    return err
                }
            }
        }

        // then, restart any tasks that are failed
        for _, taskID := range restartTasks {
            if err := batch.Update(func(tx store.Tx) error {
                // get the latest version of the task for the restart
                t := store.GetTask(tx, taskID)
                // if it's deleted, nothing to do
                if t == nil {
                    return nil
                }

                // if it's not still desired to be running, then don't restart
                // it.
                if t.DesiredState > api.TaskStateCompleted {
                    return nil
                }

                // Finally, restart it
                // TODO(dperny): pass in context to ReconcileService, so we can
                // pass it in here.
                return r.restart.Restart(context.Background(), tx, cluster, service, *t)
            }); err != nil {
                // TODO(dperny): probably should log like in the other
                // orchestrators instead of returning here.
                return err
            }
        }

        // remove tasks that need to be removed
        for _, taskID := range removeTasks {
            if err := batch.Update(func(tx store.Tx) error {
                t := store.GetTask(tx, taskID)
                if t == nil {
                    return nil
                }

                if t.DesiredState == api.TaskStateRemove {
                    return nil
                }

                t.DesiredState = api.TaskStateRemove
                return store.UpdateTask(tx, t)
            }); err != nil {
                return err
            }
        }

        // finally, shut down any tasks on invalid nodes
        for _, nodeID := range invalidNodes {
            if taskID, ok := nodeToTask[nodeID]; ok {
                if err := batch.Update(func(tx store.Tx) error {
                    t := store.GetTask(tx, taskID)
                    if t == nil {
                        return nil
                    }
                    // if the task is still desired to be running, and is still
                    // actually, running, then it still needs to be shut down.
                    if t.DesiredState > api.TaskStateCompleted || t.Status.State <= api.TaskStateRunning {
                        t.DesiredState = api.TaskStateShutdown
                        return store.UpdateTask(tx, t)
                    }
                    return nil
                }); err != nil {
                    return err
                }
            }
        }
        return nil
    })
}

// IsRelatedService returns true if the task is a global job. This method
// fulfills the taskinit.InitHandler interface. Because it is just a wrapper
// around a well-tested function call, it has no tests of its own.
func (r *Reconciler) IsRelatedService(service *api.Service) bool {
    return orchestrator.IsGlobalJob(service)
}

// FixTask validates that a task is compliant with the rest of the cluster
// state, and fixes it if it's not. This covers some main scenarios:
//
//   - The node that the task is running on is now paused or drained. we do not
//     need to check if the node still meets constraints -- that is the purview
//     of the constraint enforcer.
//   - The task has failed and needs to be restarted.
//
// This implements the FixTask method of the taskinit.InitHandler interface.
func (r *Reconciler) FixTask(ctx context.Context, batch *store.Batch, t *api.Task) {
    // tasks already desired to be shut down need no action.
    if t.DesiredState > api.TaskStateCompleted {
        return
    }

    batch.Update(func(tx store.Tx) error {
        node := store.GetNode(tx, t.NodeID)
        // if the node is no longer a valid node for this task, we need to shut
        // it down
        if orchestrator.InvalidNode(node) {
            task := store.GetTask(tx, t.ID)
            if task != nil && task.DesiredState < api.TaskStateShutdown {
                task.DesiredState = api.TaskStateShutdown
                return store.UpdateTask(tx, task)
            }
        }
        // we will reconcile all services after fixing the tasks, so we don't
        // need to restart tasks right now; we'll do so after this.
        return nil
    })
}

// SlotTuple returns a slot tuple representing this task. It implements the
// taskinit.InitHandler interface.
func (r *Reconciler) SlotTuple(t *api.Task) orchestrator.SlotTuple {
    return orchestrator.SlotTuple{
        ServiceID: t.ServiceID,
        NodeID:    t.NodeID,
    }
}