manager/allocator/network.go
package allocator
import (
"context"
"fmt"
"time"
"github.com/docker/go-events"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/log"
"github.com/moby/swarmkit/v2/manager/allocator/networkallocator"
"github.com/moby/swarmkit/v2/manager/state"
"github.com/moby/swarmkit/v2/manager/state/store"
"github.com/moby/swarmkit/v2/protobuf/ptypes"
"github.com/pkg/errors"
)
const (
// Network allocator Voter ID for task allocation vote.
networkVoter = "network"
// allocatedStatusMessage is the status message recorded on a task when
// the allocator moves it to the PENDING state.
allocatedStatusMessage = "pending task scheduling"
)
var (
// ErrNoIngress is returned when no ingress network is found in store
ErrNoIngress = errors.New("no ingress network found")
// errNoChanges is a sentinel compared against the result of allocating a
// task; a task allocation that ends with this error is neither logged as
// a failure nor queued for a retry.
errNoChanges = errors.New("task unchanged")
// retryInterval is the time that must have elapsed since the last retry
// before a commit event triggers another retry of the unallocated
// networks, services and tasks (a deallocation triggers one regardless).
retryInterval = 5 * time.Minute
)
// Network context information which is used throughout the network allocation code.
type networkContext struct {
// The ingress network currently known to the allocator, or nil when
// there is none (for example after it was deleted).
ingressNetwork *api.Network
// Instance of the low-level network allocator which performs
// the actual network allocation.
nwkAllocator networkallocator.NetworkAllocator
// The port allocator instance for allocating node ports
portAllocator *portAllocator
// A set of tasks which are ready to be allocated as a batch. This is
// distinct from "unallocatedTasks" which are tasks that failed to
// allocate on the first try, being held for a future retry.
pendingTasks map[string]*api.Task
// A set of unallocated tasks which will be revisited if anything
// changes in system state that might help task allocation.
unallocatedTasks map[string]*api.Task
// A set of unallocated services which will be revisited if
// anything changes in system state that might help service
// allocation.
unallocatedServices map[string]*api.Service
// A set of unallocated networks which will be revisited if
// anything changes in system state that might help network
// allocation.
unallocatedNetworks map[string]*api.Network
// lastRetry is the last timestamp when unallocated
// tasks/services/networks were retried.
lastRetry time.Time
// somethingWasDeallocated indicates that we just deallocated at
// least one service/task/network, so we should retry failed
// allocations (in case we are experiencing IP exhaustion and an IP
// was released).
somethingWasDeallocated bool
}
// doNetworkInit creates a fresh networkContext, installs it as a.netCtx and
// then synchronizes the allocator with the objects already in the store.
//
// The ingress network (if any) is handled first. After that two passes are
// made over networks, nodes, services and tasks: the first pass only
// restores objects that already carry allocation state, the second pass
// allocates whatever is left. If the function returns an error, a.netCtx is
// reset to nil.
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
	netCtx := &networkContext{
		nwkAllocator:        a.nwkAllocator,
		portAllocator:       newPortAllocator(),
		pendingTasks:        make(map[string]*api.Task),
		unallocatedTasks:    make(map[string]*api.Task),
		unallocatedServices: make(map[string]*api.Service),
		unallocatedNetworks: make(map[string]*api.Network),
		lastRetry:           time.Now(),
	}
	a.netCtx = netCtx

	// Drop the context again when initialization did not succeed.
	defer func() {
		if err != nil {
			a.netCtx = nil
		}
	}()

	// The ingress network is created when the cluster is first created.
	// When it is present, make sure it is allocated before any other
	// network object is read, so that it gets its preferred subnet. When it
	// is absent the user removed it and there is nothing to do.
	ingress, err := GetIngressNetwork(a.store)
	switch {
	case err == nil:
		netCtx.ingressNetwork = ingress
		if netCtx.nwkAllocator.IsAllocated(netCtx.ingressNetwork) {
			break
		}
		if allocErr := a.allocateNetwork(ctx, netCtx.ingressNetwork); allocErr != nil {
			log.G(ctx).WithError(allocErr).Error("failed allocating ingress network during init")
			break
		}
		batchErr := a.store.Batch(func(batch *store.Batch) error {
			if commitErr := a.commitAllocatedNetwork(ctx, batch, netCtx.ingressNetwork); commitErr != nil {
				log.G(ctx).WithError(commitErr).Error("failed committing allocation of ingress network during init")
			}
			return nil
		})
		if batchErr != nil {
			log.G(ctx).WithError(batchErr).Error("failed committing allocation of ingress network during init")
		}
	case err == ErrNoIngress:
		// No ingress network in the store: the user removed it and did not
		// create a replacement.
	default:
		return errors.Wrap(err, "failure while looking for ingress network during init")
	}

	// Pass one (restoreOnly == true) restores networks, nodes, services and
	// tasks that were already allocated, so that the in-memory allocator
	// state reflects what is recorded in raft. Pass two (restoreOnly ==
	// false) allocates the objects that are in raft but were never
	// allocated.
	for _, restoreOnly := range []bool{true, false} {
		if err := a.allocateNetworks(ctx, restoreOnly); err != nil {
			return err
		}
		if err := a.allocateNodes(ctx, restoreOnly); err != nil {
			return err
		}
		if err := a.allocateServices(ctx, restoreOnly); err != nil {
			return err
		}
		if err := a.allocateTasks(ctx, restoreOnly); err != nil {
			return err
		}
	}
	return nil
}
// doNetworkAlloc handles a single event by dispatching on its concrete type.
// Network and service events are processed here; node and task events are
// forwarded to doNodeAlloc and doTaskAlloc; a commit event triggers
// processing of the pending tasks and, when due, a retry of the previously
// unallocated networks, services and tasks. Failures are logged rather than
// returned.
func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
nc := a.netCtx
switch v := ev.(type) {
case api.EventCreateNetwork:
// Work on a copy of the network carried by the event.
n := v.Network.Copy()
if nc.nwkAllocator.IsAllocated(n) {
break
}
// Only one ingress network may be tracked at a time.
if IsIngressNetwork(n) && nc.ingressNetwork != nil {
log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)",
n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations.Name)
break
}
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
break
}
if err := a.store.Batch(func(batch *store.Batch) error {
return a.commitAllocatedNetwork(ctx, batch, n)
}); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
}
// Note that the ingress network is recorded even when the commit
// above reported an error.
if IsIngressNetwork(n) {
nc.ingressNetwork = n
}
case api.EventDeleteNetwork:
n := v.Network.Copy()
// Forget the ingress network if it is the one being deleted.
if IsIngressNetwork(n) && nc.ingressNetwork != nil && nc.ingressNetwork.ID == n.ID {
nc.ingressNetwork = nil
}
// Release the attachments that nodes hold on this network before the
// network itself is freed.
if err := a.deallocateNodeAttachments(ctx, n.ID); err != nil {
log.G(ctx).WithError(err).Error(err)
}
// The assumption here is that all dependent objects
// have been cleaned up when we are here so the only
// thing that needs to happen is free the network
// resources.
if err := nc.nwkAllocator.Deallocate(n); err != nil {
log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
} else {
nc.somethingWasDeallocated = true
}
delete(nc.unallocatedNetworks, n.ID)
case api.EventCreateService:
// Operate on the version of the service currently in the store
// rather than on the one carried by the event.
var s *api.Service
a.store.View(func(tx store.ReadTx) {
s = store.GetService(tx, v.Service.ID)
})
if s == nil {
break
}
if nc.isServiceAllocated(s) {
break
}
if err := a.allocateService(ctx, s, false); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
break
}
if err := a.store.Batch(func(batch *store.Batch) error {
return a.commitAllocatedService(ctx, batch, s)
}); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
}
case api.EventUpdateService:
// We may have already allocated this service. If a create or
// update event is older than the current version in the store,
// we run the risk of allocating the service a second time.
// Only operate on the latest version of the service.
var s *api.Service
a.store.View(func(tx store.ReadTx) {
s = store.GetService(tx, v.Service.ID)
})
if s == nil {
break
}
if nc.isServiceAllocated(s) {
// An allocated service only needs work when its host-mode
// published ports have to be updated.
if !nc.portAllocator.hostPublishPortsNeedUpdate(s) {
break
}
updatePortsInHostPublishMode(s)
} else {
if err := a.allocateService(ctx, s, false); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
break
}
}
// A failed commit queues the service for a later retry; a successful
// one removes it from the retry set.
if err := a.store.Batch(func(batch *store.Batch) error {
return a.commitAllocatedService(ctx, batch, s)
}); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
nc.unallocatedServices[s.ID] = s
} else {
delete(nc.unallocatedServices, s.ID)
}
case api.EventDeleteService:
s := v.Service.Copy()
if err := nc.deallocateService(s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
} else {
nc.somethingWasDeallocated = true
}
// Remove it from unallocatedServices just in case
// it's still there.
delete(nc.unallocatedServices, s.ID)
case api.EventCreateNode, api.EventUpdateNode, api.EventDeleteNode:
a.doNodeAlloc(ctx, ev)
case api.EventCreateTask, api.EventUpdateTask, api.EventDeleteTask:
a.doTaskAlloc(ctx, ev)
case state.EventCommit:
a.procTasksNetwork(ctx, false)
// Retry failed allocations when the retry interval has elapsed or
// when something was deallocated since the last retry.
if time.Since(nc.lastRetry) > retryInterval || nc.somethingWasDeallocated {
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procTasksNetwork(ctx, true)
nc.lastRetry = time.Now()
nc.somethingWasDeallocated = false
}
// Any left over tasks are moved to the unallocated set
for _, t := range nc.pendingTasks {
nc.unallocatedTasks[t.ID] = t
}
nc.pendingTasks = make(map[string]*api.Task)
}
}
// doNodeAlloc reacts to node create, update and delete events. A deleted
// node has its network resources freed; a created or updated node is
// reallocated. Events of any other type, and events whose node can no longer
// be found in the store, are ignored. Failures are logged, not returned.
func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
	var (
		deleted bool
		n       *api.Node
	)

	// For create and update events always fetch the latest version of the
	// node from the store: the event may be older than the stored version,
	// and acting on it could allocate the node a second time.
	fetch := func(id string) {
		a.store.View(func(tx store.ReadTx) {
			n = store.GetNode(tx, id)
		})
	}

	switch e := ev.(type) {
	case api.EventCreateNode:
		fetch(e.Node.ID)
	case api.EventUpdateNode:
		fetch(e.Node.ID)
	case api.EventDeleteNode:
		deleted = true
		n = e.Node.Copy()
	}

	if n == nil {
		return
	}

	nc := a.netCtx

	if !deleted {
		// Not a delete, so try reallocating the node. For a newly created
		// node this allocates it for ingress only.
		if err := a.reallocateNode(ctx, n.ID); err != nil {
			log.G(ctx).WithError(err).Errorf(
				"error reallocating network resources for node %v", n.ID,
			)
		}
		return
	}

	if err := a.deallocateNode(n); err != nil {
		log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", n.ID)
		return
	}
	nc.somethingWasDeallocated = true
}
// isOverlayNetwork reports whether n uses the "overlay" driver, judged by
// either its driver state or the driver configuration in its spec.
func isOverlayNetwork(n *api.Network) bool {
	const overlay = "overlay"

	byState := n.DriverState != nil && n.DriverState.Name == overlay
	return byState || (n.Spec.DriverConfig != nil && n.Spec.DriverConfig.Name == overlay)
}
// getAllocatedNetworks lists every network in the store and returns those
// that are overlay networks and are reported as allocated by the network
// allocator.
//
//nolint:unused // TODO(thaJeztah) this is currently unused: is it safe to remove?
func (a *Allocator) getAllocatedNetworks() ([]*api.Network, error) {
	na := a.netCtx.nwkAllocator

	var (
		all     []*api.Network
		findErr error
	)
	a.store.View(func(tx store.ReadTx) {
		all, findErr = store.FindNetworks(tx, store.All)
	})
	if findErr != nil {
		return nil, errors.Wrap(findErr, "error listing all networks in store while trying to allocate during init")
	}

	var allocated []*api.Network
	for _, n := range all {
		if !isOverlayNetwork(n) || !na.IsAllocated(n) {
			continue
		}
		allocated = append(allocated, n)
	}
	return allocated, nil
}
// getNodeNetworks returns all networks that should be allocated for a node:
// the overlay networks attached to the node's tasks that have not gone past
// the RUNNING state, followed by the ingress network if one is known.
//
// If listing the node's tasks fails, the error is returned together with a
// nil slice.
func (a *Allocator) getNodeNetworks(nodeID string) ([]*api.Network, error) {
	var (
		// no need to initialize networks. we only append to it, and appending
		// to a nil slice is valid.
		networks []*api.Network
		err      error
	)
	a.store.View(func(tx store.ReadTx) {
		// get all tasks currently assigned to this node. it's no big deal if
		// the tasks change in the meantime, there's no race to clean up
		// unneeded network attachments on a node.
		var tasks []*api.Task
		tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
		if err != nil {
			return
		}

		// we need to keep track of network IDs that we've already added to the
		// list of networks we're going to return. we could do
		// map[string]*api.Network and then convert to []*api.Network and
		// return that, but it seems cleaner to have a separate set and list.
		networkIDs := map[string]struct{}{}
		for _, task := range tasks {
			// we don't need to check if a task is before the Assigned state.
			// the only way we have a task with a NodeID that isn't yet in
			// Assigned is if it's a global service task. this check is not
			// necessary:
			// if task.Status.State < api.TaskStateAssigned {
			//     continue
			// }
			if task.Status.State > api.TaskStateRunning {
				// we don't need to have network attachments for a task that's
				// already in a terminal state
				continue
			}

			// now go through the task's network attachments and find all of
			// the networks
			for _, attachment := range task.Networks {
				// if the network is an overlay network, and the network ID is
				// not yet in the set of network IDs, then add it to the set
				// and add the network to the list of networks we'll be
				// returning
				if _, ok := networkIDs[attachment.Network.ID]; isOverlayNetwork(attachment.Network) && !ok {
					networkIDs[attachment.Network.ID] = struct{}{}
					// we don't need to worry about retrieving the network from
					// the store, because the network in the attachment is an
					// identical copy of the network in the store.
					networks = append(networks, attachment.Network)
				}
			}
		}
	})
	if err != nil {
		// don't hand back a partially populated list (previously the ingress
		// network was still appended on this path).
		return nil, err
	}

	// finally, we need the ingress network if one exists.
	if a.netCtx != nil && a.netCtx.ingressNetwork != nil {
		networks = append(networks, a.netCtx.ingressNetwork)
	}

	return networks, nil
}
// allocateNodes walks every node currently in the store, allocates network
// attachments for the networks each node needs, and commits the nodes whose
// attachments changed in a single batch. existingAddressesOnly is passed
// through to allocateNode.
//
// Only failures to list the nodes or to determine a node's networks are
// returned; commit failures are logged.
func (a *Allocator) allocateNodes(ctx context.Context, existingAddressesOnly bool) error {
	var (
		storeNodes []*api.Node
		findErr    error
	)
	a.store.View(func(tx store.ReadTx) {
		storeNodes, findErr = store.FindNodes(tx, store.All)
	})
	if findErr != nil {
		return errors.Wrap(findErr, "error listing all nodes in store while trying to allocate network resources")
	}

	var toCommit []*api.Node
	for _, n := range storeNodes {
		nets, err := a.getNodeNetworks(n.ID)
		if err != nil {
			return errors.Wrap(err, "error getting all networks needed by node")
		}
		if a.allocateNode(ctx, n, existingAddressesOnly, nets) {
			toCommit = append(toCommit, n)
		}
	}

	batchErr := a.store.Batch(func(batch *store.Batch) error {
		for _, n := range toCommit {
			if err := a.commitAllocatedNode(ctx, batch, n); err != nil {
				log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", n.ID)
			}
		}
		return nil
	})
	if batchErr != nil {
		log.G(ctx).WithError(batchErr).Error("Failed to commit allocation of network resources for nodes")
	}

	return nil
}
// deallocateNodes frees the network attachments of every node in the store
// and commits each node afterwards. Per-node failures are logged; only a
// failure to list the nodes is returned, wrapping the underlying store
// error.
//
//nolint:unused // TODO(thaJeztah) this is currently unused: is it safe to remove?
func (a *Allocator) deallocateNodes(ctx context.Context) error {
	var (
		nodes []*api.Node
		nc    = a.netCtx
		err   error
	)

	a.store.View(func(tx store.ReadTx) {
		nodes, err = store.FindNodes(tx, store.All)
	})
	if err != nil {
		// Keep the cause in the chain instead of discarding it.
		return fmt.Errorf("error listing all nodes in store while trying to free network resources: %w", err)
	}

	for _, node := range nodes {
		if err := a.deallocateNode(node); err != nil {
			log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
		} else {
			nc.somethingWasDeallocated = true
		}
		if err := a.store.Batch(func(batch *store.Batch) error {
			return a.commitAllocatedNode(ctx, batch, node)
		}); err != nil {
			log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
		}
	}

	return nil
}
// deallocateNodeAttachments removes, from every node in the store, the
// attachment to the network with ID nid. For each node holding an allocated
// attachment to that network, the attachment is freed, dropped from the
// node's attachment list, and the node is committed.
//
// Per-node failures are logged; only a failure to list the nodes is
// returned, wrapping the underlying store error.
func (a *Allocator) deallocateNodeAttachments(ctx context.Context, nid string) error {
	var (
		nodes []*api.Node
		nc    = a.netCtx
		err   error
	)

	a.store.View(func(tx store.ReadTx) {
		nodes, err = store.FindNodes(tx, store.All)
	})
	if err != nil {
		// Keep the cause in the chain instead of discarding it.
		return fmt.Errorf("error listing all nodes in store while trying to free network resources: %w", err)
	}

	for _, node := range nodes {
		var networkAttachment *api.NetworkAttachment
		var naIndex int
		for index, na := range node.Attachments {
			// Attachments without a network cannot match; skip them rather
			// than dereferencing a nil network (allocateNode applies the
			// same guard).
			if na.Network != nil && na.Network.ID == nid {
				networkAttachment = na
				naIndex = index
				break
			}
		}

		if networkAttachment == nil {
			log.G(ctx).Errorf("Failed to find network %s on node %s", nid, node.ID)
			continue
		}

		if nc.nwkAllocator.IsAttachmentAllocated(node, networkAttachment) {
			if err := nc.nwkAllocator.DeallocateAttachment(node, networkAttachment); err != nil {
				// This is a deallocation failure, not a commit failure; log
				// it as such so the two cases can be told apart.
				log.G(ctx).WithError(err).Errorf("Failed to deallocate network resources for node %s", node.ID)
			} else {
				// Delete the lbattachment: move the last element into its
				// slot, clear the vacated tail and shrink the slice.
				node.Attachments[naIndex] = node.Attachments[len(node.Attachments)-1]
				node.Attachments[len(node.Attachments)-1] = nil
				node.Attachments = node.Attachments[:len(node.Attachments)-1]

				if err := a.store.Batch(func(batch *store.Batch) error {
					return a.commitAllocatedNode(ctx, batch, node)
				}); err != nil {
					log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
				}
			}
		}
	}

	return nil
}
// deallocateNode frees every allocated network attachment of node and then
// clears the node's attachment list. It stops at, and returns, the first
// deallocation error; in that case the attachment list is left in place.
func (a *Allocator) deallocateNode(node *api.Node) error {
	nwkAlloc := a.netCtx.nwkAllocator

	for _, attachment := range node.Attachments {
		if !nwkAlloc.IsAttachmentAllocated(node, attachment) {
			continue
		}
		if err := nwkAlloc.DeallocateAttachment(node, attachment); err != nil {
			return err
		}
	}

	node.Attachments = nil
	return nil
}
// allocateNetworks allocates (restores) the networks found in the store
// before watched events are processed. With existingOnly set, only networks
// that already carry allocation state in the store are restored; the others
// are left for a later call with existingOnly unset.
//
// Only a failure to list the networks is returned; allocation and commit
// failures are logged.
func (a *Allocator) allocateNetworks(ctx context.Context, existingOnly bool) error {
	nc := a.netCtx

	var (
		all     []*api.Network
		findErr error
	)
	a.store.View(func(tx store.ReadTx) {
		all, findErr = store.FindNetworks(tx, store.All)
	})
	if findErr != nil {
		return errors.Wrap(findErr, "error listing all networks in store while trying to allocate during init")
	}

	var toCommit []*api.Network
	for _, nw := range all {
		switch {
		case nc.nwkAllocator.IsAllocated(nw):
			// Already known to the in-memory allocator.
			continue
		case existingOnly && (nw.DriverState == nil || nw.IPAM == nil):
			// A network counts as allocated only when both DriverState and
			// IPAM are set. During the restore pass, networks without that
			// state are skipped so that an in-use network IP, subnet pool
			// or vxlan id is not handed out to another network.
			continue
		}

		if err := a.allocateNetwork(ctx, nw); err != nil {
			log.G(ctx).WithField("existingOnly", existingOnly).WithError(err).Errorf("failed allocating network %s during init", nw.ID)
			continue
		}
		toCommit = append(toCommit, nw)
	}

	batchErr := a.store.Batch(func(batch *store.Batch) error {
		for _, nw := range toCommit {
			if err := a.commitAllocatedNetwork(ctx, batch, nw); err != nil {
				log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", nw.ID)
			}
		}
		return nil
	})
	if batchErr != nil {
		log.G(ctx).WithError(batchErr).Error("failed committing allocation of networks during init")
	}

	return nil
}
// allocateServices allocates the services found in the store before watched
// events are processed. With existingAddressesOnly set, services that have
// no endpoint or no virtual IPs are skipped.
//
// Only a failure to list the services is returned; allocation and commit
// failures are logged.
func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly bool) error {
	nc := a.netCtx

	var (
		all     []*api.Service
		findErr error
	)
	a.store.View(func(tx store.ReadTx) {
		all, findErr = store.FindServices(tx, store.All)
	})
	if findErr != nil {
		return errors.Wrap(findErr, "error listing all services in store while trying to allocate during init")
	}

	var toCommit []*api.Service
	for _, svc := range all {
		switch {
		case nc.isServiceAllocated(svc, networkallocator.OnInit):
			continue
		case existingAddressesOnly && (svc.Endpoint == nil || len(svc.Endpoint.VirtualIPs) == 0):
			continue
		}

		if err := a.allocateService(ctx, svc, existingAddressesOnly); err != nil {
			log.G(ctx).WithField("existingAddressesOnly", existingAddressesOnly).WithError(err).Errorf("failed allocating service %s during init", svc.ID)
			continue
		}
		toCommit = append(toCommit, svc)
	}

	batchErr := a.store.Batch(func(batch *store.Batch) error {
		for _, svc := range toCommit {
			if err := a.commitAllocatedService(ctx, batch, svc); err != nil {
				log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", svc.ID)
			}
		}
		return nil
	})
	if batchErr != nil {
		// The batch as a whole failed: report it once per service.
		for _, svc := range toCommit {
			log.G(ctx).WithError(batchErr).Errorf("failed committing allocation of service %v during init", svc.GetID())
		}
	}

	return nil
}
// isServiceAllocated reports whether s is fully allocated, i.e. it returns
// false when the service still needs network resources allocated or
// updated. The network allocator is consulted first; when the service has
// ports in its spec or in its endpoint, the port allocator has the final
// say.
func (nc *networkContext) isServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool {
	if !nc.nwkAllocator.IsServiceAllocated(s, flags...) {
		return false
	}

	// Resolve the option flags into a concrete options value.
	var opts networkallocator.ServiceAllocationOpts
	for _, apply := range flags {
		apply(&opts)
	}

	specHasPorts := s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0
	endpointHasPorts := s.Endpoint != nil && len(s.Endpoint.Ports) != 0
	if !specHasPorts && !endpointHasPorts {
		return true
	}
	return nc.portAllocator.isPortsAllocatedOnInit(s, opts.OnInit)
}
// allocateTasks allocates tasks in the store so far before we started watching.
//
// When existingAddressesOnly is true, tasks that have no addresses on any of
// their network attachments are skipped. Tasks that end up allocated (or
// voted into the PENDING state) are committed in a single batch. Only a
// failure to list the tasks is returned; allocation and commit failures are
// logged.
func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly bool) error {
var (
nc = a.netCtx
tasks []*api.Task
allocatedTasks []*api.Task
err error
)
a.store.View(func(tx store.ReadTx) {
tasks, err = store.FindTasks(tx, store.All)
})
if err != nil {
return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
}
logger := log.G(ctx).WithField("method", "(*Allocator).allocateTasks")
for _, t := range tasks {
// Tasks that have gone past the RUNNING state are not allocated.
if t.Status.State > api.TaskStateRunning {
logger.Debugf("task %v is in allocated state: %v", t.GetID(), t.Status.State)
continue
}
if existingAddressesOnly {
// Only consider tasks that already have at least one address on
// some network attachment.
hasAddresses := false
for _, nAttach := range t.Networks {
if len(nAttach.Addresses) != 0 {
hasAddresses = true
break
}
}
if !hasAddresses {
logger.Debugf("task %v has no attached addresses", t.GetID())
continue
}
}
// Look up the task's service, if it has one; s stays nil otherwise or
// when the service is not in the store.
var s *api.Service
if t.ServiceID != "" {
a.store.View(func(tx store.ReadTx) {
s = store.GetService(tx, t.ServiceID)
})
}
// Populate network attachments in the task
// based on service spec.
a.taskCreateNetworkAttachments(t, s)
if taskReadyForNetworkVote(t, s, nc) {
// Nothing left to allocate for this task: either it is already at
// or past PENDING, or it only needs the network allocator's vote.
if t.Status.State >= api.TaskStatePending {
logger.Debugf("task %v is in allocated state: %v", t.GetID(), t.Status.State)
continue
}
if a.taskAllocateVote(networkVoter, t.ID) {
// If the task is not attached to any network, network
// allocators job is done. Immediately cast a vote so
// that the task can be moved to the PENDING state as
// soon as possible.
updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
allocatedTasks = append(allocatedTasks, t)
logger.Debugf("allocated task %v, state update %v", t.GetID(), api.TaskStatePending)
}
continue
}
// errNoChanges is not treated as a failure: such a task is neither
// committed nor queued for a retry.
err := a.allocateTask(ctx, t)
if err == nil {
allocatedTasks = append(allocatedTasks, t)
} else if err != errNoChanges {
logger.WithError(err).Errorf("failed allocating task %s during init", t.ID)
nc.unallocatedTasks[t.ID] = t
}
}
if err := a.store.Batch(func(batch *store.Batch) error {
for _, t := range allocatedTasks {
if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
logger.WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
}
}
return nil
}); err != nil {
// The batch as a whole failed: report it once per task.
for _, t := range allocatedTasks {
logger.WithError(err).Errorf("failed committing allocation of task %v during init", t.GetID())
}
}
return nil
}
// taskReadyForNetworkVote reports whether the task is ready for the network
// allocator's vote that moves it to the PENDING state.
//
// Two conditions must hold: the task either has no network attachments or
// all of them are allocated, and the task either has no service (s is nil)
// or that service is allocated.
func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
	if len(t.Networks) != 0 && !nc.nwkAllocator.IsTaskAllocated(t) {
		return false
	}
	if s == nil {
		return true
	}
	return nc.isServiceAllocated(s)
}
// taskUpdateNetworks replaces the task's network attachments with copies of
// the given attachments, so the task does not share them with the caller.
func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
	copies := make([]*api.NetworkAttachment, len(networks))
	for i := range networks {
		copies[i] = networks[i].Copy()
	}
	t.Networks = copies
}
// taskUpdateEndpoint sets the task's endpoint to a copy of the given
// endpoint.
func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
	endpointCopy := endpoint.Copy()
	t.Endpoint = endpointCopy
}
// IsIngressNetworkNeeded checks whether the service requires the
// routing-mesh. It delegates to the networkallocator package.
func IsIngressNetworkNeeded(s *api.Service) bool {
	needed := networkallocator.IsIngressNetworkNeeded(s)
	return needed
}
// taskCreateNetworkAttachments fills in the task's network attachments when
// it has none yet. The ingress network comes first if the service needs it
// and one is known, followed by one attachment per configured network that
// can be found in the store. The network configuration of the task spec
// takes precedence over that of the service spec.
func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
	// Attachments that are already filled in are left untouched.
	if len(t.Networks) != 0 {
		return
	}

	var attachments []*api.NetworkAttachment
	if IsIngressNetworkNeeded(s) && a.netCtx.ingressNetwork != nil {
		attachments = append(attachments, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
	}

	// Always prefer the NetworkAttachmentConfig in the TaskSpec; fall back
	// to the service spec only when the task spec has none.
	configs := t.Spec.Networks
	if len(configs) == 0 && s != nil && len(s.Spec.Networks) != 0 {
		configs = s.Spec.Networks
	}

	a.store.View(func(tx store.ReadTx) {
		for _, cfg := range configs {
			nw := store.GetNetwork(tx, cfg.Target)
			if nw == nil {
				// Targets that are not in the store are skipped.
				continue
			}

			att := &api.NetworkAttachment{
				Network:              nw,
				DriverAttachmentOpts: cfg.DriverAttachmentOpts,
			}
			att.Aliases = append(att.Aliases, cfg.Aliases...)
			att.Addresses = append(att.Addresses, cfg.Addresses...)
			attachments = append(attachments, att)
		}
	})

	taskUpdateNetworks(t, attachments)
}
// doTaskAlloc reacts to task create, update and delete events.
//
// A task that was deleted or has gone past the RUNNING state has its network
// resources freed, its node reallocated (which may drop attachments the node
// no longer needs) and any bookkeeping references removed. Any other task
// has its node reallocated; then, unless it is already at or past PENDING,
// it is given network attachments and queued in pendingTasks for the next
// allocation pass. Failures are logged, not returned.
func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
	var (
		isDelete bool
		t        *api.Task
	)

	logger := log.G(ctx).WithField("method", "(*Allocator).doTaskAlloc")

	// We may have already allocated this task. If a create or update
	// event is older than the current version in the store, we run the
	// risk of allocating the task a second time. Only operate on the
	// latest version of the task.
	switch v := ev.(type) {
	case api.EventCreateTask:
		a.store.View(func(tx store.ReadTx) {
			t = store.GetTask(tx, v.Task.ID)
		})
	case api.EventUpdateTask:
		a.store.View(func(tx store.ReadTx) {
			t = store.GetTask(tx, v.Task.ID)
		})
	case api.EventDeleteTask:
		isDelete = true
		t = v.Task.Copy()
	}

	if t == nil {
		return
	}

	nc := a.netCtx

	// If the task has stopped running then we should free the network
	// resources associated with the task right away.
	if t.Status.State > api.TaskStateRunning || isDelete {
		if nc.nwkAllocator.IsTaskAllocated(t) {
			if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
				logger.WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
			} else {
				nc.somethingWasDeallocated = true
			}
		}

		// if we're deallocating the task, we also might need to deallocate the
		// node's network attachment, if this is the last task on the node that
		// needs it. we can do that by doing the same dance to reallocate a
		// node. a task that was never assigned to a node has nothing to
		// reallocate, and looking up an empty node ID would only produce a
		// spurious "node cannot be found" error.
		if t.NodeID != "" {
			if err := a.reallocateNode(ctx, t.NodeID); err != nil {
				logger.WithError(err).Errorf("error reallocating node %v", t.NodeID)
			}
		}

		// Cleanup any task references that might exist
		delete(nc.pendingTasks, t.ID)
		delete(nc.unallocatedTasks, t.ID)

		return
	}

	// From here on the task is known to be non-terminal (state <= RUNNING)
	// and the event is not a delete.

	// if the task has a node ID, we should allocate an attachment for the node
	if t.NodeID != "" {
		if err := a.reallocateNode(ctx, t.NodeID); err != nil {
			// TODO(dperny): not entire sure what the error handling flow here
			// should be... for now, just log and keep going
			logger.WithError(err).Errorf("error reallocating node %v", t.NodeID)
		}
	}

	// If we are already in allocated state, there is
	// absolutely nothing else to do.
	if t.Status.State >= api.TaskStatePending {
		logger.Debugf("Task %s is already in allocated state %v", t.ID, t.Status.State)
		delete(nc.pendingTasks, t.ID)
		delete(nc.unallocatedTasks, t.ID)
		return
	}

	var s *api.Service
	if t.ServiceID != "" {
		a.store.View(func(tx store.ReadTx) {
			s = store.GetService(tx, t.ServiceID)
		})
		if s == nil {
			// The task is live, so it is not normal for its service to be
			// missing from the store. Give up on this event.
			log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
			return
		}
	}

	// Populate network attachments in the task
	// based on service spec.
	a.taskCreateNetworkAttachments(t, s)

	nc.pendingTasks[t.ID] = t
	log.G(ctx).Debugf("task %v was marked pending allocation", t.ID)
}
// allocateNode takes a context, a node, whether or not new allocations should
// be made, and the networks to allocate. it then makes sure an attachment is
// allocated for every network in the provided networks, allocating new
// attachments if existingAddressesOnly is false. it returns true if something
// new was allocated or something was removed, or false otherwise.
//
// additionally, unless existingAddressesOnly is set, allocateNode will remove
// and free any attachments for networks not in the set of networks passed in.
func (a *Allocator) allocateNode(ctx context.Context, node *api.Node, existingAddressesOnly bool, networks []*api.Network) bool {
var allocated bool
nc := a.netCtx
// set of the IDs of all networks passed in; used further down to decide
// which of the node's attachments to keep.
var nwIDs = make(map[string]struct{}, len(networks))
// go through all of the networks we've passed in
for _, network := range networks {
nwIDs[network.ID] = struct{}{}
// for each one, look through the attachments already on the node for
// one that belongs to this network. if there is none, a new
// attachment is created and allocated below.
var lbAttachment *api.NetworkAttachment
for _, na := range node.Attachments {
if na.Network != nil && na.Network.ID == network.ID {
lbAttachment = na
break
}
}
// an existing attachment that is already allocated needs no work.
if lbAttachment != nil {
if nc.nwkAllocator.IsAttachmentAllocated(node, lbAttachment) {
continue
}
}
if lbAttachment == nil {
// if we're restoring state, we should not add an attachment here.
if existingAddressesOnly {
continue
}
lbAttachment = &api.NetworkAttachment{}
node.Attachments = append(node.Attachments, lbAttachment)
}
// when restoring, an attachment without addresses has nothing to
// restore.
if existingAddressesOnly && len(lbAttachment.Addresses) == 0 {
continue
}
lbAttachment.Network = network.Copy()
if err := a.netCtx.nwkAllocator.AllocateAttachment(node, lbAttachment); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
// TODO: Should we add a unallocatedNode and retry allocating resources like we do for network, tasks, services?
// right now, we will only retry allocating network resources for the node when the node is updated.
continue
}
allocated = true
}
// if we're only initializing existing addresses, we should stop here and
// not deallocate anything
if existingAddressesOnly {
return allocated
}
// now that we've allocated everything new, we have to remove things that
// do not belong. we have to do this last because we can easily roll back
// attachments we've allocated if something goes wrong by freeing them, but
// we can't roll back deallocating attachments by reacquiring them.
// we're using a trick to filter without allocating see the official go
// wiki on github:
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
attachments := node.Attachments[:0]
for _, attach := range node.Attachments {
// NOTE(review): attach.Network is dereferenced here without the nil
// check used in the lookup loop above — confirm that attachments
// without a network cannot reach this point.
if _, ok := nwIDs[attach.Network.ID]; ok {
// attachment belongs to one of the networks, so keep it
attachments = append(attachments, attach)
} else {
// free the attachment and remove it from the node's attachments by
// re-slicing
if err := a.netCtx.nwkAllocator.DeallocateAttachment(node, attach); err != nil {
// if deallocation fails, there's nothing we can do besides log
// an error and keep going
log.G(ctx).WithError(err).Errorf(
"error deallocating attachment for network %v on node %v",
attach.Network.ID, node.ID,
)
}
// strictly speaking, nothing was allocated, but something was
// deallocated and that counts.
allocated = true
// also, set the somethingWasDeallocated flag so the allocator
// knows that it can now try again.
a.netCtx.somethingWasDeallocated = true
}
}
node.Attachments = attachments
return allocated
}
// reallocateNode re-runs node allocation for the node identified by nodeID.
// It reads the node from the store, fetches its networks via getNodeNetworks,
// calls allocateNode and, only when allocateNode reports that something
// changed, commits the node in a store batch. An error is returned when the
// node is not in the store, its networks cannot be retrieved, or the commit
// fails.
func (a *Allocator) reallocateNode(ctx context.Context, nodeID string) error {
	var current *api.Node
	a.store.View(func(readTx store.ReadTx) {
		current = store.GetNode(readTx, nodeID)
	})
	if current == nil {
		return errors.Errorf("node %v cannot be found", nodeID)
	}

	nodeNetworks, err := a.getNodeNetworks(current.ID)
	if err != nil {
		return errors.Wrapf(err, "error getting networks for node %v", nodeID)
	}

	if !a.allocateNode(ctx, current, false, nodeNetworks) {
		// allocateNode reported no change, so there is nothing to commit.
		return nil
	}

	commitErr := a.store.Batch(func(batch *store.Batch) error {
		return a.commitAllocatedNode(ctx, batch, current)
	})
	if commitErr != nil {
		return errors.Wrapf(commitErr, "error committing allocation for node %v", nodeID)
	}
	return nil
}
// commitAllocatedNode writes the node to the store as part of the given batch.
// If the first update hits a sequence conflict, the node is re-read from the
// store inside the same transaction, its attachments are replaced with the
// ones carried by node, and the update is retried once. If the batch update
// fails, the node's allocation is rolled back through deallocateNode (a
// rollback failure is only logged) and the update error is returned.
func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
	writeNode := func(tx store.Tx) error {
		updateErr := store.UpdateNode(tx, node)
		if updateErr == store.ErrSequenceConflict {
			latest := store.GetNode(tx, node.ID)
			latest.Attachments = node.Attachments
			updateErr = store.UpdateNode(tx, latest)
		}
		return errors.Wrapf(updateErr, "failed updating state in store transaction for node %s", node.ID)
	}

	err := batch.Update(writeNode)
	if err == nil {
		return nil
	}

	if rollbackErr := a.deallocateNode(node); rollbackErr != nil {
		log.G(ctx).WithError(rollbackErr).Errorf("failed rolling back allocation of node %s", node.ID)
	}
	return err
}
// updatePortsInHostPublishMode prepares the service object for being updated
// when the change regards the published ports in host mode: it resets the
// runtime state ports (s.Endpoint.Ports) to the current ingress mode runtime
// state ports plus the newly configured host publish mode ports, so that the
// service allocation invoked on this new service object will trigger the
// deallocation of any old publish mode port and allocation of any new one.
// Finally s.Endpoint.Spec is refreshed from a copy of s.Spec.Endpoint.
//
// If the service has neither a runtime endpoint nor an endpoint spec there is
// nothing to update and the service is left untouched.
func updatePortsInHostPublishMode(s *api.Service) {
	if s.Endpoint == nil && s.Spec.Endpoint == nil {
		// No runtime state and no desired state: without this guard the
		// final assignment to s.Endpoint.Spec would dereference a nil
		// s.Endpoint.
		return
	}

	// First, remove all host-mode ports from s.Endpoint.Ports
	if s.Endpoint != nil {
		var portConfigs []*api.PortConfig
		for _, portConfig := range s.Endpoint.Ports {
			if portConfig.PublishMode != api.PublishModeHost {
				portConfigs = append(portConfigs, portConfig)
			}
		}
		s.Endpoint.Ports = portConfigs
	}

	// Add back all host-mode ports
	if s.Spec.Endpoint != nil {
		if s.Endpoint == nil {
			s.Endpoint = &api.Endpoint{}
		}
		for _, portConfig := range s.Spec.Endpoint.Ports {
			if portConfig.PublishMode == api.PublishModeHost {
				s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
			}
		}
	}

	// s.Endpoint is guaranteed non-nil here thanks to the guard above.
	s.Endpoint.Spec = s.Spec.Endpoint.Copy()
}
// allocateService brings the service's allocated state in line with its spec.
// existingAddressesOnly is true only during restart, when the data is read
// from raft and used to rebuild internal state; in that mode nothing is
// deallocated.
//
// When allocation through the network context fails, the service is recorded
// in unallocatedServices and the error is returned.
func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existingAddressesOnly bool) error {
	nc := a.netCtx

	switch {
	case s.Spec.Endpoint != nil:
		// The service has a user-defined endpoint; create the runtime
		// endpoint if none has been allocated yet.
		if s.Endpoint == nil {
			s.Endpoint = &api.Endpoint{Spec: s.Spec.Endpoint.Copy()}
		}

		// A service that exposes ports to the outside world is attached
		// to the ingress network automatically, unless it already is.
		if IsIngressNetworkNeeded(s) {
			if nc.ingressNetwork == nil {
				return fmt.Errorf("ingress network is missing")
			}

			ingressID := nc.ingressNetwork.ID
			attached := false
			for _, vip := range s.Endpoint.VirtualIPs {
				if vip.NetworkID == ingressID {
					attached = true
					break
				}
			}
			if !attached {
				s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
					&api.Endpoint_VirtualIP{NetworkID: ingressID})
			}
		}

	case s.Endpoint != nil && !existingAddressesOnly:
		// No user-defined endpoint, but network resources are already
		// allocated: release them. This is skipped during the restart
		// phase because the state to deallocate is not there.
		if err := nc.deallocateService(s); err != nil {
			return err
		}
		nc.somethingWasDeallocated = true
	}

	if err := nc.allocateService(s); err != nil {
		nc.unallocatedServices[s.ID] = s
		return err
	}

	// If the service no longer exposes ports, drop any lingering virtual
	// IP reference to the ingress network.
	if IsIngressNetworkNeeded(s) || nc.ingressNetwork == nil || s.Endpoint == nil {
		return nil
	}

	ingressID := nc.ingressNetwork.ID
	vips := s.Endpoint.VirtualIPs
	for i, vip := range vips {
		if vip.NetworkID != ingressID {
			continue
		}
		// Swap-remove: move the last entry into slot i, clear the tail
		// slot and shrink the slice by one.
		last := len(vips) - 1
		vips[i], vips[last] = vips[last], nil
		s.Endpoint.VirtualIPs = vips[:last]
		break
	}
	return nil
}
// allocateService allocates the service's ports through the port allocator
// and then the service itself through the network allocator. If the network
// allocator fails, the ports allocated in the first step are released again
// before the error is returned.
func (nc *networkContext) allocateService(s *api.Service) error {
	portErr := nc.portAllocator.serviceAllocatePorts(s)
	if portErr != nil {
		return portErr
	}

	nwErr := nc.nwkAllocator.AllocateService(s)
	if nwErr != nil {
		// undo the port allocation done above
		nc.portAllocator.serviceDeallocatePorts(s)
		return nwErr
	}
	return nil
}
// deallocateService releases the service through the network allocator and,
// only if that succeeds, releases the service's ports through the port
// allocator. A network allocator failure is returned with the ports left
// untouched.
func (nc *networkContext) deallocateService(s *api.Service) error {
	err := nc.nwkAllocator.DeallocateService(s)
	if err == nil {
		nc.portAllocator.serviceDeallocatePorts(s)
		return nil
	}
	return err
}
// commitAllocatedService writes the service to the store as part of the given
// batch. If the first update hits a sequence conflict, the service is re-read
// from the store inside the same transaction, its endpoint is replaced with
// the one carried by s, and the update is retried once. If the batch update
// fails, the service's allocation is rolled back through the network context
// (a rollback failure is only logged) and the update error is returned.
func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
	writeService := func(tx store.Tx) error {
		updateErr := store.UpdateService(tx, s)
		if updateErr == store.ErrSequenceConflict {
			latest := store.GetService(tx, s.ID)
			latest.Endpoint = s.Endpoint
			updateErr = store.UpdateService(tx, latest)
		}
		return errors.Wrapf(updateErr, "failed updating state in store transaction for service %s", s.ID)
	}

	err := batch.Update(writeService)
	if err == nil {
		return nil
	}

	if rollbackErr := a.netCtx.deallocateService(s); rollbackErr != nil {
		log.G(ctx).WithError(rollbackErr).Errorf("failed rolling back allocation of service %s", s.ID)
	}
	return err
}
// allocateNetwork asks the network allocator to allocate n. On failure the
// network is recorded in unallocatedNetworks and the error is returned.
func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
	netCtx := a.netCtx

	err := netCtx.nwkAllocator.Allocate(n)
	if err == nil {
		return nil
	}

	netCtx.unallocatedNetworks[n.ID] = n
	return err
}
// commitAllocatedNetwork writes the network to the store as part of the given
// batch. If the batch update fails, the network's allocation is rolled back
// through the network allocator (a rollback failure is only logged) and the
// update error is returned.
func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
	commitErr := batch.Update(func(tx store.Tx) error {
		updateErr := store.UpdateNetwork(tx, n)
		if updateErr == nil {
			return nil
		}
		return errors.Wrapf(updateErr, "failed updating state in store transaction for network %s", n.ID)
	})
	if commitErr == nil {
		return nil
	}

	if rollbackErr := a.netCtx.nwkAllocator.Deallocate(n); rollbackErr != nil {
		log.G(ctx).WithError(rollbackErr).Errorf("failed rolling back allocation of network %s", n.ID)
	}
	return commitErr
}
// allocateTask performs network allocation for task t and, if the network
// voter's vote allows it, moves the task to the PENDING state.
//
// It returns errNoChanges when nothing on the task was modified, an
// allocation error when the task's service or networks are missing or not
// yet allocated or when the network allocator fails, and nil when the task
// was updated and needs to be committed.
func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
	var (
		nc      = a.netCtx
		logger  = log.G(ctx).WithField("method", "(*Allocator).allocateTask")
		changed bool
	)

	// resolveAndAllocate runs inside a read transaction. It reports
	// failures through the named return value err.
	resolveAndAllocate := func(tx store.ReadTx) {
		if t.ServiceID != "" {
			svc := store.GetService(tx, t.ServiceID)
			switch {
			case svc == nil:
				err = fmt.Errorf("could not find service %s for task %s", t.ServiceID, t.GetID())
				return
			case !nc.isServiceAllocated(svc):
				err = fmt.Errorf("service %s to which task %s belongs has pending allocations", svc.ID, t.ID)
				return
			}

			if svc.Endpoint != nil {
				taskUpdateEndpoint(t, svc.Endpoint)
				changed = true
			}
		}

		// Every network the task attaches to must exist in the store
		// and be allocated; the attachment is pointed at the store's
		// copy of the network.
		for _, attachment := range t.Networks {
			nw := store.GetNetwork(tx, attachment.Network.ID)
			if nw == nil {
				err = fmt.Errorf("failed to retrieve network %s while allocating task %s", attachment.Network.ID, t.ID)
				return
			}
			if !nc.nwkAllocator.IsAllocated(nw) {
				err = fmt.Errorf("network %s attached to task %s not allocated yet", nw.ID, t.ID)
				return
			}
			attachment.Network = nw
		}

		if err = nc.nwkAllocator.AllocateTask(t); err != nil {
			return
		}
		if nc.nwkAllocator.IsTaskAllocated(t) {
			changed = true
		}
	}

	// The task may already be allocated if an earlier allocation was not
	// successfully committed to the store. In that case skip allocation
	// and go straight to the store update.
	if !nc.nwkAllocator.IsTaskAllocated(t) {
		a.store.View(resolveAndAllocate)
		if err != nil {
			return err
		}
	}

	// Update the network allocations and move to PENDING state on top of
	// the latest store state.
	if a.taskAllocateVote(networkVoter, t.ID) {
		if t.Status.State >= api.TaskStatePending {
			logger.Debugf("task %v, already in allocated state %v", t.GetID(), t.Status.State)
		} else {
			updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
			logger.Debugf("allocated task %v, state update %v", t.GetID(), api.TaskStatePending)
			changed = true
		}
	}

	if changed {
		return nil
	}
	return errNoChanges
}
// commitAllocatedTask writes the task to the store as part of the given
// batch. If the first update hits a sequence conflict, the task is re-read
// from the store inside the same transaction, the networks and endpoint from
// t are applied to it, its status is replaced with t's status only when the
// stored state is still below PENDING, and the update is retried once. A
// successful commit is logged at debug level; a failure is returned as is.
func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
	commitErr := batch.Update(func(tx store.Tx) error {
		updateErr := store.UpdateTask(tx, t)
		if updateErr == store.ErrSequenceConflict {
			latest := store.GetTask(tx, t.ID)
			taskUpdateNetworks(latest, t.Networks)
			taskUpdateEndpoint(latest, t.Endpoint)
			if latest.Status.State < api.TaskStatePending {
				latest.Status = t.Status
			}
			updateErr = store.UpdateTask(tx, latest)
		}
		return errors.Wrapf(updateErr, "failed updating state in store transaction for task %s", t.ID)
	})
	if commitErr != nil {
		return commitErr
	}

	log.G(ctx).Debugf("committed allocated task %v, state update %v", t.GetID(), t.Status)
	return nil
}
// procUnallocatedNetworks retries allocation for every network held in
// unallocatedNetworks that the network allocator does not yet consider
// allocated, then commits the newly allocated ones in a single store batch.
// Networks whose commit succeeds are removed from unallocatedNetworks; if the
// batch as a whole fails, all of them are put back for a later retry.
func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
	nc := a.netCtx

	var ready []*api.Network
	for _, nw := range nc.unallocatedNetworks {
		if nc.nwkAllocator.IsAllocated(nw) {
			continue
		}
		if err := a.allocateNetwork(ctx, nw); err != nil {
			log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", nw.ID)
			continue
		}
		ready = append(ready, nw)
	}
	if len(ready) == 0 {
		return
	}

	batchErr := a.store.Batch(func(batch *store.Batch) error {
		for _, nw := range ready {
			if err := a.commitAllocatedNetwork(ctx, batch, nw); err != nil {
				log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", nw.ID)
				continue
			}
			delete(nc.unallocatedNetworks, nw.ID)
		}
		return nil
	})
	if batchErr == nil {
		return
	}

	log.G(ctx).WithError(batchErr).Error("Failed to commit allocation of unallocated networks")
	// The networks were removed from nc.unallocatedNetworks inside the
	// batch in anticipation of a successful commit; since the transaction
	// failed, requeue them.
	for _, nw := range ready {
		nc.unallocatedNetworks[nw.ID] = nw
	}
}
// procUnallocatedServices retries allocation for every service held in
// unallocatedServices that is not yet allocated, then commits the newly
// allocated ones in a single store batch. Services whose commit succeeds are
// removed from unallocatedServices; if the batch as a whole fails, all of
// them are put back for a later retry.
func (a *Allocator) procUnallocatedServices(ctx context.Context) {
	nc := a.netCtx

	var ready []*api.Service
	for _, svc := range nc.unallocatedServices {
		if nc.isServiceAllocated(svc) {
			continue
		}
		if err := a.allocateService(ctx, svc, false); err != nil {
			log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", svc.ID)
			continue
		}
		ready = append(ready, svc)
	}
	if len(ready) == 0 {
		return
	}

	batchErr := a.store.Batch(func(batch *store.Batch) error {
		for _, svc := range ready {
			if err := a.commitAllocatedService(ctx, batch, svc); err != nil {
				log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", svc.ID)
				continue
			}
			delete(nc.unallocatedServices, svc.ID)
		}
		return nil
	})
	if batchErr == nil {
		return
	}

	log.G(ctx).WithError(batchErr).Error("Failed to commit allocation of unallocated services")
	// The services were removed from nc.unallocatedServices inside the
	// batch in anticipation of a successful commit; since the transaction
	// failed, requeue them.
	for _, svc := range ready {
		nc.unallocatedServices[svc.ID] = svc
	}
}
// procTasksNetwork allocates and commits a set of tasks. With onRetry false
// it works on pendingTasks and logs allocation failures as errors; with
// onRetry true it works on unallocatedTasks and logs allocation failures at
// debug level. Tasks for which allocateTask reports errNoChanges are neither
// logged nor committed. Tasks whose commit succeeds are removed from the set
// being processed; if the batch as a whole fails, all of them are put back.
func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
	nc := a.netCtx

	queue := nc.pendingTasks
	if onRetry {
		queue = nc.unallocatedTasks
	}

	ready := make([]*api.Task, 0, len(queue))
	for _, task := range queue {
		switch err := a.allocateTask(ctx, task); {
		case err == nil:
			ready = append(ready, task)
		case err == errNoChanges:
			// nothing changed on the task; nothing to log or commit
		case onRetry:
			log.G(ctx).WithError(err).Debug("task allocation failure")
		default:
			log.G(ctx).WithError(err).Error("task allocation failure")
		}
	}
	if len(ready) == 0 {
		return
	}

	batchErr := a.store.Batch(func(batch *store.Batch) error {
		for _, task := range ready {
			if err := a.commitAllocatedTask(ctx, batch, task); err != nil {
				log.G(ctx).WithField("method", "(*Allocator).procTasksNetwork").WithError(err).Errorf("allocation commit failure for task %s", task.GetID())
				continue
			}
			delete(queue, task.ID)
		}
		return nil
	})
	if batchErr == nil {
		return
	}

	log.G(ctx).WithError(batchErr).Error("failed a store batch operation while processing tasks")
	// The tasks were removed from the queue inside the batch in
	// anticipation of a successful commit; since the transaction failed,
	// requeue them.
	for _, task := range ready {
		queue[task.ID] = task
	}
}
// updateTaskStatus replaces the task's status with a fresh TaskStatus holding
// the given state and message, timestamped with the current time.
func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
	now := ptypes.MustTimestampProto(time.Now())
	t.Status = api.TaskStatus{
		Timestamp: now,
		Message:   message,
		State:     newStatus,
	}
}
// IsIngressNetwork reports whether nw is an ingress network. The decision is
// delegated to networkallocator.IsIngressNetwork.
func IsIngressNetwork(nw *api.Network) bool {
	isIngress := networkallocator.IsIngressNetwork(nw)
	return isIngress
}
// GetIngressNetwork looks up the ingress network in the store and returns the
// first network for which IsIngressNetwork is true. If no such network
// exists, ErrNoIngress is returned. Any error raised while listing the
// networks in the store is returned unchanged.
func GetIngressNetwork(s *store.MemoryStore) (*api.Network, error) {
	var (
		all     []*api.Network
		findErr error
	)
	s.View(func(readTx store.ReadTx) {
		all, findErr = store.FindNetworks(readTx, store.All)
	})
	if findErr != nil {
		return nil, findErr
	}

	for i := range all {
		if candidate := all[i]; IsIngressNetwork(candidate) {
			return candidate, nil
		}
	}
	return nil, ErrNoIngress
}