dotcloud/docker

View on GitHub
pkg/plugins/plugins.go

Summary

Maintainability
A
0 mins
Test Coverage
// Package plugins provides structures and helper functions to manage Docker
// plugins.
//
// Docker discovers plugins by looking for them in the plugin directory whenever
// a user or container tries to use one by name. UNIX domain socket files must
// be located under /run/docker/plugins, whereas spec files can be located
// either under /etc/docker/plugins or /usr/lib/docker/plugins. This is handled
// by the Registry interface, which lets you list all plugins or get a plugin by
// its name if it exists.
//
// The plugins need to implement an HTTP server and bind this to the UNIX socket
// or the address specified in the spec files.
// A handshake is sent at /Plugin.Activate, and plugins are expected to return
// a Manifest with a list of Docker subsystems which this plugin implements.
//
// In order to use a plugin, you can use the `Get` function with the name of
// the plugin and the subsystem it implements.
//
//    plugin, err := plugins.Get("example", "VolumeDriver")
//    if err != nil {
//        return fmt.Errorf("Error looking up volume plugin example: %v", err)
//    }
package plugins // import "github.com/docker/docker/pkg/plugins"

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "github.com/containerd/log"
    "github.com/docker/go-connections/tlsconfig"
)

// ProtocolSchemeHTTPV1 is the name of the protocol used for interacting with
// plugins using this package. It is the value reported by (*Plugin).Protocol.
const ProtocolSchemeHTTPV1 = "moby.plugins.http/v1"

// ErrNotImplements is returned if the plugin does not implement the requested driver.
// Get wraps it with %w (adding the plugin name and the requested
// implementation), so callers should match it with errors.Is rather than ==.
var ErrNotImplements = errors.New("Plugin does not implement the requested driver")

// plugins is a mutex-guarded collection of known plugins, keyed by plugin
// name. The embedded Mutex must be held while reading or writing the map.
type plugins struct {
    sync.Mutex
    plugins map[string]*Plugin
}

// extpointHandlers holds the callbacks registered through Handle, keyed by the
// name of the interface (extension point) they were registered for. Each
// callback receives the plugin name and the plugin's client. The embedded
// RWMutex guards the map.
type extpointHandlers struct {
    sync.RWMutex
    extpointHandlers map[string][]func(string, *Client)
}

// Package-level state shared by every function in this file.
var (
    // storage caches plugins by name once they have been located.
    storage  = plugins{plugins: make(map[string]*Plugin)}
    // handlers holds the callbacks registered via Handle, per interface name.
    handlers = extpointHandlers{extpointHandlers: make(map[string][]func(string, *Client))}
)

// Manifest lists what a plugin implements. It is filled in from the plugin's
// response to the "Plugin.Activate" call.
type Manifest struct {
    // List of subsystems the plugin implements.
    Implements []string
}

// Plugin is the definition of a docker plugin.
//
// The unexported activation fields (client, Manifest, activateErr,
// handlersRun) are written while holding activateWait.L.
type Plugin struct {
    // Name of the plugin
    name string
    // Address of the plugin; passed to NewClient during activation.
    Addr string
    // TLS configuration of the plugin; passed to NewClient during activation.
    TLSConfig *tlsconfig.Options
    // Client attached to the plugin; assigned during activation.
    client *Client
    // Manifest of the plugin (see above). It is nil until activation has
    // succeeded, which is how "activated" is determined.
    Manifest *Manifest `json:"-"`

    // wait for activation to finish; its L lock also serializes activation
    activateWait *sync.Cond
    // error produced by the most recent activation attempt (nil on success)
    activateErr error
    // keeps track of callback handlers run against this plugin; reset to false
    // by Handle so that newly registered handlers get run
    handlersRun bool
}

// Name returns the name of the plugin, i.e. the value of its unexported name
// field.
func (p *Plugin) Name() string {
    return p.name
}

// Client returns the plugin client that can be used to communicate with the plugin.
//
// In this file the client is only assigned during activation, so the result
// is nil for a plugin on which no activation attempt has created a client yet.
func (p *Plugin) Client() *Client {
    return p.client
}

// Protocol returns the protocol name/version used for plugins in this package.
// It always returns ProtocolSchemeHTTPV1.
func (p *Plugin) Protocol() string {
    return ProtocolSchemeHTTPV1
}

// IsV1 returns true for V1 plugins and false otherwise.
// Every Plugin defined by this type is a V1 plugin, so it always returns true.
func (p *Plugin) IsV1() bool {
    return true
}

// ScopedPath returns the path scoped to the plugin's rootfs.
// For v1 plugins, this always returns the path unchanged as v1 plugins run directly on the host.
//
// s is the path to scope; the return value is s itself.
func (p *Plugin) ScopedPath(s string) string {
    return s
}

// NewLocalPlugin builds a Plugin called name that is reached at addr.
//
// The returned plugin has not been activated: it has no client and no
// manifest yet. Its TLS options are set to skip certificate verification.
func NewLocalPlugin(name, addr string) *Plugin {
    // TODO: change to nil
    tlsOpts := &tlsconfig.Options{InsecureSkipVerify: true}

    pl := new(Plugin)
    pl.name = name
    pl.Addr = addr
    pl.TLSConfig = tlsOpts
    pl.activateWait = sync.NewCond(new(sync.Mutex))
    return pl
}

// activate performs the activation handshake with the plugin unless it has
// already completed successfully, runs any pending extension-point handlers,
// and wakes up goroutines blocked in waitActive.
//
// It returns nil when the plugin is (or already was) activated, or the error
// produced by this activation attempt. A failed attempt leaves the plugin
// non-activated, so a later call tries the handshake again.
func (p *Plugin) activate() error {
    p.activateWait.L.Lock()

    if p.activated() {
        // Handle may have reset handlersRun since the last call; run the
        // handlers again in that case.
        p.runHandlers()
        err := p.activateErr
        p.activateWait.L.Unlock()
        return err
    }

    err := p.activateWithLock()
    p.activateErr = err

    // runHandlers is a no-op when the handshake failed.
    p.runHandlers()
    p.activateWait.L.Unlock()
    p.activateWait.Broadcast()

    // Return the locally captured result: reading p.activateErr after the
    // lock has been released would race with a concurrent activate call.
    return err
}

// runHandlers invokes, once, every handler registered for each interface
// listed in the plugin's manifest, passing the plugin name and client.
//
// It does nothing for a plugin that is not activated or whose handlers have
// already run (handlersRun is true). Callers must hold the activation lock
// and should only call this after an activation attempt.
func (p *Plugin) runHandlers() {
    if !p.activated() {
        return
    }

    handlers.RLock()
    if p.handlersRun {
        handlers.RUnlock()
        return
    }

    for _, iface := range p.Manifest.Implements {
        // A missing key yields a nil slice, so interfaces without registered
        // handlers are skipped naturally.
        for _, handle := range handlers.extpointHandlers[iface] {
            handle(p.name, p.client)
        }
    }
    p.handlersRun = true
    handlers.RUnlock()
}

// activated reports whether the plugin has already been activated, which is
// the case once a Manifest has been stored on it.
// This should only be called with the activation lock held
func (p *Plugin) activated() bool {
    return p.Manifest != nil
}

// activateWithLock creates a client for the plugin's address and TLS options,
// stores it on the plugin, and calls "Plugin.Activate" to fetch the manifest.
//
// On success the manifest is stored on the plugin and nil is returned. If the
// client cannot be created, neither client nor manifest is set. If the call
// fails, the client stays set but the manifest does not. As the name says, it
// is meant to be called with the activation lock held.
func (p *Plugin) activateWithLock() error {
    client, err := NewClient(p.Addr, p.TLSConfig)
    if err != nil {
        return err
    }
    p.client = client

    manifest := &Manifest{}
    if callErr := client.Call("Plugin.Activate", nil, manifest); callErr != nil {
        return callErr
    }

    p.Manifest = manifest
    return nil
}

// waitActive blocks until the plugin has been activated or an activation
// attempt has recorded an error, whichever is observed first.
//
// It returns nil once the plugin is activated without a recorded error,
// otherwise the recorded activation error.
func (p *Plugin) waitActive() error {
    p.activateWait.L.Lock()
    // The deferred unlock runs after the return value has been evaluated, so
    // activateErr is read under the lock and cannot race with activate.
    defer p.activateWait.L.Unlock()

    for !p.activated() && p.activateErr == nil {
        p.activateWait.Wait()
    }
    return p.activateErr
}

// implements reports whether kind is listed in the plugin's manifest.
// A plugin without a manifest implements nothing.
func (p *Plugin) implements(kind string) bool {
    if m := p.Manifest; m != nil {
        for i := range m.Implements {
            if m.Implements[i] == kind {
                return true
            }
        }
    }
    return false
}

// loadWithRetry locates the plugin called name through a local registry,
// records it in storage and activates it.
//
// When retry is false, the first lookup failure is returned as-is. When retry
// is true, the lookup is repeated with a backoff delay between attempts until
// abort reports that it is time to give up, at which point the last lookup
// error is returned.
//
// If another caller has stored a plugin under the same name in the meantime,
// that plugin is activated and returned instead. A freshly stored plugin
// whose activation fails is removed from storage again; it is still returned
// together with the activation error.
func loadWithRetry(name string, retry bool) (*Plugin, error) {
    var (
        registry        = NewLocalRegistry()
        began           = time.Now()
        attempts        int
        timeoutOverride int
    )
    if name == testNonExistingPlugin {
        // override the timeout in tests
        timeoutOverride = 2
    }

    // Phase 1: look the plugin up, retrying if requested.
    var candidate *Plugin
    for {
        found, err := registry.Plugin(name)
        if err == nil {
            candidate = found
            break
        }
        if !retry {
            return nil, err
        }

        delay := backoff(attempts)
        if abort(began, delay, timeoutOverride) {
            return nil, err
        }
        attempts++
        log.G(context.TODO()).Warnf("Unable to locate plugin: %s, retrying in %v", name, delay)
        time.Sleep(delay)
    }

    // Phase 2: publish the plugin unless somebody else already did.
    storage.Lock()
    existing, alreadyStored := storage.plugins[name]
    if !alreadyStored {
        storage.plugins[name] = candidate
    }
    storage.Unlock()

    if alreadyStored {
        return existing, existing.activate()
    }

    // Phase 3: activate, and withdraw the plugin from storage on failure.
    err := candidate.activate()
    if err != nil {
        storage.Lock()
        delete(storage.plugins, name)
        storage.Unlock()
    }
    return candidate, err
}

// get returns the plugin stored under name, activating it on the way. When no
// such plugin is stored yet, it is loaded via loadWithRetry with retries
// enabled. The error is whatever activate or loadWithRetry returned.
func get(name string) (*Plugin, error) {
    storage.Lock()
    cached, found := storage.plugins[name]
    storage.Unlock()

    if !found {
        return loadWithRetry(name, true)
    }
    return cached, cached.activate()
}

// Get looks up the plugin called name and returns it if it is active and its
// manifest lists the implementation imp.
//
// An empty name is rejected immediately. Errors from locating or activating
// the plugin are returned unchanged. If the plugin does not become active or
// does not list imp, the returned error wraps ErrNotImplements.
func Get(name, imp string) (*Plugin, error) {
    if name == "" {
        return nil, errors.New("Unable to find plugin without name")
    }

    pl, err := get(name)
    if err != nil {
        return nil, err
    }

    if pl.waitActive() != nil || !pl.implements(imp) {
        return nil, fmt.Errorf("%w: plugin=%q, requested implementation=%q", ErrNotImplements, name, imp)
    }

    log.G(context.TODO()).Debugf("%s implements: %s", name, imp)
    return pl, nil
}

// Handle adds the specified function to the extpointHandlers.
//
// fn is appended to the handlers registered for the interface iface. Every
// stored plugin that is already activated and implements iface then has its
// handlersRun flag cleared, so that its next activate call runs the handlers
// again and the new one is included.
//
// The handlers lock is released before any plugin's activation lock is taken.
// activate acquires those two locks in the opposite order (activation lock
// first, then the handlers read lock in runHandlers), so holding both here
// could deadlock against a concurrent activation.
func Handle(iface string, fn func(string, *Client)) {
    handlers.Lock()
    // append copes with a missing (nil) entry, so no explicit initialisation
    // of the slice is needed.
    handlers.extpointHandlers[iface] = append(handlers.extpointHandlers[iface], fn)
    handlers.Unlock()

    storage.Lock()
    defer storage.Unlock()
    for _, p := range storage.plugins {
        p.activateWait.L.Lock()
        if p.activated() && p.implements(iface) {
            p.handlersRun = false
        }
        p.activateWait.L.Unlock()
    }
}

// GetAll returns every plugin found by Scan that becomes active and whose
// manifest lists the implementation imp.
//
// Plugins already present in storage are used as they are; the others are
// loaded concurrently, without retries. A plugin that fails to load is logged
// and skipped rather than failing the whole call. Only an error from Scan
// itself is returned.
func (l *LocalRegistry) GetAll(imp string) ([]*Plugin, error) {
    names, err := l.Scan()
    if err != nil {
        return nil, err
    }

    type loadResult struct {
        plugin *Plugin
        err    error
    }

    // One slot per name, so that no sender ever blocks.
    results := make(chan loadResult, len(names))
    var pending sync.WaitGroup
    for _, name := range names {
        storage.Lock()
        cached, found := storage.plugins[name]
        storage.Unlock()

        if !found {
            pending.Add(1)
            go func(pluginName string) {
                defer pending.Done()
                loaded, loadErr := loadWithRetry(pluginName, false)
                results <- loadResult{plugin: loaded, err: loadErr}
            }(name)
            continue
        }
        results <- loadResult{plugin: cached}
    }

    pending.Wait()
    close(results)

    var matching []*Plugin
    for res := range results {
        if res.err != nil {
            log.G(context.TODO()).Error(res.err)
            continue
        }
        if res.plugin.waitActive() != nil || !res.plugin.implements(imp) {
            continue
        }
        matching = append(matching, res.plugin)
    }
    return matching, nil
}