docker/swarmkit

View on GitHub
agent/testutils/fakes.go

Summary

Maintainability
A
0 mins
Test Coverage
package testutils

import (
    "context"
    "net"
    "os"
    "path/filepath"
    "sync"
    "testing"
    "time"

    "google.golang.org/grpc"

    "github.com/moby/swarmkit/v2/agent/exec"
    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/ca"
    "github.com/moby/swarmkit/v2/identity"
    "github.com/moby/swarmkit/v2/log"
    "github.com/stretchr/testify/require"
)

// TestExecutor is executor for integration tests
type TestExecutor struct {
    mu   sync.Mutex
    desc *api.NodeDescription
}

// Describe just returns empty NodeDescription.
func (e *TestExecutor) Describe(ctx context.Context) (*api.NodeDescription, error) {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.desc == nil {
        return &api.NodeDescription{}, nil
    }
    return e.desc.Copy(), nil
}

// Configure does nothing.
func (e *TestExecutor) Configure(ctx context.Context, node *api.Node) error {
    return nil
}

// SetNetworkBootstrapKeys does nothing.
func (e *TestExecutor) SetNetworkBootstrapKeys([]*api.EncryptionKey) error {
    return nil
}

// Controller returns TestController.
func (e *TestExecutor) Controller(t *api.Task) (exec.Controller, error) {
    return &TestController{
        ch: make(chan struct{}),
    }, nil
}

// UpdateNodeDescription sets the node description on the test executor
func (e *TestExecutor) UpdateNodeDescription(newDesc *api.NodeDescription) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.desc = newDesc
}

// TestController is dummy channel based controller for tests.
type TestController struct {
    ch        chan struct{}
    closeOnce sync.Once
}

// Update does nothing.
func (t *TestController) Update(ctx context.Context, task *api.Task) error {
    return nil
}

// Prepare does nothing.
func (t *TestController) Prepare(ctx context.Context) error {
    return nil
}

// Start does nothing.
func (t *TestController) Start(ctx context.Context) error {
    return nil
}

// Wait waits on internal channel.
func (t *TestController) Wait(ctx context.Context) error {
    select {
    case <-t.ch:
    case <-ctx.Done():
    }
    return nil
}

// Shutdown closes internal channel
func (t *TestController) Shutdown(ctx context.Context) error {
    t.closeOnce.Do(func() {
        close(t.ch)
    })
    return nil
}

// Terminate closes internal channel if it wasn't closed before.
func (t *TestController) Terminate(ctx context.Context) error {
    t.closeOnce.Do(func() {
        close(t.ch)
    })
    return nil
}

// Remove does nothing.
func (t *TestController) Remove(ctx context.Context) error {
    return nil
}

// Close does nothing.
func (t *TestController) Close() error {
    t.closeOnce.Do(func() {
        close(t.ch)
    })
    return nil
}

// SessionHandler is an injectable function that can be used handle session requests
type SessionHandler func(*api.SessionRequest, api.Dispatcher_SessionServer) error

// MockDispatcher is a fake dispatcher that one agent at a time can connect to
type MockDispatcher struct {
    mu             sync.Mutex
    sessionCh      chan *api.SessionMessage
    openSession    *api.SessionRequest
    closedSessions []*api.SessionRequest
    sessionHandler SessionHandler

    Addr string
}

// UpdateTaskStatus is not implemented
func (m *MockDispatcher) UpdateTaskStatus(context.Context, *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
    panic("not implemented")
}

func (m *MockDispatcher) UpdateVolumeStatus(context.Context, *api.UpdateVolumeStatusRequest) (*api.UpdateVolumeStatusResponse, error) {
    panic("not implemented")
}

// Tasks keeps an open stream until canceled
func (m *MockDispatcher) Tasks(_ *api.TasksRequest, stream api.Dispatcher_TasksServer) error {
    <-stream.Context().Done()
    return nil
}

// Assignments keeps an open stream until canceled
func (m *MockDispatcher) Assignments(_ *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
    <-stream.Context().Done()
    return nil
}

// Heartbeat always successfully heartbeats
func (m *MockDispatcher) Heartbeat(context.Context, *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
    return &api.HeartbeatResponse{Period: time.Second * 5}, nil
}

// Session allows a session to be established, and sends the node info
func (m *MockDispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
    m.mu.Lock()
    handler := m.sessionHandler
    m.openSession = r
    m.mu.Unlock()
    sessionID := identity.NewID()

    defer func() {
        m.mu.Lock()
        defer m.mu.Unlock()
        log.G(stream.Context()).Debugf("non-dispatcher side closed session: %s", sessionID)
        m.closedSessions = append(m.closedSessions, r)
        if m.openSession == r { // only overwrite session if it hasn't changed
            m.openSession = nil
        }
    }()

    if handler != nil {
        return handler(r, stream)
    }

    // send the initial message first
    if err := stream.Send(&api.SessionMessage{
        SessionID: sessionID,
        Managers: []*api.WeightedPeer{
            {
                Peer: &api.Peer{Addr: m.Addr},
            },
        },
    }); err != nil {
        return err
    }

    ctx := stream.Context()
    for {
        select {
        case msg := <-m.sessionCh:
            msg.SessionID = sessionID
            if err := stream.Send(msg); err != nil {
                return err
            }
        case <-ctx.Done():
            return nil
        }
    }
}

// GetSessions return all the established and closed sessions
func (m *MockDispatcher) GetSessions() (*api.SessionRequest, []*api.SessionRequest) {
    m.mu.Lock()
    defer m.mu.Unlock()
    return m.openSession, m.closedSessions
}

// SessionMessageChannel returns a writable channel to inject session messages
func (m *MockDispatcher) SessionMessageChannel() chan<- *api.SessionMessage {
    return m.sessionCh
}

// SetSessionHandler lets you inject a custom function to handle session requests
func (m *MockDispatcher) SetSessionHandler(s SessionHandler) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.sessionHandler = s
}

// NewMockDispatcher starts and returns a mock dispatcher instance that can be connected to
func NewMockDispatcher(t *testing.T, secConfig *ca.SecurityConfig, local bool) (*MockDispatcher, func()) {
    var (
        l       net.Listener
        err     error
        addr    string
        cleanup func()
    )
    if local {
        tempDir, err := os.MkdirTemp("", "local-dispatcher-socket")
        require.NoError(t, err)
        addr = filepath.Join(tempDir, "socket")
        l, err = net.Listen("unix", addr)
        require.NoError(t, err)
        cleanup = func() {
            os.RemoveAll(tempDir)
        }
    } else {
        l, err = net.Listen("tcp", "127.0.0.1:0")
        require.NoError(t, err)
        addr = l.Addr().String()
    }

    serverOpts := []grpc.ServerOption{grpc.Creds(secConfig.ServerTLSCreds)}
    s := grpc.NewServer(serverOpts...)

    m := &MockDispatcher{
        Addr:      addr,
        sessionCh: make(chan *api.SessionMessage, 1),
    }
    api.RegisterDispatcherServer(s, m)
    go s.Serve(l)
    return m, func() {
        l.Close()
        s.Stop()
        if cleanup != nil {
            cleanup()
        }
    }
}