docker/swarmkit

manager/orchestrator/task.go

package orchestrator

import (
    "reflect"
    "time"

    google_protobuf "github.com/gogo/protobuf/types"
    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/api/defaults"
    "github.com/moby/swarmkit/v2/identity"
    "github.com/moby/swarmkit/v2/manager/constraint"
    "github.com/moby/swarmkit/v2/protobuf/ptypes"
)

// NewTask creates a new task.
func NewTask(cluster *api.Cluster, service *api.Service, slot uint64, nodeID string) *api.Task {
    var logDriver *api.Driver
    if service.Spec.Task.LogDriver != nil {
        // use the log driver specific to the task, if we have it.
        logDriver = service.Spec.Task.LogDriver
    } else if cluster != nil {
        // pick up the cluster default, if available.
        logDriver = cluster.Spec.TaskDefaults.LogDriver // nil is okay here.
    }

    taskID := identity.NewID()
    task := api.Task{
        ID:                 taskID,
        ServiceAnnotations: service.Spec.Annotations,
        Spec:               service.Spec.Task,
        SpecVersion:        service.SpecVersion,
        ServiceID:          service.ID,
        Slot:               slot,
        Status: api.TaskStatus{
            State:     api.TaskStateNew,
            Timestamp: ptypes.MustTimestampProto(time.Now()),
            Message:   "created",
        },
        Endpoint: &api.Endpoint{
            Spec: service.Spec.Endpoint.Copy(),
        },
        DesiredState: api.TaskStateRunning,
        LogDriver:    logDriver,
    }

    // In global mode we also set the NodeID
    if nodeID != "" {
        task.NodeID = nodeID
    }

    return &task
}
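
// newGlobalTask is an illustrative sketch, not part of the upstream file. It
// shows the two calling patterns NewTask supports: replicated-service callers
// pass a slot number and an empty nodeID so the scheduler assigns a node
// later, while global-service callers (as assumed here) pin the task to a
// node up front and leave the slot at zero.
func newGlobalTask(cluster *api.Cluster, service *api.Service, nodeID string) *api.Task {
    return NewTask(cluster, service, 0, nodeID)
}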

// RestartCondition returns the restart condition to apply to this task.
func RestartCondition(task *api.Task) api.RestartPolicy_RestartCondition {
    restartCondition := defaults.Service.Task.Restart.Condition
    if task.Spec.Restart != nil {
        restartCondition = task.Spec.Restart.Condition
    }
    return restartCondition
}
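
// shouldRestartAfterExit is an illustrative sketch, not part of the upstream
// file. It shows how a caller (for example, a restart supervisor) might
// combine RestartCondition with a task's terminal state: restart on any exit
// when the condition is RestartOnAny, only after failures when it is
// RestartOnFailure, and never otherwise. The failed flag is an assumption
// supplied by the caller for the sake of the example.
func shouldRestartAfterExit(t *api.Task, failed bool) bool {
    switch RestartCondition(t) {
    case api.RestartOnAny:
        return true
    case api.RestartOnFailure:
        return failed
    default:
        return false
    }
}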

// IsTaskDirty determines whether a task matches the given service's spec and
// whether the given node satisfies the placement constraints.
// It returns false if the spec version didn't change, if only the task
// placement constraints changed and the assigned node satisfies the new
// constraints, or if the service's task spec and the endpoint spec didn't
// change at all. It returns true otherwise.
// Note: for non-failed tasks with a container spec runtime that have already
// pulled the required image (i.e., current state is between READY and
// RUNNING inclusive), the value of `PullOptions` is ignored.
func IsTaskDirty(s *api.Service, t *api.Task, n *api.Node) bool {
    // If the spec version matches, we know the task is not dirty. However,
    // if it does not match, that doesn't mean the task is dirty, since
    // only a portion of the spec is included in the comparison.
    if t.SpecVersion != nil && s.SpecVersion != nil && *s.SpecVersion == *t.SpecVersion {
        return false
    }

    // Make a deep copy of the service's task spec for the comparison.
    serviceTaskSpec := *s.Spec.Task.Copy()

    // Task is not dirty if the placement constraints alone changed
    // and the node currently assigned can satisfy the changed constraints.
    if IsTaskDirtyPlacementConstraintsOnly(serviceTaskSpec, t) && nodeMatches(s, n) {
        return false
    }

    // For non-failed tasks with a container spec runtime that have already
    // pulled the required image (i.e., current state is between READY and
    // RUNNING inclusive), ignore the value of the `PullOptions` field by
    // setting the copied service spec to have the same PullOptions value as
    // the task. A difference in only the `PullOptions` field should not cause
    // a running (or ready-to-run) task to be considered 'dirty' when we
    // handle updates.
    // See https://github.com/docker/swarmkit/issues/971
    currentState := t.Status.State
    // Ignore PullOptions if the task is desired to be in a "runnable" state
    // and its last known current state is between READY and RUNNING, in
    // which case we know that the task either successfully pulled its
    // container image or didn't need to.
    ignorePullOpts := t.DesiredState <= api.TaskStateRunning &&
        currentState >= api.TaskStateReady &&
        currentState <= api.TaskStateRunning
    if ignorePullOpts && serviceTaskSpec.GetContainer() != nil && t.Spec.GetContainer() != nil {
        // Modify the service's container spec.
        serviceTaskSpec.GetContainer().PullOptions = t.Spec.GetContainer().PullOptions
    }

    return !reflect.DeepEqual(serviceTaskSpec, t.Spec) ||
        (t.Endpoint != nil && !reflect.DeepEqual(s.Spec.Endpoint, t.Endpoint.Spec))
}
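
// dirtyTasks is an illustrative sketch, not part of the upstream file. It
// shows the typical shape of an update pass: given the current service spec,
// a set of tasks, and the nodes they are assigned to (the nodes map keyed by
// node ID is an assumption for the example), collect the tasks whose spec no
// longer matches the service and which would therefore need to be replaced.
func dirtyTasks(s *api.Service, tasks []*api.Task, nodes map[string]*api.Node) []*api.Task {
    var dirty []*api.Task
    for _, t := range tasks {
        // nodes[t.NodeID] may be nil (e.g. for unassigned tasks); a nil node
        // never satisfies placement constraints, so the placement-only
        // shortcut in IsTaskDirty does not apply in that case.
        if IsTaskDirty(s, t, nodes[t.NodeID]) {
            dirty = append(dirty, t)
        }
    }
    return dirty
}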

// nodeMatches checks whether the currently assigned node satisfies the
// Placement.Constraints specified in the task spec for Updater.newService.
func nodeMatches(s *api.Service, n *api.Node) bool {
    if n == nil {
        return false
    }

    constraints, _ := constraint.Parse(s.Spec.Task.Placement.Constraints)
    return constraint.NodeMatches(constraints, n)
}

// IsTaskDirtyPlacementConstraintsOnly checks if the Placement field alone
// in the spec has changed.
func IsTaskDirtyPlacementConstraintsOnly(serviceTaskSpec api.TaskSpec, t *api.Task) bool {
    // Compare the task placement constraints.
    if reflect.DeepEqual(serviceTaskSpec.Placement, t.Spec.Placement) {
        return false
    }

    // Overwrite the placement in the spec copy with the task's placement so
    // that the comparison below covers only the remaining fields.
    serviceTaskSpec.Placement = t.Spec.Placement
    return reflect.DeepEqual(serviceTaskSpec, t.Spec)
}
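
// placementOnlyChanged is an illustrative sketch, not part of the upstream
// file. It mirrors how IsTaskDirty above calls the helper: the service's task
// spec is deep-copied before being handed over, since the helper overwrites
// the copy's Placement field as part of its comparison.
func placementOnlyChanged(s *api.Service, t *api.Task) bool {
    return IsTaskDirtyPlacementConstraintsOnly(*s.Spec.Task.Copy(), t)
}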

// InvalidNode is true if the node is nil, down, or drained
func InvalidNode(n *api.Node) bool {
    return n == nil ||
        n.Status.State == api.NodeStatus_DOWN ||
        n.Spec.Availability == api.NodeAvailabilityDrain
}
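
// runnableNodes is an illustrative sketch, not part of the upstream file:
// filtering a node list down to the entries that can still host tasks, i.e.
// dropping nodes that are nil, down, or drained.
func runnableNodes(nodes []*api.Node) []*api.Node {
    var out []*api.Node
    for _, n := range nodes {
        if !InvalidNode(n) {
            out = append(out, n)
        }
    }
    return out
}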

// taskTimestamp returns the task's applied timestamp if it is set, and falls
// back to the status timestamp otherwise.
func taskTimestamp(t *api.Task) *google_protobuf.Timestamp {
    if t.Status.AppliedAt != nil {
        return t.Status.AppliedAt
    }

    return t.Status.Timestamp
}

// TasksByTimestamp sorts tasks by applied timestamp if available, otherwise
// status timestamp.
type TasksByTimestamp []*api.Task

// Len implements the Len method for sorting.
func (t TasksByTimestamp) Len() int {
    return len(t)
}

// Swap implements the Swap method for sorting.
func (t TasksByTimestamp) Swap(i, j int) {
    t[i], t[j] = t[j], t[i]
}

// Less implements the Less method for sorting.
func (t TasksByTimestamp) Less(i, j int) bool {
    iTimestamp := taskTimestamp(t[i])
    jTimestamp := taskTimestamp(t[j])

    if iTimestamp == nil {
        return true
    }
    if jTimestamp == nil {
        return false
    }
    if iTimestamp.Seconds < jTimestamp.Seconds {
        return true
    }
    if iTimestamp.Seconds > jTimestamp.Seconds {
        return false
    }
    return iTimestamp.Nanos < jTimestamp.Nanos
}
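
// olderTask is an illustrative sketch, not part of the upstream file. Callers
// normally wrap a slice in TasksByTimestamp and hand it to the standard
// library, e.g. sort.Sort(TasksByTimestamp(tasks)); to stay within this
// file's imports, the sketch uses Less directly and returns whichever of two
// tasks sorts first (tasks without a timestamp sort before everything else).
func olderTask(a, b *api.Task) *api.Task {
    ts := TasksByTimestamp{a, b}
    if ts.Less(1, 0) {
        // b sorts before a, so b is the older (or timestamp-less) task.
        return b
    }
    return a
}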