docker/swarmkit

View on GitHub
manager/csi/plugin.go

Summary

Maintainability
A
1 hr
Test Coverage
package csi

import (
    "context"
    "errors"
    "fmt"
    "net"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"

    "github.com/container-storage-interface/spec/lib/go/csi"
    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/internal/csi/capability"
    "github.com/moby/swarmkit/v2/log"
    mobyplugin "github.com/moby/swarmkit/v2/node/plugin"
)

// Plugin is the interface for a CSI controller plugin.
//
// In this package, the word "plugin" is unfortunately overused. This
// particular "Plugin" is the interface used by volume Manager to interact with
// CSI controller plugins. It should not be confused with the "plugin" returned
// from the plugingetter interface, which is the interface that gives us the
// information we need to create this Plugin.
type Plugin interface {
    CreateVolume(context.Context, *api.Volume) (*api.VolumeInfo, error)
    DeleteVolume(context.Context, *api.Volume) error
    PublishVolume(context.Context, *api.Volume, string) (map[string]string, error)
    UnpublishVolume(context.Context, *api.Volume, string) error
    AddNode(swarmID, csiID string)
    RemoveNode(swarmID string)
    Addr() net.Addr
}

// plugin represents an individual CSI controller plugin
type plugin struct {
    // name is the name of the plugin, which is also the name used as the
    // Driver.Name field
    name string

    // socket is the unix socket to connect to this plugin at.
    socket string
    addr   net.Addr

    // provider is the SecretProvider, which allows retrieving secrets for CSI
    // calls.
    provider SecretProvider

    // cc is the grpc client connection
    // TODO(dperny): the client is never closed. it may be closed when it goes
    // out of scope, but this should be verified.
    cc *grpc.ClientConn
    // idClient is the identity service client
    idClient csi.IdentityClient
    // controllerClient is the controller service client
    controllerClient csi.ControllerClient

    // controller indicates that the plugin has controller capabilities.
    controller bool

    // publisher indicates that the controller plugin has
    // PUBLISH_UNPUBLISH_VOLUME capability.
    publisher bool

    // swarmToCSI maps a swarm node ID to the corresponding CSI node ID
    swarmToCSI map[string]string

    // csiToSwarm maps a CSI node ID back to the swarm node ID.
    csiToSwarm map[string]string
}

// NewPlugin creates a new Plugin object.
//
// NewPlugin takes both the CompatPlugin and the PluginAddr. These should be
// the same object. By taking both parts here, we can push off the work of
// assuring that the given plugin implements the PluginAddr interface without
// having to typecast in this constructor.
func NewPlugin(p mobyplugin.AddrPlugin, provider SecretProvider) Plugin {
    return &plugin{
        name: p.Name(),
        // TODO(dperny): verify that we do not need to include the Network()
        // portion of the Addr.
        socket:     fmt.Sprintf("%s://%s", p.Addr().Network(), p.Addr().String()),
        addr:       p.Addr(),
        provider:   provider,
        swarmToCSI: map[string]string{},
        csiToSwarm: map[string]string{},
    }
}

// connect is a private method that initializes a gRPC ClientConn and creates
// the IdentityClient and ControllerClient.
func (p *plugin) connect(ctx context.Context) error {
    cc, err := grpc.DialContext(ctx, p.socket, grpc.WithInsecure())
    if err != nil {
        return err
    }

    p.cc = cc

    // first, probe the plugin, to ensure that it exists and is ready to go
    idc := csi.NewIdentityClient(cc)
    p.idClient = idc

    // controllerClient may not do anything if the plugin does not support
    // the controller service, but it should not be an error to create it now
    // anyway
    p.controllerClient = csi.NewControllerClient(cc)

    return p.init(ctx)
}

// init checks uses the identity service to check the properties of the plugin,
// most importantly, its capabilities.
func (p *plugin) init(ctx context.Context) error {
    probe, err := p.idClient.Probe(ctx, &csi.ProbeRequest{})
    if err != nil {
        return err
    }

    if probe.Ready != nil && !probe.Ready.Value {
        return errors.New("plugin not ready")
    }

    resp, err := p.idClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{})
    if err != nil {
        return err
    }

    if resp == nil {
        return nil
    }

    for _, c := range resp.Capabilities {
        if sc := c.GetService(); sc != nil {
            switch sc.Type {
            case csi.PluginCapability_Service_CONTROLLER_SERVICE:
                p.controller = true
            }
        }
    }

    if p.controller {
        cCapResp, err := p.controllerClient.ControllerGetCapabilities(
            ctx, &csi.ControllerGetCapabilitiesRequest{},
        )
        if err != nil {
            return err
        }

        for _, c := range cCapResp.Capabilities {
            rpc := c.GetRpc()
            if rpc.Type == csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME {
                p.publisher = true
            }
        }
    }

    return nil
}

// CreateVolume wraps and abstracts the CSI CreateVolume logic and returns
// the volume info, or an error.
func (p *plugin) CreateVolume(ctx context.Context, v *api.Volume) (*api.VolumeInfo, error) {
    c, err := p.Client(ctx)
    if err != nil {
        return nil, err
    }

    if !p.controller {
        // TODO(dperny): come up with a scheme to handle headless plugins
        // TODO(dperny): handle plugins without create volume capabilities
        return &api.VolumeInfo{VolumeID: v.Spec.Annotations.Name}, nil
    }

    createVolumeRequest := p.makeCreateVolume(v)
    resp, err := c.CreateVolume(ctx, createVolumeRequest)
    if err != nil {
        return nil, err
    }

    return makeVolumeInfo(resp.Volume), nil
}

func (p *plugin) DeleteVolume(ctx context.Context, v *api.Volume) error {
    if v.VolumeInfo == nil {
        return errors.New("VolumeInfo must not be nil")
    }
    // we won't use a fancy createDeleteVolumeRequest method because the
    // request is simple enough to not bother with it
    secrets := p.makeSecrets(v)
    req := &csi.DeleteVolumeRequest{
        VolumeId: v.VolumeInfo.VolumeID,
        Secrets:  secrets,
    }
    c, err := p.Client(ctx)
    if err != nil {
        return err
    }
    // response from RPC intentionally left blank
    _, err = c.DeleteVolume(ctx, req)
    return err
}

// PublishVolume calls ControllerPublishVolume to publish the given Volume to
// the Node with the given swarmkit ID. It returns a map, which is the
// PublishContext for this Volume on this Node.
func (p *plugin) PublishVolume(ctx context.Context, v *api.Volume, nodeID string) (map[string]string, error) {
    if !p.publisher {
        return nil, nil
    }
    csiNodeID := p.swarmToCSI[nodeID]
    if csiNodeID == "" {
        log.L.Errorf("CSI node ID not found for given Swarm node ID. Plugin: %s , Swarm node ID: %s", p.name, nodeID)
        return nil, status.Error(codes.FailedPrecondition, "CSI node ID not found for given Swarm node ID")
    }

    req := p.makeControllerPublishVolumeRequest(v, nodeID)
    c, err := p.Client(ctx)
    if err != nil {
        return nil, err
    }
    resp, err := c.ControllerPublishVolume(ctx, req)

    if err != nil {
        return nil, err
    }
    return resp.PublishContext, nil
}

// UnpublishVolume calls ControllerUnpublishVolume to unpublish the given
// Volume from the Node with the given swarmkit ID. It returns an error if the
// unpublish does not succeed
func (p *plugin) UnpublishVolume(ctx context.Context, v *api.Volume, nodeID string) error {
    if !p.publisher {
        return nil
    }

    req := p.makeControllerUnpublishVolumeRequest(v, nodeID)
    c, err := p.Client(ctx)
    if err != nil {
        return err
    }

    // response of the RPC intentionally left blank
    _, err = c.ControllerUnpublishVolume(ctx, req)
    return err
}

// AddNode adds a mapping for a node's swarm ID to the ID provided by this CSI
// plugin. This allows future calls to the plugin to be done entirely in terms
// of the swarm node ID.
//
// The CSI node ID is provided by the node as part of the NodeDescription.
func (p *plugin) AddNode(swarmID, csiID string) {
    p.swarmToCSI[swarmID] = csiID
    p.csiToSwarm[csiID] = swarmID
}

// RemoveNode removes a node from this plugin's node mappings.
func (p *plugin) RemoveNode(swarmID string) {
    csiID := p.swarmToCSI[swarmID]
    delete(p.swarmToCSI, swarmID)
    delete(p.csiToSwarm, csiID)
}

// Client retrieves a csi.ControllerClient for this plugin
//
// If this is the first time client has been called and no client yet exists,
// it will initialize the gRPC connection to the remote plugin and create a new
// ControllerClient.
func (p *plugin) Client(ctx context.Context) (csi.ControllerClient, error) {
    if p.controllerClient == nil {
        if err := p.connect(ctx); err != nil {
            return nil, err
        }
    }
    return p.controllerClient, nil
}

// makeCreateVolume makes a csi.CreateVolumeRequest from the volume object and
// spec. it uses the Plugin's SecretProvider to retrieve relevant secrets.
func (p *plugin) makeCreateVolume(v *api.Volume) *csi.CreateVolumeRequest {
    secrets := p.makeSecrets(v)
    return &csi.CreateVolumeRequest{
        Name:       v.Spec.Annotations.Name,
        Parameters: v.Spec.Driver.Options,
        VolumeCapabilities: []*csi.VolumeCapability{
            capability.MakeCapability(v.Spec.AccessMode),
        },
        Secrets:                   secrets,
        AccessibilityRequirements: makeTopologyRequirement(v.Spec.AccessibilityRequirements),
        CapacityRange:             makeCapacityRange(v.Spec.CapacityRange),
    }
}

// makeSecrets uses the plugin's SecretProvider to make the secrets map to pass
// to CSI RPCs.
func (p *plugin) makeSecrets(v *api.Volume) map[string]string {
    secrets := map[string]string{}
    for _, vs := range v.Spec.Secrets {
        // a secret should never be nil, but check just to be sure
        if vs != nil {
            secret := p.provider.GetSecret(vs.Secret)
            if secret != nil {
                // TODO(dperny): return an error, but this should never happen,
                // as secrets should be validated at volume creation time
                secrets[vs.Key] = string(secret.Spec.Data)
            }
        }
    }
    return secrets
}

func (p *plugin) makeControllerPublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerPublishVolumeRequest {
    if v.VolumeInfo == nil {
        return nil
    }

    secrets := p.makeSecrets(v)
    capability := capability.MakeCapability(v.Spec.AccessMode)
    capability.AccessType = &csi.VolumeCapability_Mount{
        Mount: &csi.VolumeCapability_MountVolume{},
    }
    return &csi.ControllerPublishVolumeRequest{
        VolumeId:         v.VolumeInfo.VolumeID,
        NodeId:           p.swarmToCSI[nodeID],
        Secrets:          secrets,
        VolumeCapability: capability,
        VolumeContext:    v.VolumeInfo.VolumeContext,
    }
}

func (p *plugin) makeControllerUnpublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerUnpublishVolumeRequest {
    if v.VolumeInfo == nil {
        return nil
    }

    secrets := p.makeSecrets(v)
    return &csi.ControllerUnpublishVolumeRequest{
        VolumeId: v.VolumeInfo.VolumeID,
        NodeId:   p.swarmToCSI[nodeID],
        Secrets:  secrets,
    }
}

func (p *plugin) Addr() net.Addr {
    return p.addr
}