docker/swarmkit

View on GitHub
agent/reporter.go

Summary

Maintainability
A
1 hr
Test Coverage
package agent

import (
    "context"
    "reflect"
    "sync"

    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/log"
)

// StatusReporter receives updates to task status. Method may be called
// concurrently, so implementations should be goroutine-safe.
type StatusReporter interface {
    UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error
}

// Reporter receives update to both task and volume status.
type Reporter interface {
    StatusReporter
    ReportVolumeUnpublished(ctx context.Context, volumeID string) error
}

type statusReporterFunc func(ctx context.Context, taskID string, status *api.TaskStatus) error

func (fn statusReporterFunc) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
    return fn(ctx, taskID, status)
}

//nolint:unused // currently only used in tests.
type volumeReporterFunc func(ctx context.Context, volumeID string) error

//nolint:unused // currently only used in tests.
func (fn volumeReporterFunc) ReportVolumeUnpublished(ctx context.Context, volumeID string) error {
    return fn(ctx, volumeID)
}

//nolint:unused // currently only used in tests.
type statusReporterCombined struct {
    statusReporterFunc
    volumeReporterFunc
}

// statusReporter creates a reliable StatusReporter that will always succeed.
// It handles several tasks at once, ensuring all statuses are reported.
//
// The reporter will continue reporting the current status until it succeeds.
type statusReporter struct {
    reporter Reporter
    statuses map[string]*api.TaskStatus
    // volumes is a set of volumes which are to be reported unpublished.
    volumes map[string]struct{}
    mu      sync.Mutex
    cond    sync.Cond
    closed  bool
}

func newStatusReporter(ctx context.Context, upstream Reporter) *statusReporter {
    r := &statusReporter{
        reporter: upstream,
        statuses: make(map[string]*api.TaskStatus),
        volumes:  make(map[string]struct{}),
    }

    r.cond.L = &r.mu

    go r.run(ctx)
    return r
}

func (sr *statusReporter) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    current, ok := sr.statuses[taskID]
    if ok {
        if reflect.DeepEqual(current, status) {
            return nil
        }

        if current.State > status.State {
            return nil // ignore old updates
        }
    }
    sr.statuses[taskID] = status
    sr.cond.Signal()

    return nil
}

func (sr *statusReporter) ReportVolumeUnpublished(ctx context.Context, volumeID string) error {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    sr.volumes[volumeID] = struct{}{}
    sr.cond.Signal()

    return nil
}

func (sr *statusReporter) Close() error {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    sr.closed = true
    sr.cond.Signal()

    return nil
}

func (sr *statusReporter) run(ctx context.Context) {
    done := make(chan struct{})
    defer close(done)

    sr.mu.Lock() // released during wait, below.
    defer sr.mu.Unlock()

    go func() {
        select {
        case <-ctx.Done():
            sr.Close()
        case <-done:
            return
        }
    }()

    for {
        if len(sr.statuses) == 0 && len(sr.volumes) == 0 {
            sr.cond.Wait()
        }

        if sr.closed {
            // TODO(stevvooe): Add support here for waiting until all
            // statuses are flushed before shutting down.
            return
        }

        for taskID, status := range sr.statuses {
            delete(sr.statuses, taskID) // delete the entry, while trying to send.

            sr.mu.Unlock()
            err := sr.reporter.UpdateTaskStatus(ctx, taskID, status)
            sr.mu.Lock()

            // reporter might be closed during UpdateTaskStatus call
            if sr.closed {
                return
            }

            if err != nil {
                log.G(ctx).WithError(err).Error("status reporter failed to report status to agent")

                // place it back in the map, if not there, allowing us to pick
                // the value if a new one came in when we were sending the last
                // update.
                if _, ok := sr.statuses[taskID]; !ok {
                    sr.statuses[taskID] = status
                }
            }
        }

        for volumeID := range sr.volumes {
            delete(sr.volumes, volumeID)

            sr.mu.Unlock()
            err := sr.reporter.ReportVolumeUnpublished(ctx, volumeID)
            sr.mu.Lock()

            // reporter might be closed during ReportVolumeUnpublished call
            if sr.closed {
                return
            }

            if err != nil {
                log.G(ctx).WithError(err).Error("status reporter failed to report volume status to agent")
                sr.volumes[volumeID] = struct{}{}
            }
        }
    }
}