docker/swarmkit

View on GitHub
manager/scheduler/scheduler.go

Summary

Maintainability
F
4 days
Test Coverage
package scheduler

import (
    "context"
    "sync"
    "time"

    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/api/genericresource"
    "github.com/moby/swarmkit/v2/log"
    "github.com/moby/swarmkit/v2/manager/state"
    "github.com/moby/swarmkit/v2/manager/state/store"
    "github.com/moby/swarmkit/v2/protobuf/ptypes"
)

const (
    // monitorFailures is the lookback period for counting failures of
    // a task to determine if a node is faulty for a particular service.
    monitorFailures = 5 * time.Minute

    // maxFailures is the number of failures within monitorFailures that
    // triggers downweighting of a node in the sorting function.
    maxFailures = 5
)

type schedulingDecision struct {
    old *api.Task
    new *api.Task
}

// Scheduler assigns tasks to nodes.
type Scheduler struct {
    store           *store.MemoryStore
    unassignedTasks map[string]*api.Task
    // pendingPreassignedTasks already have NodeID, need resource validation
    pendingPreassignedTasks map[string]*api.Task
    // preassignedTasks tracks tasks that were preassigned, including those
    // past the pending state.
    preassignedTasks map[string]struct{}
    nodeSet          nodeSet
    allTasks         map[string]*api.Task
    pipeline         *Pipeline
    volumes          *volumeSet

    // stopOnce is a sync.Once used to ensure that Stop is idempotent
    stopOnce sync.Once
    // stopChan signals to the state machine to stop running
    stopChan chan struct{}
    // doneChan is closed when the state machine terminates
    doneChan chan struct{}
}

// New creates a new scheduler.
func New(store *store.MemoryStore) *Scheduler {
    return &Scheduler{
        store:                   store,
        unassignedTasks:         make(map[string]*api.Task),
        pendingPreassignedTasks: make(map[string]*api.Task),
        preassignedTasks:        make(map[string]struct{}),
        allTasks:                make(map[string]*api.Task),
        stopChan:                make(chan struct{}),
        doneChan:                make(chan struct{}),
        pipeline:                NewPipeline(),
        volumes:                 newVolumeSet(),
    }
}

func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
    // add all volumes that are ready to the volumeSet
    volumes, err := store.FindVolumes(tx, store.All)
    if err != nil {
        return err
    }

    for _, volume := range volumes {
        // only add volumes that have been created, meaning they have a
        // VolumeID.
        if volume.VolumeInfo != nil && volume.VolumeInfo.VolumeID != "" {
            s.volumes.addOrUpdateVolume(volume)
        }
    }

    tasks, err := store.FindTasks(tx, store.All)
    if err != nil {
        return err
    }

    tasksByNode := make(map[string]map[string]*api.Task)
    for _, t := range tasks {
        // Ignore all tasks that have not reached PENDING
        // state and tasks that no longer consume resources.
        if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
            continue
        }

        // Also ignore tasks that have not yet been assigned but desired state
        // is beyond TaskStateCompleted. This can happen if you update, delete
        // or scale down a service before its tasks were assigned.
        if t.Status.State == api.TaskStatePending && t.DesiredState > api.TaskStateCompleted {
            continue
        }

        s.allTasks[t.ID] = t
        if t.NodeID == "" {
            s.enqueue(t)
            continue
        }
        // preassigned tasks need to validate resource requirement on corresponding node
        if t.Status.State == api.TaskStatePending {
            s.preassignedTasks[t.ID] = struct{}{}
            s.pendingPreassignedTasks[t.ID] = t
            continue
        }

        // track the volumes in use by the task
        s.volumes.reserveTaskVolumes(t)

        if tasksByNode[t.NodeID] == nil {
            tasksByNode[t.NodeID] = make(map[string]*api.Task)
        }
        tasksByNode[t.NodeID][t.ID] = t
    }

    return s.buildNodeSet(tx, tasksByNode)
}

// Run is the scheduler event loop.
func (s *Scheduler) Run(pctx context.Context) error {
    ctx := log.WithModule(pctx, "scheduler")
    defer close(s.doneChan)

    s.pipeline.AddFilter(&VolumesFilter{vs: s.volumes})

    updates, cancel, err := store.ViewAndWatch(s.store, s.setupTasksList)
    if err != nil {
        log.G(ctx).WithError(err).Errorf("snapshot store update failed")
        return err
    }
    defer cancel()

    // Validate resource for tasks from preassigned tasks
    // do this before other tasks because preassigned tasks like
    // global service should start before other tasks
    s.processPreassignedTasks(ctx)

    // Queue all unassigned tasks before processing changes.
    s.tick(ctx)

    const (
        // commitDebounceGap is the amount of time to wait between
        // commit events to debounce them.
        commitDebounceGap = 50 * time.Millisecond
        // maxLatency is a time limit on the debouncing.
        maxLatency = time.Second
    )
    var (
        debouncingStarted     time.Time
        commitDebounceTimer   *time.Timer
        commitDebounceTimeout <-chan time.Time
    )

    tickRequired := false

    schedule := func() {
        if len(s.pendingPreassignedTasks) > 0 {
            s.processPreassignedTasks(ctx)
        }
        if tickRequired {
            s.tick(ctx)
            tickRequired = false
        }
    }

    // Watch for changes.
    for {
        select {
        case event := <-updates:
            switch v := event.(type) {
            case api.EventCreateTask:
                if s.createTask(ctx, v.Task) {
                    tickRequired = true
                }
            case api.EventUpdateTask:
                if s.updateTask(ctx, v.Task) {
                    tickRequired = true
                }
            case api.EventDeleteTask:
                if s.deleteTask(v.Task) {
                    // deleting tasks may free up node resource, pending tasks should be re-evaluated.
                    tickRequired = true
                }
            case api.EventCreateNode:
                s.createOrUpdateNode(v.Node)
                tickRequired = true
            case api.EventUpdateNode:
                s.createOrUpdateNode(v.Node)
                tickRequired = true
            case api.EventDeleteNode:
                s.nodeSet.remove(v.Node.ID)
            case api.EventUpdateVolume:
                // there is no need for a EventCreateVolume case, because
                // volumes are not ready to use until they've passed through
                // the volume manager and been created with the plugin
                //
                // as such, only addOrUpdateVolume if the VolumeInfo exists and
                // has a nonempty VolumeID
                if v.Volume.VolumeInfo != nil && v.Volume.VolumeInfo.VolumeID != "" {
                    // TODO(dperny): verify that updating volumes doesn't break
                    // scheduling
                    log.G(ctx).WithField("volume.id", v.Volume.ID).Debug("updated volume")
                    s.volumes.addOrUpdateVolume(v.Volume)
                    tickRequired = true
                }
            case state.EventCommit:
                if commitDebounceTimer != nil {
                    if time.Since(debouncingStarted) > maxLatency {
                        commitDebounceTimer.Stop()
                        commitDebounceTimer = nil
                        commitDebounceTimeout = nil
                        schedule()
                    } else {
                        commitDebounceTimer.Reset(commitDebounceGap)
                    }
                } else {
                    commitDebounceTimer = time.NewTimer(commitDebounceGap)
                    commitDebounceTimeout = commitDebounceTimer.C
                    debouncingStarted = time.Now()
                }
            }
        case <-commitDebounceTimeout:
            schedule()
            commitDebounceTimer = nil
            commitDebounceTimeout = nil
        case <-s.stopChan:
            return nil
        }
    }
}

// Stop causes the scheduler event loop to stop running.
func (s *Scheduler) Stop() {
    // ensure stop is called only once. this helps in some test cases.
    s.stopOnce.Do(func() {
        close(s.stopChan)
    })
    <-s.doneChan
}

// enqueue queues a task for scheduling.
func (s *Scheduler) enqueue(t *api.Task) {
    s.unassignedTasks[t.ID] = t
}

func (s *Scheduler) createTask(ctx context.Context, t *api.Task) bool {
    // Ignore all tasks that have not reached PENDING
    // state, and tasks that no longer consume resources.
    if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
        return false
    }

    s.allTasks[t.ID] = t
    if t.NodeID == "" {
        // unassigned task
        s.enqueue(t)
        return true
    }

    if t.Status.State == api.TaskStatePending {
        s.preassignedTasks[t.ID] = struct{}{}
        s.pendingPreassignedTasks[t.ID] = t
        // preassigned tasks do not contribute to running tasks count
        return false
    }

    nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
    if err == nil && nodeInfo.addTask(t) {
        s.nodeSet.updateNode(nodeInfo)
    }

    return false
}

func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool {
    // Ignore all tasks that have not reached PENDING
    // state.
    if t.Status.State < api.TaskStatePending {
        return false
    }

    oldTask := s.allTasks[t.ID]

    // Ignore all tasks that have not reached Pending
    // state, and tasks that no longer consume resources.
    if t.Status.State > api.TaskStateRunning {
        if oldTask == nil {
            return false
        }

        if t.Status.State != oldTask.Status.State &&
            (t.Status.State == api.TaskStateFailed || t.Status.State == api.TaskStateRejected) {
            // Keep track of task failures, so other nodes can be preferred
            // for scheduling this service if it looks like the service is
            // failing in a loop on this node. However, skip this for
            // preassigned tasks, because the scheduler does not choose
            // which nodes those run on.
            if _, wasPreassigned := s.preassignedTasks[t.ID]; !wasPreassigned {
                nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
                if err == nil {
                    nodeInfo.taskFailed(ctx, t)
                    s.nodeSet.updateNode(nodeInfo)
                }
            }
        }

        s.deleteTask(oldTask)

        return true
    }

    if t.NodeID == "" {
        // unassigned task
        if oldTask != nil {
            s.deleteTask(oldTask)
        }
        s.allTasks[t.ID] = t
        s.enqueue(t)
        return true
    }

    if t.Status.State == api.TaskStatePending {
        if oldTask != nil {
            s.deleteTask(oldTask)
        }
        s.preassignedTasks[t.ID] = struct{}{}
        s.allTasks[t.ID] = t
        s.pendingPreassignedTasks[t.ID] = t
        // preassigned tasks do not contribute to running tasks count
        return false
    }

    s.allTasks[t.ID] = t
    nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
    if err == nil && nodeInfo.addTask(t) {
        s.nodeSet.updateNode(nodeInfo)
    }

    return false
}

func (s *Scheduler) deleteTask(t *api.Task) bool {
    delete(s.allTasks, t.ID)
    delete(s.preassignedTasks, t.ID)
    delete(s.pendingPreassignedTasks, t.ID)

    // remove the task volume reservations as well, if any
    for _, attachment := range t.Volumes {
        s.volumes.releaseVolume(attachment.ID, t.ID)
    }

    nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
    if err == nil && nodeInfo.removeTask(t) {
        s.nodeSet.updateNode(nodeInfo)
        return true
    }
    return false
}

func (s *Scheduler) createOrUpdateNode(n *api.Node) {
    nodeInfo, nodeInfoErr := s.nodeSet.nodeInfo(n.ID)
    var resources *api.Resources
    if n.Description != nil && n.Description.Resources != nil {
        resources = n.Description.Resources.Copy()
        // reconcile resources by looping over all tasks in this node
        if nodeInfoErr == nil {
            for _, task := range nodeInfo.Tasks {
                reservations := taskReservations(task.Spec)

                resources.MemoryBytes -= reservations.MemoryBytes
                resources.NanoCPUs -= reservations.NanoCPUs

                genericresource.ConsumeNodeResources(&resources.Generic,
                    task.AssignedGenericResources)
            }
        }
    } else {
        resources = &api.Resources{}
    }

    if nodeInfoErr != nil {
        nodeInfo = newNodeInfo(n, nil, *resources)
    } else {
        nodeInfo.Node = n
        nodeInfo.AvailableResources = resources
    }
    s.nodeSet.addOrUpdateNode(nodeInfo)
}

func (s *Scheduler) processPreassignedTasks(ctx context.Context) {
    schedulingDecisions := make(map[string]schedulingDecision, len(s.pendingPreassignedTasks))
    for _, t := range s.pendingPreassignedTasks {
        newT := s.taskFitNode(ctx, t, t.NodeID)
        if newT == nil {
            continue
        }
        schedulingDecisions[t.ID] = schedulingDecision{old: t, new: newT}
    }

    successful, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)

    for _, decision := range successful {
        if decision.new.Status.State == api.TaskStateAssigned {
            delete(s.pendingPreassignedTasks, decision.old.ID)
        }
    }
    for _, decision := range failed {
        s.allTasks[decision.old.ID] = decision.old
        nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
        if err == nil && nodeInfo.removeTask(decision.new) {
            s.nodeSet.updateNode(nodeInfo)
        }

        for _, va := range decision.new.Volumes {
            s.volumes.releaseVolume(va.ID, decision.new.ID)
        }
    }
}

// tick attempts to schedule the queue.
func (s *Scheduler) tick(ctx context.Context) {
    type commonSpecKey struct {
        serviceID   string
        specVersion api.Version
    }
    tasksByCommonSpec := make(map[commonSpecKey]map[string]*api.Task)
    var oneOffTasks []*api.Task
    schedulingDecisions := make(map[string]schedulingDecision, len(s.unassignedTasks))

    for taskID, t := range s.unassignedTasks {
        if t == nil || t.NodeID != "" {
            // task deleted or already assigned
            delete(s.unassignedTasks, taskID)
            continue
        }

        // Group tasks with common specs
        if t.SpecVersion != nil {
            taskGroupKey := commonSpecKey{
                serviceID:   t.ServiceID,
                specVersion: *t.SpecVersion,
            }

            if tasksByCommonSpec[taskGroupKey] == nil {
                tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
            }
            tasksByCommonSpec[taskGroupKey][taskID] = t
        } else {
            // This task doesn't have a spec version. We have to
            // schedule it as a one-off.
            oneOffTasks = append(oneOffTasks, t)
        }
        delete(s.unassignedTasks, taskID)
    }

    for _, taskGroup := range tasksByCommonSpec {
        s.scheduleTaskGroup(ctx, taskGroup, schedulingDecisions)
    }
    for _, t := range oneOffTasks {
        s.scheduleTaskGroup(ctx, map[string]*api.Task{t.ID: t}, schedulingDecisions)
    }

    _, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)
    for _, decision := range failed {
        s.allTasks[decision.old.ID] = decision.old

        nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
        if err == nil && nodeInfo.removeTask(decision.new) {
            s.nodeSet.updateNode(nodeInfo)
        }

        // release the volumes we tried to use
        for _, va := range decision.new.Volumes {
            s.volumes.releaseVolume(va.ID, decision.new.ID)
        }

        // enqueue task for next scheduling attempt
        s.enqueue(decision.old)
    }
}

func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDecisions map[string]schedulingDecision) (successful, failed []schedulingDecision) {
    // applySchedulingDecisions is the only place where we make store
    // transactions in the scheduler. the scheduler is responsible for freeing
    // volumes that are no longer in use. this means that volumes should be
    // freed in this function. sometimes, there are no scheduling decisions to
    // be made, so we return early in the if statement below.
    //
    // however, in all cases, any activity that results in a tick could result
    // in needing volumes to be freed, even if nothing new is scheduled. this
    // freeing of volumes should always happen *after* all of the scheduling
    // decisions have been committed, hence the defer.
    defer s.store.Batch(s.volumes.freeVolumes)

    if len(schedulingDecisions) == 0 {
        return
    }

    successful = make([]schedulingDecision, 0, len(schedulingDecisions))

    // Apply changes to master store
    err := s.store.Batch(func(batch *store.Batch) error {
        for len(schedulingDecisions) > 0 {
            err := batch.Update(func(tx store.Tx) error {
                // Update exactly one task inside this Update
                // callback.
            taskLoop:
                for taskID, decision := range schedulingDecisions {
                    delete(schedulingDecisions, taskID)

                    t := store.GetTask(tx, taskID)
                    if t == nil {
                        // Task no longer exists
                        s.deleteTask(decision.new)
                        continue
                    }

                    if t.Status.State == decision.new.Status.State &&
                        t.Status.Message == decision.new.Status.Message &&
                        t.Status.Err == decision.new.Status.Err {
                        // No changes, ignore
                        continue
                    }

                    if t.Status.State >= api.TaskStateAssigned {
                        nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
                        if err != nil {
                            failed = append(failed, decision)
                            continue
                        }
                        node := store.GetNode(tx, decision.new.NodeID)
                        if node == nil || node.Meta.Version != nodeInfo.Meta.Version {
                            // node is out of date
                            failed = append(failed, decision)
                            continue
                        }
                    }

                    volumes := []*api.Volume{}
                    for _, va := range decision.new.Volumes {
                        v := store.GetVolume(tx, va.ID)
                        if v == nil {
                            log.G(ctx).Debugf(
                                "scheduler failed to update task %s because volume %s could not be found",
                                taskID,
                                va.ID,
                            )
                            failed = append(failed, decision)
                            continue taskLoop
                        }

                        // it's ok if the copy of the Volume we scheduled off
                        // of is out of date, because the Scheduler is the only
                        // component which add new uses of a particular Volume,
                        // which means that in most cases, no update to the
                        // volume could conflict with the copy the Scheduler
                        // used to make decisions.
                        //
                        // the exception is that the VolumeAvailability could
                        // have been changed. both Pause and Drain
                        // availabilities mean the Volume should not be
                        // scheduled, and so we call off our attempt to commit
                        // this scheduling decision. this is the only field we
                        // must check for conflicts.
                        //
                        // this is, additionally, the reason that a Volume must
                        // be set to Drain before it can be deleted. it stops
                        // us from having to worry about any other field when
                        // attempting to use the Volume.
                        if v.Spec.Availability != api.VolumeAvailabilityActive {
                            log.G(ctx).Debugf(
                                "scheduler failed to update task %s because volume %s has availability %s",
                                taskID, v.ID, v.Spec.Availability.String(),
                            )
                            failed = append(failed, decision)
                            continue taskLoop
                        }

                        alreadyPublished := false
                        for _, ps := range v.PublishStatus {
                            if ps.NodeID == decision.new.NodeID {
                                alreadyPublished = true
                                break
                            }
                        }
                        if !alreadyPublished {
                            v.PublishStatus = append(
                                v.PublishStatus,
                                &api.VolumePublishStatus{
                                    NodeID: decision.new.NodeID,
                                    State:  api.VolumePublishStatus_PENDING_PUBLISH,
                                },
                            )
                            volumes = append(volumes, v)
                        }
                    }

                    if err := store.UpdateTask(tx, decision.new); err != nil {
                        log.G(ctx).Debugf("scheduler failed to update task %s; will retry", taskID)
                        failed = append(failed, decision)
                        continue
                    }
                    for _, v := range volumes {
                        if err := store.UpdateVolume(tx, v); err != nil {
                            // TODO(dperny): handle the case of a partial
                            // update?
                            log.G(ctx).WithError(err).Debugf(
                                "scheduler failed to update task %v; volume %v could not be updated",
                                taskID, v.ID,
                            )
                            failed = append(failed, decision)
                            continue taskLoop
                        }
                    }
                    successful = append(successful, decision)
                    return nil
                }
                return nil
            })
            if err != nil {
                return err
            }
        }
        // finally, every time we make new scheduling decisions, take the
        // opportunity to release volumes.
        return nil
    })

    if err != nil {
        log.G(ctx).WithError(err).Error("scheduler tick transaction failed")
        failed = append(failed, successful...)
        successful = nil
    }
    return
}

// taskFitNode checks if a node has enough resources to accommodate a task.
func (s *Scheduler) taskFitNode(ctx context.Context, t *api.Task, nodeID string) *api.Task {
    nodeInfo, err := s.nodeSet.nodeInfo(nodeID)
    if err != nil {
        // node does not exist in set (it may have been deleted)
        return nil
    }
    newT := *t
    s.pipeline.SetTask(t)
    if !s.pipeline.Process(&nodeInfo) {
        // this node cannot accommodate this task
        newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
        newT.Status.Err = s.pipeline.Explain()
        s.allTasks[t.ID] = &newT

        return &newT
    }

    // before doing all of the updating logic, get the volume attachments
    // for the task on this node. this should always succeed, because we
    // should already have filtered nodes based on volume availability, but
    // just in case we missed something and it doesn't, we have an error
    // case.
    attachments, err := s.volumes.chooseTaskVolumes(t, &nodeInfo)
    if err != nil {
        newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
        newT.Status.Err = err.Error()
        s.allTasks[t.ID] = &newT

        return &newT
    }

    newT.Volumes = attachments

    newT.Status = api.TaskStatus{
        State:     api.TaskStateAssigned,
        Timestamp: ptypes.MustTimestampProto(time.Now()),
        Message:   "scheduler confirmed task can run on preassigned node",
    }
    s.allTasks[t.ID] = &newT

    if nodeInfo.addTask(&newT) {
        s.nodeSet.updateNode(nodeInfo)
    }
    return &newT
}

// scheduleTaskGroup schedules a batch of tasks that are part of the same
// service and share the same version of the spec.
func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
    // Pick at task at random from taskGroup to use for constraint
    // evaluation. It doesn't matter which one we pick because all the
    // tasks in the group are equal in terms of the fields the constraint
    // filters consider.
    var t *api.Task
    for _, t = range taskGroup {
        break
    }

    s.pipeline.SetTask(t)

    now := time.Now()

    nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
        // If either node has at least maxFailures recent failures,
        // that's the deciding factor.
        recentFailuresA := a.countRecentFailures(now, t)
        recentFailuresB := b.countRecentFailures(now, t)

        if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
            if recentFailuresA > recentFailuresB {
                return false
            }
            if recentFailuresB > recentFailuresA {
                return true
            }
        }

        tasksByServiceA := a.ActiveTasksCountByService[t.ServiceID]
        tasksByServiceB := b.ActiveTasksCountByService[t.ServiceID]

        if tasksByServiceA < tasksByServiceB {
            return true
        }
        if tasksByServiceA > tasksByServiceB {
            return false
        }

        // Total number of tasks breaks ties.
        return a.ActiveTasksCount < b.ActiveTasksCount
    }

    var prefs []*api.PlacementPreference
    if t.Spec.Placement != nil {
        prefs = t.Spec.Placement.Preferences
    }

    tree := s.nodeSet.tree(t.ServiceID, prefs, len(taskGroup), s.pipeline.Process, nodeLess)

    s.scheduleNTasksOnSubtree(ctx, len(taskGroup), taskGroup, &tree, schedulingDecisions, nodeLess)
    if len(taskGroup) != 0 {
        s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
    }
}

// scheduleNTasksOnSubtree schedules a set of tasks with identical constraints
// onto a set of nodes, taking into account placement preferences.
//
// placement preferences are used to create a tree such that every branch
// represents one subset of nodes across which tasks should be spread.
//
// because of this tree structure, scheduleNTasksOnSubtree is a recursive
// function. If there are subtrees of the current tree, then we recurse. if we
// are at a leaf node, past which there are no subtrees, then we try to
// schedule a proportional number of tasks to the nodes of that branch.
//
//   - n is the number of tasks being scheduled on this subtree
//   - taskGroup is a set of tasks to schedule, taking the form of a map from the
//     task ID to the task object.
//   - tree is the decision tree we're scheduling on. this is, effectively, the
//     set of nodes that meet scheduling constraints. these nodes are arranged
//     into a tree so that placement preferences can be taken into account when
//     spreading tasks across nodes.
//   - schedulingDecisions is a set of the scheduling decisions already made for
//     this tree
//   - nodeLess is a comparator that chooses which of the two nodes is preferable
//     to schedule on.
func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGroup map[string]*api.Task, tree *decisionTree, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
    if tree.next == nil {
        nodes := tree.orderedNodes(s.pipeline.Process, nodeLess)
        if len(nodes) == 0 {
            return 0
        }

        return s.scheduleNTasksOnNodes(ctx, n, taskGroup, nodes, schedulingDecisions, nodeLess)
    }

    // Walk the tree and figure out how the tasks should be split at each
    // level.
    tasksScheduled := 0
    tasksInUsableBranches := tree.tasks
    var noRoom map[*decisionTree]struct{}

    // Try to make branches even until either all branches are
    // full, or all tasks have been scheduled.
    for tasksScheduled != n && len(noRoom) != len(tree.next) {
        desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
        remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))

        for _, subtree := range tree.next {
            if noRoom != nil {
                if _, ok := noRoom[subtree]; ok {
                    continue
                }
            }
            subtreeTasks := subtree.tasks
            if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
                tasksToAssign := desiredTasksPerBranch - subtreeTasks
                if remainder > 0 {
                    tasksToAssign++
                }
                res := s.scheduleNTasksOnSubtree(ctx, tasksToAssign, taskGroup, subtree, schedulingDecisions, nodeLess)
                if res < tasksToAssign {
                    if noRoom == nil {
                        noRoom = make(map[*decisionTree]struct{})
                    }
                    noRoom[subtree] = struct{}{}
                    tasksInUsableBranches -= subtreeTasks
                } else if remainder > 0 {
                    remainder--
                }
                tasksScheduled += res
            }
        }
    }

    return tasksScheduled
}

// scheduleNTasksOnNodes schedules some number of tasks on the set of provided
// nodes. The number of tasks being scheduled may be less than the total number
// of tasks, as the Nodes may be one branch of a tree used to spread tasks.
//
// returns the number of tasks actually scheduled to these nodes. this may be
// fewer than the number of tasks desired to be scheduled, if there are
// insufficient nodes to meet resource constraints.
//
//   - n is the number of tasks desired to be scheduled to this set of nodes
//   - taskGroup is the tasks desired to be scheduled, in the form of a map from
//     task ID to task object. this argument is mutated; tasks which have been
//     scheduled are removed from the map.
//   - nodes is the set of nodes to schedule to
//   - schedulingDecisions is the set of scheduling decisions that have been made
//     thus far, in the form of a map from task ID to the decision made.
//   - nodeLess is a simple comparator that chooses which of two nodes would be
//     preferable to schedule on.
func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup map[string]*api.Task, nodes []NodeInfo, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
    tasksScheduled := 0
    failedConstraints := make(map[int]bool) // key is index in nodes slice
    nodeIter := 0
    nodeCount := len(nodes)
    for taskID, t := range taskGroup {
        // Skip tasks which were already scheduled because they ended
        // up in two groups at once.
        if _, exists := schedulingDecisions[taskID]; exists {
            continue
        }

        node := &nodes[nodeIter%nodeCount]
        // before doing all of the updating logic, get the volume attachments
        // for the task on this node. this should always succeed, because we
        // should already have filtered nodes based on volume availability, but
        // just in case we missed something and it doesn't, we have an error
        // case.
        attachments, err := s.volumes.chooseTaskVolumes(t, node)
        if err != nil {
            // TODO(dperny) if there's an error, then what? i'm frankly not
            // sure.
            log.G(ctx).WithField("task.id", t.ID).WithError(err).Error("could not find task volumes")
        }

        log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", node.ID)
        // she turned me into a newT!
        newT := *t
        newT.Volumes = attachments
        newT.NodeID = node.ID
        s.volumes.reserveTaskVolumes(&newT)
        newT.Status = api.TaskStatus{
            State:     api.TaskStateAssigned,
            Timestamp: ptypes.MustTimestampProto(time.Now()),
            Message:   "scheduler assigned task to node",
        }
        s.allTasks[t.ID] = &newT

        // in each iteration of this loop, the node we choose will always be
        // one which meets constraints. at the end of each iteration, we
        // re-process nodes, allowing us to remove nodes which no longer meet
        // resource constraints.
        nodeInfo, err := s.nodeSet.nodeInfo(node.ID)
        if err == nil && nodeInfo.addTask(&newT) {
            s.nodeSet.updateNode(nodeInfo)
            nodes[nodeIter%nodeCount] = nodeInfo
        }

        schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT}
        delete(taskGroup, taskID)
        tasksScheduled++
        if tasksScheduled == n {
            return tasksScheduled
        }

        if nodeIter+1 < nodeCount {
            // First pass fills the nodes until they have the same
            // number of tasks from this service.
            nextNode := nodes[(nodeIter+1)%nodeCount]
            if nodeLess(&nextNode, &nodeInfo) {
                nodeIter++
            }
        } else {
            // In later passes, we just assign one task at a time
            // to each node that still meets the constraints.
            nodeIter++
        }

        origNodeIter := nodeIter
        for failedConstraints[nodeIter%nodeCount] || !s.pipeline.Process(&nodes[nodeIter%nodeCount]) {
            failedConstraints[nodeIter%nodeCount] = true
            nodeIter++
            if nodeIter-origNodeIter == nodeCount {
                // None of the nodes meet the constraints anymore.
                return tasksScheduled
            }
        }
    }

    return tasksScheduled
}

// noSuitableNode checks unassigned tasks and make sure they have an existing service in the store before
// updating the task status and adding it back to: schedulingDecisions, unassignedTasks and allTasks
func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
    explanation := s.pipeline.Explain()
    for _, t := range taskGroup {
        var service *api.Service
        s.store.View(func(tx store.ReadTx) {
            service = store.GetService(tx, t.ServiceID)
        })
        if service == nil {
            log.G(ctx).WithField("task.id", t.ID).Debug("removing task from the scheduler")
            continue
        }

        log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")

        newT := *t
        newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
        sv := service.SpecVersion
        tv := newT.SpecVersion
        if sv != nil && tv != nil && sv.Index > tv.Index {
            log.G(ctx).WithField("task.id", t.ID).Debug(
                "task belongs to old revision of service",
            )
            if t.Status.State == api.TaskStatePending && t.DesiredState >= api.TaskStateShutdown {
                log.G(ctx).WithField("task.id", t.ID).Debug(
                    "task is desired shutdown, scheduler will go ahead and do so",
                )
                newT.Status.State = api.TaskStateShutdown
                newT.Status.Err = ""
            }
        } else {
            if explanation != "" {
                newT.Status.Err = "no suitable node (" + explanation + ")"
            } else {
                newT.Status.Err = "no suitable node"
            }

            // re-enqueue a task that should still be attempted
            s.enqueue(&newT)
        }

        s.allTasks[t.ID] = &newT
        schedulingDecisions[t.ID] = schedulingDecision{old: t, new: &newT}
    }
}

func (s *Scheduler) buildNodeSet(tx store.ReadTx, tasksByNode map[string]map[string]*api.Task) error {
    nodes, err := store.FindNodes(tx, store.All)
    if err != nil {
        return err
    }

    s.nodeSet.alloc(len(nodes))

    for _, n := range nodes {
        var resources api.Resources
        if n.Description != nil && n.Description.Resources != nil {
            resources = *n.Description.Resources
        }
        s.nodeSet.addOrUpdateNode(newNodeInfo(n, tasksByNode[n.ID], resources))
    }

    return nil
}