docker/docker

View on GitHub
libnetwork/agent.go

Summary

Maintainability
D
2 days
Test Coverage
package libnetwork

//go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto

import (
    "context"
    "encoding/json"
    "fmt"
    "net"
    "sort"
    "sync"

    "github.com/containerd/log"
    "github.com/docker/docker/libnetwork/cluster"
    "github.com/docker/docker/libnetwork/discoverapi"
    "github.com/docker/docker/libnetwork/driverapi"
    "github.com/docker/docker/libnetwork/networkdb"
    "github.com/docker/docker/libnetwork/scope"
    "github.com/docker/docker/libnetwork/types"
    "github.com/docker/go-events"
    "github.com/gogo/protobuf/proto"
)

const (
    // subsysGossip is the Subsystem value of the encryption keys that are
    // handed to NetworkDB (see handleKeyChange and agentInit).
    subsysGossip = "networking:gossip"
    // subsysIPSec is the Subsystem value of the encryption keys that are
    // pushed to drivers as datapath encryption keys.
    subsysIPSec  = "networking:ipsec"
    // keyringSize is not referenced in this part of the file.
    // NOTE(review): presumably the number of keys kept per subsystem — confirm.
    keyringSize  = 3
)

// ByTime orders a slice of encryption keys by ascending LamportTime.
// It satisfies sort.Interface so that it can be handed to sort.Sort.
type ByTime []*types.EncryptionKey

// Len reports how many keys the slice holds.
func (b ByTime) Len() int {
    return len(b)
}

// Swap exchanges the keys stored at positions i and j.
func (b ByTime) Swap(i, j int) {
    b[j], b[i] = b[i], b[j]
}

// Less reports whether the key at position i has a lower LamportTime than
// the key at position j.
func (b ByTime) Less(i, j int) bool {
    return b[j].LamportTime > b[i].LamportTime
}

// nwAgent holds the per-controller cluster agent state created by agentInit.
type nwAgent struct {
    // networkDB is the NetworkDB instance used for all table reads, writes
    // and watches performed through the agent.
    networkDB         *networkdb.NetworkDB
    // bindAddr is the address resolved from the bind address or interface
    // name passed to agentInit.
    bindAddr          net.IP
    // advertiseAddr is the advertise address passed to agentInit; it is also
    // the fallback returned by dataPathAddress.
    advertiseAddr     string
    // dataPathAddr is the data-path address passed to agentInit; it may be
    // empty.
    dataPathAddr      string
    // coreCancelFuncs cancel the watches registered by agentInit (endpoint
    // table and node table).
    coreCancelFuncs   []func()
    // driverCancelFuncs holds, per network ID, the cancel functions of the
    // driver table watches registered by addDriverWatches.
    driverCancelFuncs map[string][]func()
    // mu is held while driverCancelFuncs and coreCancelFuncs are accessed and
    // while dataPathAddress reads the address fields.
    mu                sync.Mutex
}

// dataPathAddress returns the address to use for data-path traffic: the
// configured data-path address when one is set, the advertise address
// otherwise. The agent mutex is held while the fields are read.
func (a *nwAgent) dataPathAddress() string {
    a.mu.Lock()
    defer a.mu.Unlock()

    addr := a.dataPathAddr
    if addr == "" {
        addr = a.advertiseAddr
    }
    return addr
}

// libnetworkEPTable is the name of the NetworkDB table in which the
// driver-agnostic endpoint records (marshalled EndpointRecord values, keyed
// by endpoint ID) are stored.
const libnetworkEPTable = "endpoint_table"

// getBindAddr returns the first address assigned to the network interface
// named ifaceName that is not a link-local unicast address.
//
// It returns an error when the interface cannot be found, when its addresses
// cannot be listed, or when no suitable address is assigned to it. Errors
// coming from the net package are wrapped with %w so that callers can inspect
// them with errors.Is and errors.As.
func getBindAddr(ifaceName string) (net.IP, error) {
    iface, err := net.InterfaceByName(ifaceName)
    if err != nil {
        return nil, fmt.Errorf("failed to find interface %s: %w", ifaceName, err)
    }

    addrs, err := iface.Addrs()
    if err != nil {
        return nil, fmt.Errorf("failed to get interface addresses: %w", err)
    }

    for _, a := range addrs {
        // Only *net.IPNet entries carry an IP address; link-local unicast
        // addresses are skipped.
        ipNet, ok := a.(*net.IPNet)
        if !ok || ipNet.IP.IsLinkLocalUnicast() {
            continue
        }
        return ipNet.IP, nil
    }

    return nil, fmt.Errorf("failed to get bind address")
}

// resolveAddr turns addrOrInterface into an IP address. The input is tried,
// in this order, as:
//
//  1. a literal IP address,
//  2. a hostname, resolved through net.ResolveIPAddr,
//  3. a network interface name, resolved through getBindAddr.
//
// When all three fail, the error returned is the one from the interface
// lookup.
func resolveAddr(addrOrInterface string) (net.IP, error) {
    if parsed := net.ParseIP(addrOrInterface); parsed != nil {
        return parsed, nil
    }

    resolved, err := net.ResolveIPAddr("ip", addrOrInterface)
    if err == nil {
        return resolved.IP, nil
    }

    // Neither an IP nor a resolvable hostname: treat it as an interface name.
    return getBindAddr(addrOrInterface)
}

// handleKeyChange reconciles the controller key ring with keys, the new set
// of encryption keys, and propagates the change to NetworkDB and to the
// drivers. Keys are matched by their LamportTime.
//
// The steps are:
//
//  1. every key of the current ring that is absent from keys is removed from
//     the ring; a removed gossip key is remembered for removal from NetworkDB
//     and a removed IPSec key is recorded as the key the drivers must prune;
//  2. every key of keys that is absent from the ring is appended to it; a new
//     gossip key is added to NetworkDB and a new IPSec key is recorded as the
//     key the drivers must add;
//  3. the primary gossip key is set on NetworkDB before the deleted key is
//     removed from it;
//  4. every driver implementing discoverapi.Discover is notified with the
//     encryption update; when that fails, the full IPSec key set is pushed
//     again.
//
// It is a no-op when no agent is running. An error is returned when no
// primary key can be determined for the gossip or IPSec subsystem.
func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
    drvEnc := discoverapi.DriverEncryptionUpdate{}

    agent := c.getAgent()
    if agent == nil {
        log.G(context.TODO()).Debug("Skipping key change as agent is nil")
        return nil
    }

    // Find the deleted key. If the deleted key was the primary key,
    // a new primary key should be set before removing it from keyring.
    c.mu.Lock()
    added := []byte{}
    deleted := []byte{}
    j := len(c.keys)
    for i := 0; i < j; {
        same := false
        for _, key := range keys {
            if same = key.LamportTime == c.keys[i].LamportTime; same {
                break
            }
        }
        if same {
            i++
            continue
        }

        cKey := c.keys[i]
        if cKey.Subsystem == subsysGossip {
            deleted = cKey.Key
        }

        if cKey.Subsystem == subsysIPSec {
            drvEnc.Prune = cKey.Key
            drvEnc.PruneTag = cKey.LamportTime
        }

        // Remove the stale key by moving the last live key into its slot.
        // i is deliberately NOT advanced: the key that was just moved into
        // slot i has not been examined yet and may be stale as well.
        c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
        c.keys[j-1] = nil
        j--
    }
    c.keys = c.keys[:j]

    // Find the new key and add it to the key ring
    for _, key := range keys {
        same := false
        for _, cKey := range c.keys {
            if same = cKey.LamportTime == key.LamportTime; same {
                break
            }
        }
        if !same {
            c.keys = append(c.keys, key)
            if key.Subsystem == subsysGossip {
                added = key.Key
            }

            if key.Subsystem == subsysIPSec {
                drvEnc.Key = key.Key
                drvEnc.Tag = key.LamportTime
            }
        }
    }
    c.mu.Unlock()

    if len(added) > 0 {
        agent.networkDB.SetKey(added)
    }

    key, _, err := c.getPrimaryKeyTag(subsysGossip)
    if err != nil {
        return err
    }
    agent.networkDB.SetPrimaryKey(key)

    key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
    if err != nil {
        return err
    }
    drvEnc.Primary = key
    drvEnc.PrimaryTag = tag

    // The deleted key is removed only after the new primary key was set.
    if len(deleted) > 0 {
        agent.networkDB.RemoveKey(deleted)
    }

    c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
        dr, ok := driver.(discoverapi.Discover)
        if !ok {
            return false
        }
        if err := dr.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc); err != nil {
            log.G(context.TODO()).Warnf("Failed to update datapath keys in driver %s: %v", name, err)
            // Attempt to reconfigure keys in case of a update failure
            // which can arise due to a mismatch of keys
            // if worker nodes get temporarily disconnected
            log.G(context.TODO()).Warnf("Reconfiguring datapath keys for  %s", name)
            drvCfgEnc := discoverapi.DriverEncryptionConfig{}
            drvCfgEnc.Keys, drvCfgEnc.Tags = c.getKeys(subsysIPSec)
            err = dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc)
            if err != nil {
                log.G(context.TODO()).Warnf("Failed to reset datapath keys in driver %s: %v", name, err)
            }
        }
        return false
    })

    return nil
}

// agentSetup creates the libnetwork agent from the addresses reported by
// clusterProvider, unless an agent already exists.
//
// The agent is only initialized when an advertise address is available; in
// that case every driver with a global connectivity scope that implements
// discoverapi.Discover is notified. When remote addresses are known, the
// gossip cluster is joined; a join failure is logged but not returned.
func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
    if c.getAgent() != nil {
        // An agent is already present, nothing to initialize.
        return nil
    }

    bindAddr := clusterProvider.GetLocalAddress()
    advAddr := clusterProvider.GetAdvertiseAddress()
    dataAddr := clusterProvider.GetDataPathAddress()
    remotes := clusterProvider.GetRemoteAddressList()
    remoteAddrList := make([]string, 0, len(remotes))
    for _, hostPort := range remotes {
        // Only the host part is kept; a malformed entry yields "".
        host, _, _ := net.SplitHostPort(hostPort)
        remoteAddrList = append(remoteAddrList, host)
    }

    listenAddr, _, _ := net.SplitHostPort(clusterProvider.GetListenAddress())

    fields := log.Fields{
        "listen-addr":               listenAddr,
        "local-addr":                bindAddr,
        "advertise-addr":            advAddr,
        "data-path-addr":            dataAddr,
        "remote-addr-list":          remoteAddrList,
        "network-control-plane-mtu": c.Config().NetworkControlPlaneMTU,
    }
    log.G(context.TODO()).WithFields(fields).Info("Initializing Libnetwork Agent")

    if advAddr != "" {
        if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
            log.G(context.TODO()).WithError(err).Errorf("Error in agentInit")
            return err
        }
        c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
            if capability.ConnectivityScope != scope.Global {
                return false
            }
            if d, ok := driver.(discoverapi.Discover); ok {
                c.agentDriverNotify(d)
            }
            return false
        })
    }

    if len(remoteAddrList) == 0 {
        return nil
    }
    if err := c.agentJoin(remoteAddrList); err != nil {
        log.G(context.TODO()).WithError(err).Error("Error in joining gossip cluster: join will be retried in background")
    }
    return nil
}

// getKeys sorts the controller key ring by Lamport time and returns, for the
// given subsystem, the raw keys together with their Lamport times. The
// Lamport times can be used as unique tags for the keys. When two or more
// keys match, the first two entries of both slices are exchanged.
func (c *Controller) getKeys(subsystem string) (keys [][]byte, tags []uint64) {
    c.mu.Lock()
    defer c.mu.Unlock()

    sort.Sort(ByTime(c.keys))

    total := len(c.keys)
    keys, tags = make([][]byte, 0, total), make([]uint64, 0, total)
    for _, k := range c.keys {
        if k.Subsystem != subsystem {
            continue
        }
        keys = append(keys, k.Key)
        tags = append(tags, k.LamportTime)
    }

    if len(keys) >= 2 {
        // TODO(thaJeztah): why are we swapping order here? This code was added in https://github.com/moby/libnetwork/commit/e83d68b7d1fd9c479120914024242238f791b4dc
        keys[1], keys[0] = keys[0], keys[1]
        tags[1], tags[0] = tags[0], tags[1]
    }
    return keys, tags
}

// getPrimaryKeyTag sorts the controller key ring by Lamport time and returns
// the primary key of the given subsystem along with its Lamport time (its
// tag). The primary key is the second key of the subsystem in sorted order;
// an error is returned when the subsystem has fewer than two keys.
func (c *Controller) getPrimaryKeyTag(subsystem string) (key []byte, lamportTime uint64, _ error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    sort.Sort(ByTime(c.keys))

    found := 0
    for _, k := range c.keys {
        if k.Subsystem != subsystem {
            continue
        }
        found++
        if found == 2 {
            // Second matching key in Lamport order: this is the primary.
            return k.Key, k.LamportTime, nil
        }
    }
    return nil, 0, fmt.Errorf("no primary key found for %s subsystem: %d keys found on controller, expected at least 2", subsystem, found)
}

// agentInit creates the NetworkDB instance and the agent that wraps it, then
// stores the agent on the controller.
//
// bindAddrOrInterface is resolved with resolveAddr (IP, hostname or interface
// name). listenAddr and advertiseAddr are passed to the NetworkDB
// configuration, and dataPathAddr is stored on the agent. An error is
// returned when the bind address cannot be resolved or when NetworkDB cannot
// be created.
func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
    bindAddr, err := resolveAddr(bindAddrOrInterface)
    if err != nil {
        return err
    }

    // NetworkDB is configured with the gossip subsystem keys.
    keys, _ := c.getKeys(subsysGossip)

    netDBConf := networkdb.DefaultConfig()
    netDBConf.BindAddr = listenAddr
    netDBConf.AdvertiseAddr = advertiseAddr
    netDBConf.Keys = keys
    if c.Config().NetworkControlPlaneMTU != 0 {
        // Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
        // To be on the safe side let's cut 100 bytes
        netDBConf.PacketBufferSize = (c.Config().NetworkControlPlaneMTU - 100)
        log.G(context.TODO()).Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
            c.Config().NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
    }
    nDB, err := networkdb.New(netDBConf)
    if err != nil {
        return err
    }

    // Register the diagnostic handlers
    nDB.RegisterDiagnosticHandlers(c.DiagnosticServer)

    // Watch the endpoint table and the node table; the cancel functions are
    // kept on the agent so that agentClose can stop both watches.
    var cancelList []func()
    ch, cancel := nDB.Watch(libnetworkEPTable, "")
    cancelList = append(cancelList, cancel)
    nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "")
    cancelList = append(cancelList, cancel)

    // Publish the agent under the controller lock.
    c.mu.Lock()
    c.agent = &nwAgent{
        networkDB:         nDB,
        bindAddr:          bindAddr,
        advertiseAddr:     advertiseAddr,
        dataPathAddr:      dataPathAddr,
        coreCancelFuncs:   cancelList,
        driverCancelFuncs: make(map[string][]func()),
    }
    c.mu.Unlock()

    // One goroutine per watch dispatches the table events to its handler.
    go c.handleTableEvents(ch, c.handleEpTableEvent)
    go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)

    // Hand the IPSec subsystem keys to every driver that implements
    // discoverapi.Discover; failures are only logged.
    keys, tags := c.getKeys(subsysIPSec)
    c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
        if dr, ok := driver.(discoverapi.Discover); ok {
            if err := dr.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{
                Keys: keys,
                Tags: tags,
            }); err != nil {
                log.G(context.TODO()).Warnf("Failed to set datapath keys in driver %s: %v", name, err)
            }
        }
        return false
    })

    // NOTE(review): joinCluster here is a package-level walker defined outside
    // this part of the file; presumably it makes each existing network join
    // the cluster — confirm.
    c.WalkNetworks(joinCluster)

    return nil
}

// agentJoin asks the agent's NetworkDB to join the cluster through the given
// remote addresses. It does nothing and returns nil when no agent is running.
func (c *Controller) agentJoin(remoteAddrList []string) error {
    if a := c.getAgent(); a != nil {
        return a.networkDB.Join(remoteAddrList)
    }
    return nil
}

// agentDriverNotify tells the driver d about the local node (data-path and
// bind addresses) and hands it the IPSec subsystem keys. Both notifications
// are best effort: failures are logged as warnings. It does nothing when no
// agent is running.
func (c *Controller) agentDriverNotify(d discoverapi.Discover) {
    agent := c.getAgent()
    if agent == nil {
        return
    }

    self := discoverapi.NodeDiscoveryData{
        Address:     agent.dataPathAddress(),
        BindAddress: agent.bindAddr.String(),
        Self:        true,
    }
    if err := d.DiscoverNew(discoverapi.NodeDiscovery, self); err != nil {
        log.G(context.TODO()).Warnf("Failed the node discovery in driver: %v", err)
    }

    var encCfg discoverapi.DriverEncryptionConfig
    encCfg.Keys, encCfg.Tags = c.getKeys(subsysIPSec)
    if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, encCfg); err != nil {
        log.G(context.TODO()).Warnf("Failed to set datapath keys in driver: %v", err)
    }
}

// agentClose detaches the current agent from the controller, clears the
// cluster provider, cancels every watch registered on the agent and closes
// its NetworkDB. The cluster provider is cleared even when there is no agent.
func (c *Controller) agentClose() {
    // Take ownership of the agent and reset the controller pointer so that
    // nobody else picks it up while it is being torn down.
    c.mu.Lock()
    closing := c.agent
    c.agent = nil
    c.mu.Unlock()

    // when the agent is closed the cluster provider should be cleaned up
    c.SetClusterProvider(nil)

    if closing == nil {
        return
    }

    // Collect the cancel functions under the agent lock and run them once the
    // lock has been released: driver watches first, network db watches last.
    closing.mu.Lock()
    var cancels []func()
    for _, fns := range closing.driverCancelFuncs {
        cancels = append(cancels, fns...)
    }
    cancels = append(cancels, closing.coreCancelFuncs...)
    closing.mu.Unlock()

    for i := range cancels {
        cancels[i]()
    }

    closing.networkDB.Close()
}

// Task has the backend container details
type Task struct {
    // Name is the name stored in the endpoint's record.
    Name       string
    // EndpointID is the ID of the endpoint (the key of its record).
    EndpointID string
    // EndpointIP is the endpoint IP address, in string form.
    EndpointIP string
    // Info is the driver-specific information decoded by the network driver
    // for this endpoint; it is nil when the driver reported none.
    Info       map[string]string
}

// ServiceInfo has service specific details along with the list of backend tasks
type ServiceInfo struct {
    // VIP is the virtual IP of the service, in string form.
    VIP          string
    // LocalLBIndex is the load-balancer index the controller reported for
    // the service on this network.
    LocalLBIndex int
    // Tasks lists the backend tasks that belong to the service.
    Tasks        []Task
    // Ports lists the ingress ports, each formatted as
    // "Target: <target port>, Publish: <published port>".
    Ports        []string
}

// epRecord is the intermediate per-endpoint result assembled by
// Network.Services.
type epRecord struct {
    // ep is the endpoint record decoded from the endpoint table.
    ep      EndpointRecord
    // info is the driver-specific information decoded from the driver tables.
    info    map[string]string
    // lbIndex is the load-balancer index returned by the controller for the
    // endpoint's service.
    lbIndex int
}

// Services returns the services known on the network, keyed by service name,
// each with the details of the tasks that back it. It is only meaningful in
// swarm mode: nil is returned when the network has no cluster agent or when
// its driver cannot be resolved.
func (n *Network) Services() map[string]ServiceInfo {
    agent, ok := n.clusterAgent()
    if !ok {
        return nil
    }
    nwID := n.ID()
    drv, err := n.driver(true)
    if err != nil {
        log.G(context.TODO()).Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, nwID, err)
        return nil
    }

    // Step 1: collect the driver agnostic endpoint info from
    // libnetworkEPTable.
    c := n.getController()
    records := make(map[string]epRecord)
    for eid, entry := range agent.networkDB.GetTableByNetwork(libnetworkEPTable, nwID) {
        var rec EndpointRecord
        if err := proto.Unmarshal(entry.Value, &rec); err != nil {
            log.G(context.TODO()).Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nwID, err)
            continue
        }
        records[eid] = epRecord{
            ep:      rec,
            lbIndex: c.getLBIndex(rec.ServiceID, nwID, rec.IngressPorts),
        }
    }

    // Step 2: walk the driver's endpoint tables and let the driver decode
    // each entry into the tuple {ep ID, value}, where value conveys the
    // driver-specific info about the endpoint.
    for _, table := range n.driverTables {
        if table.objType != driverapi.EndpointObject {
            continue
        }
        for key, entry := range agent.networkDB.GetTableByNetwork(table.name, nwID) {
            epID, info := drv.DecodeTableEntry(table.name, key, entry.Value)
            rec, known := records[epID]
            if !known {
                log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
                continue
            }
            rec.info = info
            records[epID] = rec
        }
    }

    // Step 3: group the endpoints by service name.
    services := make(map[string]ServiceInfo)
    for epID, rec := range records {
        svcName := rec.ep.ServiceName
        svc, seen := services[svcName]
        if !seen {
            // First endpoint seen for this service: it provides the VIP and
            // the load-balancer index.
            svc.VIP = rec.ep.VirtualIP
            svc.LocalLBIndex = rec.lbIndex
        }
        if svc.Ports == nil {
            published := make([]string, 0, len(rec.ep.IngressPorts))
            for _, p := range rec.ep.IngressPorts {
                published = append(published, fmt.Sprintf("Target: %d, Publish: %d", p.TargetPort, p.PublishedPort))
            }
            svc.Ports = published
        }
        svc.Tasks = append(svc.Tasks, Task{
            Name:       rec.ep.Name,
            EndpointID: epID,
            EndpointIP: rec.ep.EndpointIP,
            Info:       rec.info,
        })
        services[svcName] = svc
    }
    return services
}

// clusterAgent returns the controller's cluster agent when the network is
// swarm-scoped and its driver is multi-host. ok is false when the network
// does not qualify or when no agent is running.
func (n *Network) clusterAgent() (agent *nwAgent, ok bool) {
    if n.scope == scope.Swarm && n.driverIsMultihost() {
        agent = n.getController().getAgent()
        return agent, agent != nil
    }
    return nil, false
}

// joinCluster makes the agent's NetworkDB join this network. It is a no-op
// when the network has no cluster agent.
func (n *Network) joinCluster() error {
    if agent, ok := n.clusterAgent(); ok {
        return agent.networkDB.JoinNetwork(n.ID())
    }
    return nil
}

// leaveCluster makes the agent's NetworkDB leave this network. It is a no-op
// when the network has no cluster agent.
func (n *Network) leaveCluster() error {
    if agent, ok := n.clusterAgent(); ok {
        return agent.networkDB.LeaveNetwork(n.ID())
    }
    return nil
}

// addDriverInfoToCluster creates, in NetworkDB, every driver table entry
// recorded in the endpoint's join info. It stops at, and returns, the first
// error. Nothing is done when the endpoint has no driver table entries or
// when its network has no cluster agent.
func (ep *Endpoint) addDriverInfoToCluster() error {
    if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 {
        return nil
    }

    network := ep.getNetwork()
    agent, ok := network.clusterAgent()
    if !ok {
        return nil
    }

    networkID := network.ID()
    for _, entry := range ep.joinInfo.driverTableEntries {
        err := agent.networkDB.CreateEntry(entry.tableName, networkID, entry.key, entry.value)
        if err != nil {
            return err
        }
    }
    return nil
}

// deleteDriverInfoFromCluster deletes, from NetworkDB, every driver table
// entry recorded in the endpoint's join info. It stops at, and returns, the
// first error. Nothing is done when the endpoint has no driver table entries
// or when its network has no cluster agent.
func (ep *Endpoint) deleteDriverInfoFromCluster() error {
    if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 {
        return nil
    }

    network := ep.getNetwork()
    agent, ok := network.clusterAgent()
    if !ok {
        return nil
    }

    networkID := network.ID()
    for _, entry := range ep.joinInfo.driverTableEntries {
        err := agent.networkDB.DeleteEntry(entry.tableName, networkID, entry.key)
        if err != nil {
            return err
        }
    }
    return nil
}

// addServiceInfoToCluster registers the endpoint for service discovery: it
// first updates the local state (service binding for a service task,
// container name resolution otherwise) and then creates the endpoint's
// record in the NetworkDB endpoint table.
//
// Nothing is done when the endpoint has no DNS names or no interface
// address, when its network has no cluster agent, or when the endpoint is no
// longer found on the sandbox sb. The sandbox service lock is held for the
// whole operation.
func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
    if len(ep.dnsNames) == 0 || ep.Iface() == nil || ep.Iface().Address() == nil {
        return nil
    }

    n := ep.getNetwork()
    agent, ok := n.clusterAgent()
    if !ok {
        return nil
    }

    sb.service.Lock()
    defer sb.service.Unlock()
    log.G(context.TODO()).Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())

    // Check that the endpoint is still present on the sandbox before adding it to the service discovery.
    // This is to handle a race between the EnableService and the sbLeave.
    // It is possible that the EnableService starts, fetches the list of the endpoints and
    // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox.
    // The risk is that the deleteServiceInfoFromCluster happens before the addServiceInfoToCluster.
    // This check under the Service lock of the sandbox ensures the correct behavior.
    // If the addServiceInfoToCluster arrives first it may or may not find the endpoint and will proceed or exit,
    // but in any case the deleteServiceInfoFromCluster will follow doing the cleanup if needed.
    // In case the deleteServiceInfoFromCluster arrives first, this one is happening after the endpoint is
    // removed from the list; in this situation the delete will bail out not finding any data to cleanup
    // and the add will bail out not finding the endpoint on the sandbox.
    //
    // NOTE(review): despite its name, `err` presumably holds the endpoint
    // returned by sb.GetEndpoint (nil when the endpoint is absent), not an
    // error — confirm against the Sandbox API.
    if err := sb.GetEndpoint(ep.ID()); err == nil {
        log.G(context.TODO()).Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
        return nil
    }

    // The first DNS name is the primary name, the remaining ones are aliases.
    dnsNames := ep.getDNSNames()
    primaryDNSName, dnsAliases := dnsNames[0], dnsNames[1:]

    var ingressPorts []*PortConfig
    if ep.svcID != "" {
        // This is a task part of a service
        // Gossip ingress ports only in ingress network.
        if n.ingress {
            ingressPorts = ep.ingressPorts
        }
        if err := n.getController().addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), primaryDNSName, ep.virtualIP, ingressPorts, ep.svcAliases, dnsAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
            return err
        }
    } else {
        // This is a container simply attached to an attachable network
        if err := n.getController().addContainerNameResolution(n.ID(), ep.ID(), primaryDNSName, dnsAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
            return err
        }
    }

    // Build the record that is written to the endpoint table.
    buf, err := proto.Marshal(&EndpointRecord{
        Name:            primaryDNSName,
        ServiceName:     ep.svcName,
        ServiceID:       ep.svcID,
        VirtualIP:       ep.virtualIP.String(),
        IngressPorts:    ingressPorts,
        Aliases:         ep.svcAliases,
        TaskAliases:     dnsAliases,
        EndpointIP:      ep.Iface().Address().IP.String(),
        ServiceDisabled: false,
    })
    if err != nil {
        return err
    }

    if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
        log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
        return err
    }

    log.G(context.TODO()).Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())

    return nil
}

// deleteServiceInfoFromCluster withdraws the endpoint from service
// discovery: NetworkDB is updated first, then the local state.
//
// With fullRemove the endpoint's record is deleted from the endpoint table;
// otherwise the record is kept and only marked as disabled (see
// disableServiceInNetworkDB). method names the caller and is only used in
// log messages.
//
// Nothing is done when the endpoint has no DNS names, when its network has no
// cluster agent, or when the endpoint is no longer found on the sandbox sb.
// The sandbox service lock is held for the whole operation.
func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, method string) error {
    if len(ep.dnsNames) == 0 {
        return nil
    }

    n := ep.getNetwork()
    agent, ok := n.clusterAgent()
    if !ok {
        return nil
    }

    sb.service.Lock()
    defer sb.service.Unlock()
    log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())

    // Avoid a race with a container that aborts preemptively. This would
    // get caught in disableServiceInNetworkDB, but we check here to make the
    // nature of the condition more clear.
    // See comment in addServiceInfoToCluster()
    //
    // NOTE(review): despite its name, `err` presumably holds the endpoint
    // returned by sb.GetEndpoint (nil when the endpoint is absent), not an
    // error — confirm against the Sandbox API.
    if err := sb.GetEndpoint(ep.ID()); err == nil {
        log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
        return nil
    }

    // The first DNS name is the primary name, the remaining ones are aliases.
    dnsNames := ep.getDNSNames()
    primaryDNSName, dnsAliases := dnsNames[0], dnsNames[1:]

    // First update the networkDB then locally
    if fullRemove {
        // A failed delete is only logged; the local cleanup below still runs.
        if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
            log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
        }
    } else {
        disableServiceInNetworkDB(agent, n, ep)
    }

    // The local state can only be cleaned up when the endpoint has an
    // interface address.
    if ep.Iface() != nil && ep.Iface().Address() != nil {
        if ep.svcID != "" {
            // This is a task part of a service
            var ingressPorts []*PortConfig
            if n.ingress {
                ingressPorts = ep.ingressPorts
            }
            if err := n.getController().rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), primaryDNSName, ep.virtualIP, ingressPorts, ep.svcAliases, dnsAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
                return err
            }
        } else {
            // This is a container simply attached to an attachable network
            if err := n.getController().delContainerNameResolution(n.ID(), ep.ID(), primaryDNSName, dnsAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
                return err
            }
        }
    }

    log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())

    return nil
}

// disableServiceInNetworkDB marks the endpoint's record in the NetworkDB
// endpoint table as disabled: the current record is fetched, its
// ServiceDisabled flag is set, and the record is written back as an update.
// Every failure is logged and aborts the operation; nothing is returned.
func disableServiceInNetworkDB(a *nwAgent, n *Network, ep *Endpoint) {
    log.G(context.TODO()).Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())

    // Fetch the record currently stored for the endpoint.
    current, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
    if err != nil {
        log.G(context.TODO()).Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
        return
    }

    // Decoding a record read back from the table should never fail.
    var rec EndpointRecord
    if err = proto.Unmarshal(current, &rec); err != nil {
        log.G(context.TODO()).Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
        return
    }
    rec.ServiceDisabled = true

    // Re-encoding should never fail either.
    updated, err := proto.Marshal(&rec)
    if err != nil {
        log.G(context.TODO()).Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
        return
    }

    // Send the update to the whole cluster.
    err = a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), updated)
    if err != nil {
        log.G(context.TODO()).Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
    }
}

// addDriverWatches registers a NetworkDB watch for each of the network's
// driver tables and replays the entries already present in the table to the
// driver as Create events.
//
// For every table the watch's cancel function is recorded on the agent under
// the network ID, and a goroutine is started to dispatch the watch events to
// handleDriverTableEvent. The function returns early when the driver cannot
// be resolved. It does nothing when the network has no driver tables or no
// cluster agent.
func (n *Network) addDriverWatches() {
    if len(n.driverTables) == 0 {
        return
    }
    agent, ok := n.clusterAgent()
    if !ok {
        return
    }

    ctrlr := n.getController()
    for _, table := range n.driverTables {
        evCh, stop := agent.networkDB.Watch(table.name, n.ID())

        agent.mu.Lock()
        agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], stop)
        agent.mu.Unlock()

        go ctrlr.handleTableEvents(evCh, n.handleDriverTableEvent)

        drv, err := n.driver(false)
        if err != nil {
            log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
            return
        }

        walkErr := agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
            // Entries marked for deletion are skipped. This is safe because
            // this function is called at initialization time, so there is no
            // state to delete.
            if nid != n.ID() || deleted {
                return false
            }
            drv.EventNotify(driverapi.Create, nid, table.name, key, value)
            return false
        })
        if walkErr != nil {
            log.G(context.TODO()).WithError(walkErr).Warn("Error while walking networkdb")
        }
    }
}

// cancelDriverWatches cancels every driver table watch recorded on the agent
// for this network and forgets them. The cancel functions are detached under
// the agent lock and invoked after the lock has been released. It does
// nothing when the network has no cluster agent.
func (n *Network) cancelDriverWatches() {
    agent, ok := n.clusterAgent()
    if !ok {
        return
    }

    agent.mu.Lock()
    pending := agent.driverCancelFuncs[n.ID()]
    delete(agent.driverCancelFuncs, n.ID())
    agent.mu.Unlock()

    for i := range pending {
        pending[i]()
    }
}

// handleTableEvents passes every event received on ch to fn, and returns
// once ch reports that it is done. It blocks, so it is meant to be run in its
// own goroutine.
func (c *Controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
    for {
        select {
        case <-ch.Done():
            return
        case received := <-ch.C:
            fn(received)
        }
    }
}

// handleDriverTableEvent forwards a NetworkDB event on one of the network's
// driver tables to the network driver.
//
// Create, update and delete events are reported to the driver with the
// matching driverapi event type. Events of any other type are logged and
// dropped instead of being forwarded with a zero event type and empty
// fields. Nothing is forwarded when the driver cannot be resolved.
func (n *Network) handleDriverTableEvent(ev events.Event) {
    d, err := n.driver(false)
    if err != nil {
        log.G(context.TODO()).Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
        return
    }

    var (
        etype driverapi.EventType
        tname string
        key   string
        value []byte
    )

    switch event := ev.(type) {
    case networkdb.CreateEvent:
        tname = event.Table
        key = event.Key
        value = event.Value
        etype = driverapi.Create
    case networkdb.DeleteEvent:
        tname = event.Table
        key = event.Key
        value = event.Value
        etype = driverapi.Delete
    case networkdb.UpdateEvent:
        tname = event.Table
        key = event.Key
        value = event.Value
        // An update must be reported as an update, not as a deletion.
        etype = driverapi.Update
    default:
        log.G(context.TODO()).Errorf("Unexpected driver table event = %#v", ev)
        return
    }

    d.EventNotify(etype, n.ID(), tname, key, value)
}

// handleNodeTableEvent handles a NetworkDB event on the node table: the
// event value is decoded as a JSON networkdb.NodeAddr and the address is
// passed to processNodeDiscovery, as an addition for a create event and as a
// removal for a delete event.
//
// Update events are not expected on the node table; they, and events of any
// unknown type, are logged and dropped. Returning right away avoids decoding
// an empty value, which would only log a second, misleading error.
func (c *Controller) handleNodeTableEvent(ev events.Event) {
    var (
        value    []byte
        isAdd    bool
        nodeAddr networkdb.NodeAddr
    )
    switch event := ev.(type) {
    case networkdb.CreateEvent:
        value = event.Value
        isAdd = true
    case networkdb.DeleteEvent:
        value = event.Value
    case networkdb.UpdateEvent:
        log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
        return
    default:
        log.G(context.TODO()).Errorf("Unexpected node table event = %#v", ev)
        return
    }

    err := json.Unmarshal(value, &nodeAddr)
    if err != nil {
        log.G(context.TODO()).Errorf("Error unmarshalling node table event %v", err)
        return
    }
    c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
}

// handleEpTableEvent handles a NetworkDB event on the endpoint table, i.e. a
// change to the record of a remote endpoint.
//
//   - create: the service binding (service task) or the container name
//     resolution (plain container) is added;
//   - delete: the service binding or the container name resolution is
//     removed;
//   - update: only expected for a service task that got disabled, in which
//     case its service binding is removed without a full removal.
//
// Events of any other type, records that cannot be decoded, and records
// without a name or a valid endpoint IP are logged and dropped.
func (c *Controller) handleEpTableEvent(ev events.Event) {
    const caller = "handleEpTableEvent"

    var (
        nid, eid string
        payload  []byte
    )
    switch e := ev.(type) {
    case networkdb.CreateEvent:
        nid, eid, payload = e.NetworkID, e.Key, e.Value
    case networkdb.DeleteEvent:
        nid, eid, payload = e.NetworkID, e.Key, e.Value
    case networkdb.UpdateEvent:
        nid, eid, payload = e.NetworkID, e.Key, e.Value
    default:
        log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", e)
        return
    }

    var epRec EndpointRecord
    if err := proto.Unmarshal(payload, &epRec); err != nil {
        log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err)
        return
    }

    var (
        name          = epRec.Name
        vip           = net.ParseIP(epRec.VirtualIP)
        ip            = net.ParseIP(epRec.EndpointIP)
        isServiceTask = epRec.ServiceID != ""
    )
    if name == "" || ip == nil {
        log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", payload)
        return
    }

    switch ev.(type) {
    case networkdb.CreateEvent:
        log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
        if isServiceTask {
            // This is a remote task part of a service
            if err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid, name, vip, epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases, ip, caller); err != nil {
                log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
            }
            return
        }
        // This is a remote container simply attached to an attachable network
        if err := c.addContainerNameResolution(nid, eid, name, epRec.TaskAliases, ip, caller); err != nil {
            log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
        }

    case networkdb.DeleteEvent:
        log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
        if isServiceTask {
            // This is a remote task part of a service
            if err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid, name, vip, epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases, ip, caller, true, true); err != nil {
                log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
            }
            return
        }
        // This is a remote container simply attached to an attachable network
        if err := c.delContainerNameResolution(nid, eid, name, epRec.TaskAliases, ip, caller); err != nil {
            log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
        }

    case networkdb.UpdateEvent:
        log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
        // Updates are currently only expected to tell us that an endpoint
        // got disabled. Report anything else.
        if !isServiceTask || !epRec.ServiceDisabled {
            log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
            return
        }
        // This is a remote task that is part of a service that is now disabled
        if err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid, name, vip, epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases, ip, caller, true, false); err != nil {
            log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
        }
    }
}