
View on GitHub


3 hrs
Test Coverage
package plugin

import (



// SecretGetter is a reimplementation of the exec.SecretGetter interface in the
// scope of the plugin package. This avoids the needing to import exec into the
// plugin package.
type SecretGetter interface {
    Get(secretID string) (*api.Secret, error)

type NodePlugin interface {
    GetPublishedPath(volumeID string) string
    NodeGetInfo(ctx context.Context) (*api.NodeCSIInfo, error)
    NodeStageVolume(ctx context.Context, req *api.VolumeAssignment) error
    NodeUnstageVolume(ctx context.Context, req *api.VolumeAssignment) error
    NodePublishVolume(ctx context.Context, req *api.VolumeAssignment) error
    NodeUnpublishVolume(ctx context.Context, req *api.VolumeAssignment) error

type volumePublishStatus struct {
    // stagingPath is staging path of volume
    stagingPath string

    // isPublished keeps track if the volume is published.
    isPublished bool

    // publishedPath is published path of volume
    publishedPath string

type nodePlugin struct {
    // name is the name of the plugin, which is used in the Driver.Name field.
    name string

    // socket is the path of the unix socket to connect to this plugin at
    socket string

    // scopePath gets the provided path relative to the plugin directory.
    scopePath func(s string) string

    // secrets is the SecretGetter to get volume secret data
    secrets SecretGetter

    // volumeMap is the map from volume ID to Volume. Will place a volume once it is staged,
    // remove it from the map for unstage.
    // TODO: Make this map persistent if the swarm node goes down
    volumeMap map[string]*volumePublishStatus

    // mu for volumeMap
    mu sync.RWMutex

    // staging indicates that the plugin has staging capabilities.
    staging bool

    // cc is the gRPC client connection
    cc *grpc.ClientConn

    // idClient is the CSI Identity Service client
    idClient csi.IdentityClient

    // nodeClient is the CSI Node Service client
    nodeClient csi.NodeClient

const (
    // TargetStagePath is the path within the plugin's scope that the volume is
    // to be staged. This does not need to be accessible or propagated outside
    // of the plugin rootfs.
    TargetStagePath string = "/data/staged"
    // TargetPublishPath is the path within the plugin's scope that the volume
    // is to be published. This needs to be the plugin's PropagatedMount.
    TargetPublishPath string = "/data/published"

func NewNodePlugin(name string, p plugin.AddrPlugin, secrets SecretGetter) NodePlugin {
    return newNodePlugin(name, p, secrets)

// newNodePlugin returns a raw nodePlugin object, not behind an interface. this
// is useful for testing.
func newNodePlugin(name string, p plugin.AddrPlugin, secrets SecretGetter) *nodePlugin {
    return &nodePlugin{
        name:      name,
        socket:    fmt.Sprintf("%s://%s", p.Addr().Network(), p.Addr().String()),
        scopePath: p.ScopedPath,
        secrets:   secrets,
        volumeMap: map[string]*volumePublishStatus{},

// connect is a private method that sets up the identity client and node
// client from a grpc client. it exists separately so that testing code can
// substitute in fake clients without a grpc connection
func (np *nodePlugin) connect(ctx context.Context) error {
    // even though this is a unix socket, we must set WithInsecure or the
    // connection will not be allowed.
    cc, err := grpc.DialContext(ctx, np.socket, grpc.WithInsecure())
    if err != nil {
        return err
    } = cc
    // first, probe the plugin, to ensure that it exists and is ready to go
    idc := csi.NewIdentityClient(cc)
    np.idClient = idc

    np.nodeClient = csi.NewNodeClient(cc)

    return np.init(ctx)

func (np *nodePlugin) Client(ctx context.Context) (csi.NodeClient, error) {
    if np.nodeClient == nil {
        if err := np.connect(ctx); err != nil {
            return nil, err
    return np.nodeClient, nil

func (np *nodePlugin) init(ctx context.Context) error {
    probe, err := np.idClient.Probe(ctx, &csi.ProbeRequest{})
    if err != nil {
        return err
    if probe.Ready != nil && !probe.Ready.Value {
        return status.Error(codes.FailedPrecondition, "Plugin is not Ready")

    c, err := np.Client(ctx)
    if err != nil {
        return err

    resp, err := c.NodeGetCapabilities(ctx, &csi.NodeGetCapabilitiesRequest{})
    if err != nil {
        // TODO(ameyag): handle
        return err
    if resp == nil {
        return nil
    log.G(ctx).Debugf("plugin advertises %d capabilities", len(resp.Capabilities))
    for _, c := range resp.Capabilities {
        if rpc := c.GetRpc(); rpc != nil {
            log.G(ctx).Debugf("plugin has capability %s", rpc)
            switch rpc.Type {
            case csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME:
                np.staging = true

    return nil

// GetPublishedPath returns the path at which the provided volume ID is
// published. This path is provided in terms of absolute location on the host,
// not the location in the plugins' scope.
// Returns an empty string if the volume does not exist.
func (np *nodePlugin) GetPublishedPath(volumeID string) string {
    if volInfo, ok := np.volumeMap[volumeID]; ok {
        if volInfo.isPublished {
            return np.scopePath(volInfo.publishedPath)
    return ""

func (np *nodePlugin) NodeGetInfo(ctx context.Context) (*api.NodeCSIInfo, error) {
    c, err := np.Client(ctx)
    if err != nil {
        return nil, err
    resp, err := c.NodeGetInfo(ctx, &csi.NodeGetInfoRequest{})
    if err != nil {
        return nil, err

    i := makeNodeInfo(resp)
    i.PluginName =
    return i, nil

func (np *nodePlugin) NodeStageVolume(ctx context.Context, req *api.VolumeAssignment) error {
    if !np.staging {
        return nil

    stagingTarget := stagePath(req)
    err := capability.CheckArguments(req)
    if err != nil {
        return err

    c, err := np.Client(ctx)
    if err != nil {
        return err

    _, err = c.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
        VolumeId:          req.VolumeID,
        StagingTargetPath: stagingTarget,
        Secrets:           np.makeSecrets(req),
        VolumeCapability:  capability.MakeCapability(req.AccessMode),
        VolumeContext:     req.VolumeContext,
        PublishContext:    req.PublishContext,

    if err != nil {
        return err

    v := &volumePublishStatus{
        stagingPath: stagingTarget,

    np.volumeMap[req.ID] = v

    log.G(ctx).Infof("volume staged to path %s", stagingTarget)
    return nil

func (np *nodePlugin) NodeUnstageVolume(ctx context.Context, req *api.VolumeAssignment) error {
    if !np.staging {
        return nil

    stagingTarget := stagePath(req)

    // Check arguments
    if len(req.VolumeID) == 0 {
        return status.Error(codes.FailedPrecondition, "VolumeID missing in request")

    c, err := np.Client(ctx)
    if err != nil {
        return err

    // we must unpublish before we unstage. verify here that the volume is not
    // published.
    if v, ok := np.volumeMap[req.ID]; ok {
        if v.isPublished {
            return status.Errorf(codes.FailedPrecondition, "Volume %s is not unpublished", req.ID)
        return nil

    _, err = c.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
        VolumeId:          req.VolumeID,
        StagingTargetPath: stagingTarget,
    if err != nil {
        return err

    // if the volume doesn't exist in the volumeMap, deleting has no effect.
    delete(np.volumeMap, req.ID)
    log.G(ctx).Info("volume unstaged")

    return nil

func (np *nodePlugin) NodePublishVolume(ctx context.Context, req *api.VolumeAssignment) error {
    err := capability.CheckArguments(req)
    if err != nil {
        return err

    publishTarget := publishPath(req)

    // Some volumes plugins require staging; we track this with a boolean, which
    // also implies a staging path in the path map. If the plugin is marked as
    // requiring staging but does not have a staging path in the map, that is an
    // error.
    var stagingPath string
    if vs, ok := np.volumeMap[req.ID]; ok {
        stagingPath = vs.stagingPath
    } else if np.staging {
        return status.Error(codes.FailedPrecondition, "volume requires staging but was not staged")

    c, err := np.Client(ctx)
    if err != nil {
        return err

    _, err = c.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
        VolumeId:          req.VolumeID,
        TargetPath:        publishTarget,
        StagingTargetPath: stagingPath,
        VolumeCapability:  capability.MakeCapability(req.AccessMode),
        Secrets:           np.makeSecrets(req),
        VolumeContext:     req.VolumeContext,
        PublishContext:    req.PublishContext,
    if err != nil {
        return err

    status, ok := np.volumeMap[req.ID]
    if !ok {
        status = &volumePublishStatus{}
        np.volumeMap[req.ID] = status

    status.isPublished = true
    status.publishedPath = publishTarget

    log.G(ctx).Infof("volume published to path %s", publishTarget)

    return nil

func (np *nodePlugin) NodeUnpublishVolume(ctx context.Context, req *api.VolumeAssignment) error {
    // Check arguments
    if len(req.VolumeID) == 0 {
        return status.Error(codes.InvalidArgument, "Volume ID missing in request")
    publishTarget := publishPath(req)

    c, err := np.Client(ctx)
    if err != nil {
        return err

    _, err = c.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
        VolumeId:   req.VolumeID,
        TargetPath: publishTarget,

    if err != nil {
        return err

    if v, ok := np.volumeMap[req.ID]; ok {
        v.publishedPath = ""
        v.isPublished = false
        return nil

    log.G(ctx).Info("volume unpublished")
    return nil

func (np *nodePlugin) makeSecrets(v *api.VolumeAssignment) map[string]string {
    // this should never happen, but program defensively.
    if v == nil {
        return nil

    secrets := make(map[string]string, len(v.Secrets))
    for _, secret := range v.Secrets {
        // TODO(dperny): handle error from Get
        value, _ := np.secrets.Get(secret.Secret)
        if value != nil {
            secrets[secret.Key] = string(value.Spec.Data)

    return secrets

// makeNodeInfo converts a csi.NodeGetInfoResponse object into a swarmkit NodeCSIInfo
// object.
func makeNodeInfo(csiNodeInfo *csi.NodeGetInfoResponse) *api.NodeCSIInfo {
    return &api.NodeCSIInfo{
        NodeID:            csiNodeInfo.NodeId,
        MaxVolumesPerNode: csiNodeInfo.MaxVolumesPerNode,

// stagePath returns the staging path for a given volume assignment
func stagePath(v *api.VolumeAssignment) string {
    // this really just exists so we use the same trick to determine staging
    // path across multiple methods and can't forget to change it in one place
    // but not another
    return filepath.Join(TargetStagePath, v.ID)

// publishPath returns the publishing path for a given volume assignment
func publishPath(v *api.VolumeAssignment) string {
    // ditto as stagePath
    return filepath.Join(TargetPublishPath, v.ID)