netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go

Summary

Maintainability
A
0 mins
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package dockerd

import (
    "context"
    "fmt"
    "log/slog"
    "net"
    "strconv"
    "strings"
    "time"

    "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
    "github.com/netdata/netdata/go/go.d.plugin/logger"
    "github.com/netdata/netdata/go/go.d.plugin/pkg/web"

    "github.com/docker/docker/api/types"
    typesContainer "github.com/docker/docker/api/types/container"
    docker "github.com/docker/docker/client"
    "github.com/ilyam8/hashstructure"
)

func NewDiscoverer(cfg Config) (*Discoverer, error) {
    tags, err := model.ParseTags(cfg.Tags)
    if err != nil {
        return nil, fmt.Errorf("parse tags: %v", err)
    }

    d := &Discoverer{
        Logger: logger.New().With(
            slog.String("component", "service discovery"),
            slog.String("discoverer", "docker"),
        ),
        cfgSource: cfg.Source,
        newDockerClient: func(addr string) (dockerClient, error) {
            return docker.NewClientWithOpts(docker.WithHost(addr))
        },
        addr:           docker.DefaultDockerHost,
        listInterval:   time.Second * 60,
        timeout:        time.Second * 2,
        seenTggSources: make(map[string]bool),
        started:        make(chan struct{}),
    }

    d.Tags().Merge(tags)

    if cfg.Timeout.Duration().Seconds() != 0 {
        d.timeout = cfg.Timeout.Duration()
    }
    if cfg.Address != "" {
        d.addr = cfg.Address
    }

    return d, nil
}

type Config struct {
    Source string

    Tags    string       `yaml:"tags"`
    Address string       `yaml:"address"`
    Timeout web.Duration `yaml:"timeout"`
}

type (
    Discoverer struct {
        *logger.Logger
        model.Base

        dockerClient    dockerClient
        newDockerClient func(addr string) (dockerClient, error)
        addr            string

        cfgSource string

        listInterval   time.Duration
        timeout        time.Duration
        seenTggSources map[string]bool // [targetGroup.Source]

        started chan struct{}
    }
    dockerClient interface {
        NegotiateAPIVersion(context.Context)
        ContainerList(context.Context, typesContainer.ListOptions) ([]types.Container, error)
        Close() error
    }
)

func (d *Discoverer) String() string {
    return "sd:docker"
}

func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
    d.Info("instance is started")
    defer func() { d.cleanup(); d.Info("instance is stopped") }()

    close(d.started)

    if d.dockerClient == nil {
        client, err := d.newDockerClient(d.addr)
        if err != nil {
            d.Errorf("error on creating docker client: %v", err)
            return
        }
        d.dockerClient = client
    }

    d.dockerClient.NegotiateAPIVersion(ctx)

    if err := d.listContainers(ctx, in); err != nil {
        d.Error(err)
        return
    }

    tk := time.NewTicker(d.listInterval)
    defer tk.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-tk.C:
            if err := d.listContainers(ctx, in); err != nil {
                d.Warning(err)
            }
        }
    }
}

func (d *Discoverer) listContainers(ctx context.Context, in chan<- []model.TargetGroup) error {
    listCtx, cancel := context.WithTimeout(ctx, d.timeout)
    defer cancel()

    containers, err := d.dockerClient.ContainerList(listCtx, typesContainer.ListOptions{})
    if err != nil {
        return err
    }

    var tggs []model.TargetGroup
    seen := make(map[string]bool)

    for _, cntr := range containers {
        if tgg := d.buildTargetGroup(cntr); tgg != nil {
            tggs = append(tggs, tgg)
            seen[tgg.Source()] = true
        }
    }

    for src := range d.seenTggSources {
        if !seen[src] {
            tggs = append(tggs, &targetGroup{source: src})
        }
    }
    d.seenTggSources = seen

    select {
    case <-ctx.Done():
    case in <- tggs:
    }

    return nil
}

func (d *Discoverer) buildTargetGroup(cntr types.Container) model.TargetGroup {
    if len(cntr.Names) == 0 || cntr.NetworkSettings == nil || len(cntr.NetworkSettings.Networks) == 0 {
        return nil
    }

    tgg := &targetGroup{
        source: cntrSource(cntr),
    }
    if d.cfgSource != "" {
        tgg.source += fmt.Sprintf(",%s", d.cfgSource)
    }

    for netDriver, network := range cntr.NetworkSettings.Networks {
        // container with network mode host will be discovered by local-listeners
        for _, port := range cntr.Ports {
            tgt := &target{
                ID:            cntr.ID,
                Name:          strings.TrimPrefix(cntr.Names[0], "/"),
                Image:         cntr.Image,
                Command:       cntr.Command,
                Labels:        mapAny(cntr.Labels),
                PrivatePort:   strconv.Itoa(int(port.PrivatePort)),
                PublicPort:    strconv.Itoa(int(port.PublicPort)),
                PublicPortIP:  port.IP,
                PortProtocol:  port.Type,
                NetworkMode:   cntr.HostConfig.NetworkMode,
                NetworkDriver: netDriver,
                IPAddress:     network.IPAddress,
            }
            tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.PrivatePort)

            hash, err := calcHash(tgt)
            if err != nil {
                continue
            }

            tgt.hash = hash
            tgt.Tags().Merge(d.Tags())

            tgg.targets = append(tgg.targets, tgt)
        }
    }

    return tgg
}

func (d *Discoverer) cleanup() {
    if d.dockerClient != nil {
        _ = d.dockerClient.Close()
    }
}

func cntrSource(cntr types.Container) string {
    name := strings.TrimPrefix(cntr.Names[0], "/")
    return fmt.Sprintf("discoverer=docker,container=%s,image=%s", name, cntr.Image)
}

func calcHash(obj any) (uint64, error) {
    return hashstructure.Hash(obj, nil)
}

func mapAny(src map[string]string) map[string]any {
    if src == nil {
        return nil
    }
    m := make(map[string]any, len(src))
    for k, v := range src {
        m[k] = v
    }
    return m
}