docker/swarmkit

View on GitHub
manager/orchestrator/replicated/replicated.go

Summary

Maintainability
A
35 mins
Test Coverage
package replicated

import (
    "context"

    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/manager/orchestrator/restart"
    "github.com/moby/swarmkit/v2/manager/orchestrator/update"
    "github.com/moby/swarmkit/v2/manager/state"
    "github.com/moby/swarmkit/v2/manager/state/store"
)

// An Orchestrator runs a reconciliation loop to create and destroy
// tasks as necessary for the replicated services.
type Orchestrator struct {
    store *store.MemoryStore

    reconcileServices map[string]*api.Service
    restartTasks      map[string]struct{}

    // stopChan signals to the state machine to stop running.
    stopChan chan struct{}
    // doneChan is closed when the state machine terminates.
    doneChan chan struct{}

    updater  *update.Supervisor
    restarts *restart.Supervisor

    cluster *api.Cluster // local cluster instance
}

// NewReplicatedOrchestrator creates a new replicated Orchestrator.
func NewReplicatedOrchestrator(store *store.MemoryStore) *Orchestrator {
    restartSupervisor := restart.NewSupervisor(store)
    updater := update.NewSupervisor(store, restartSupervisor)
    return &Orchestrator{
        store:             store,
        stopChan:          make(chan struct{}),
        doneChan:          make(chan struct{}),
        reconcileServices: make(map[string]*api.Service),
        restartTasks:      make(map[string]struct{}),
        updater:           updater,
        restarts:          restartSupervisor,
    }
}

// Run contains the orchestrator event loop. It runs until Stop is called.
func (r *Orchestrator) Run(ctx context.Context) error {
    defer close(r.doneChan)

    // Watch changes to services and tasks
    queue := r.store.WatchQueue()
    watcher, cancel := queue.Watch()
    defer cancel()

    // Balance existing services and drain initial tasks attached to invalid
    // nodes
    var err error
    r.store.View(func(readTx store.ReadTx) {
        if err = r.initTasks(ctx, readTx); err != nil {
            return
        }

        if err = r.initServices(readTx); err != nil {
            return
        }

        if err = r.initCluster(readTx); err != nil {
            return
        }
    })
    if err != nil {
        return err
    }

    r.tick(ctx)

    for {
        select {
        case event := <-watcher:
            // TODO(stevvooe): Use ctx to limit running time of operation.
            r.handleTaskEvent(ctx, event)
            r.handleServiceEvent(ctx, event)
            switch v := event.(type) {
            case state.EventCommit:
                r.tick(ctx)
            case api.EventUpdateCluster:
                r.cluster = v.Cluster
            }
        case <-r.stopChan:
            return nil
        }
    }
}

// Stop stops the orchestrator.
func (r *Orchestrator) Stop() {
    close(r.stopChan)
    <-r.doneChan
    r.updater.CancelAll()
    r.restarts.CancelAll()
}

func (r *Orchestrator) tick(ctx context.Context) {
    // tickTasks must be called first, so we respond to task-level changes
    // before performing service reconciliation.
    r.tickTasks(ctx)
    r.tickServices(ctx)
}