netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go

Summary

Maintainability
A
2 hrs
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package netlisteners

import (
    "bufio"
    "bytes"
    "context"
    "errors"
    "fmt"
    "log/slog"
    "net"
    "os"
    "os/exec"
    "path/filepath"
    "sort"
    "strconv"
    "strings"
    "time"

    "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
    "github.com/netdata/netdata/go/go.d.plugin/agent/executable"
    "github.com/netdata/netdata/go/go.d.plugin/logger"

    "github.com/ilyam8/hashstructure"
)

// shortName is the discoverer's short identifier, used in log attributes and
// in the source string of produced target groups.
var shortName = "net_listeners"

// fullName is shortName prefixed with "sd:"; it is the target group provider
// name and the value returned by Discoverer.String.
var fullName = fmt.Sprintf("sd:%s", shortName)

// NewDiscoverer creates a net_listeners discoverer from cfg.
//
// The local-listeners binary is looked up in $NETDATA_PLUGINS_DIR, falling back
// to the directory of the running executable and finally the current working
// directory. Discovery runs every 2 minutes; each run of the binary is limited
// to 5 seconds, and targets are kept cached for 10 minutes after they were last
// seen.
//
// It returns an error (wrapping the underlying one) if cfg.Tags cannot be parsed.
func NewDiscoverer(cfg Config) (*Discoverer, error) {
    tags, err := model.ParseTags(cfg.Tags)
    if err != nil {
        return nil, fmt.Errorf("parse tags: %w", err)
    }

    dir := os.Getenv("NETDATA_PLUGINS_DIR")
    if dir == "" {
        dir = executable.Directory
    }
    if dir == "" {
        // Best effort: an empty dir still yields a relative binary path below.
        dir, _ = os.Getwd()
    }

    d := &Discoverer{
        Logger: logger.New().With(
            slog.String("component", "service discovery"),
            slog.String("discoverer", shortName),
        ),
        cfgSource: cfg.Source,
        ll: &localListenersExec{
            binPath: filepath.Join(dir, "local-listeners"),
            timeout: time.Second * 5,
        },
        interval:   time.Minute * 2,
        expiryTime: time.Minute * 10,
        cache:      make(map[uint64]*cacheItem),
        started:    make(chan struct{}),
    }

    d.Tags().Merge(tags)

    return d, nil
}

// Config is the configuration of the net_listeners discoverer.
type Config struct {
    // Source is not read from YAML (`yaml:"-"`); when non-empty it is appended
    // to the source string of every target group the discoverer produces.
    Source string `yaml:"-"`
    // Tags is parsed with model.ParseTags by NewDiscoverer; the resulting tags
    // are merged into every discovered target.
    Tags   string `yaml:"tags"`
}

type (
    // Discoverer periodically runs a localListeners source, turns its output
    // into targets and sends them as target groups to the discovery pipeline.
    Discoverer struct {
        *logger.Logger
        model.Base

        // cfgSource, when non-empty, is appended to each target group's source.
        cfgSource string

        // interval is the period between discovery passes after the first one.
        interval time.Duration
        // ll produces the raw listener list that is parsed into targets.
        ll       localListeners

        // expiryTime is how long a target stays in the cache after it was last
        // seen; a zero value (in milliseconds) disables caching.
        expiryTime time.Duration
        cache      map[uint64]*cacheItem // [target.Hash]

        // started is closed when Discover begins running.
        started chan struct{}
    }
    // cacheItem is a cached target together with the time it was last seen.
    cacheItem struct {
        lastSeenTime time.Time
        tgt          model.Target
    }
    // localListeners returns raw listener data: newline-separated records of
    // the form "Protocol|IPAddress|Port|Cmdline" (see parseLocalListeners).
    localListeners interface {
        discover(ctx context.Context) ([]byte, error)
    }
)

// String implements fmt.Stringer; it reports the discoverer's full name.
func (d *Discoverer) String() string { return fullName }

// Discover runs discovery until ctx is done, sending target groups to in.
//
// It closes d.started on entry and performs an initial discovery pass. If that
// first pass fails, the error is logged and Discover returns: the source is
// considered unusable. Afterwards a pass runs every d.interval; a failure of a
// periodic pass is logged as a warning and discovery continues on the next
// tick, so a transient problem (e.g. the command hitting its timeout) does not
// stop discovery for the rest of the process lifetime.
func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
    d.Info("instance is started")
    defer func() { d.Info("instance is stopped") }()

    close(d.started)

    if err := d.discoverLocalListeners(ctx, in); err != nil {
        d.Error(err)
        return
    }

    tk := time.NewTicker(d.interval)
    defer tk.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-tk.C:
            if err := d.discoverLocalListeners(ctx, in); err != nil {
                // Non-fatal: keep the loop alive and retry on the next tick.
                d.Warning(err)
            }
        }
    }
}

// discoverLocalListeners performs one discovery pass: it fetches the raw
// listener list from d.ll, parses it into targets, merges them with the expiry
// cache and sends the resulting target groups to in.
//
// If ctx is done while the list is being fetched, the pass is abandoned and nil
// is returned (shutdown is not a failure). If ctx is done while waiting to send,
// the groups are dropped and nil is returned. Fetch and parse errors are
// returned unchanged.
func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error {
    bs, err := d.ll.discover(ctx)
    if err != nil {
        // Besides an error chain containing context.Canceled, a child process
        // killed because ctx was cancelled may surface as a plain exec error
        // (e.g. *exec.ExitError with localListenersExec), so consult ctx too.
        if errors.Is(err, context.Canceled) || ctx.Err() != nil {
            return nil
        }
        return err
    }

    tgts, err := d.parseLocalListeners(bs)
    if err != nil {
        return err
    }

    tggs := d.processTargets(tgts)

    select {
    case <-ctx.Done():
    case in <- tggs:
    }

    return nil
}

// processTargets wraps the targets of one discovery pass into a single target
// group whose source identifies this discoverer (plus d.cfgSource, if set).
//
// When d.expiryTime is zero (at millisecond precision) tgts are used as-is.
// Otherwise targets are cached by hash: every target in tgts is (re)marked as
// seen now, and cached targets missing from tgts are kept until they have not
// been seen for longer than d.expiryTime, then evicted.
//
// Output order is deterministic: the targets of this pass come first in the
// order they were given (parseLocalListeners sorts them), followed by
// still-cached targets that were not seen in this pass, ordered by hash.
func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup {
    tgg := &targetGroup{
        provider: fullName,
        source:   fmt.Sprintf("discoverer=%s,host=localhost", shortName),
    }
    if d.cfgSource != "" {
        tgg.source += fmt.Sprintf(",%s", d.cfgSource)
    }

    if d.expiryTime.Milliseconds() == 0 {
        tgg.targets = tgts
        return []model.TargetGroup{tgg}
    }

    now := time.Now()

    // Refresh the cache with this pass, emitting targets in the given order.
    // A hash is emitted at most once; the first cached instance is used.
    emitted := make(map[uint64]bool, len(tgts))
    for _, tgt := range tgts {
        hash := tgt.Hash()
        item, ok := d.cache[hash]
        if !ok {
            item = &cacheItem{tgt: tgt}
            d.cache[hash] = item
        }
        item.lastSeenTime = now
        if !emitted[hash] {
            emitted[hash] = true
            tgg.targets = append(tgg.targets, item.tgt)
        }
    }

    // Evict expired entries; keep the remaining unseen ones, in a stable order
    // (map iteration order is random).
    var stale []uint64
    for hash, item := range d.cache {
        if emitted[hash] {
            continue
        }
        if now.Sub(item.lastSeenTime) > d.expiryTime {
            delete(d.cache, hash)
            continue
        }
        stale = append(stale, hash)
    }
    sort.Slice(stale, func(i, j int) bool { return stale[i] < stale[j] })
    for _, hash := range stale {
        tgg.targets = append(tgg.targets, d.cache[hash].tgt)
    }

    return []model.TargetGroup{tgg}
}

// parseLocalListeners converts raw local-listeners output into targets.
//
// Every non-empty line must have the form "Protocol|IPAddress|Port|Cmdline";
// any other line, or a read error, makes the whole parse fail. Lines whose
// command is docker-proxy are skipped. The IPv4 wildcard 0.0.0.0 and addresses
// starting with "127" are rewritten to 127.0.0.1, and the IPv6 wildcard "::" to
// ::1, so targets point at localhost. Targets that cannot be hashed are skipped.
// Each target gets the discoverer's tags merged in.
//
// The result is ordered by protocol (TCP, TCP6, UDP, UDP6), then numerically by
// port, with localhost addresses ahead of other addresses on the same protocol
// and port. A listener reported for several addresses is returned only once,
// preferring its localhost target (an IPv4 localhost target also suppresses
// the IPv6 one on the same port).
func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) {
    const (
        local4 = "127.0.0.1"
        local6 = "::1"
        // maxLineSize bounds one output line. Command lines can exceed
        // bufio's default 64 KiB token size, which would otherwise abort the scan.
        maxLineSize = 4 << 20
    )

    isLocal := func(ip string) bool { return ip == local4 || ip == local6 }

    var targets []target
    sc := bufio.NewScanner(bytes.NewReader(bs))
    sc.Buffer(make([]byte, 0, 4096), maxLineSize)

    for sc.Scan() {
        text := strings.TrimSpace(sc.Text())
        if text == "" {
            continue
        }

        // Protocol|IPAddress|Port|Cmdline
        parts := strings.SplitN(text, "|", 4)
        if len(parts) != 4 {
            return nil, fmt.Errorf("unexpected data: '%s'", text)
        }

        tgt := target{
            Protocol:  parts[0],
            IPAddress: parts[1],
            Port:      parts[2],
            Comm:      extractComm(parts[3]),
            Cmdline:   parts[3],
        }

        if tgt.Comm == "docker-proxy" {
            continue
        }

        // NOTE(review): a socket bound to a specific loopback address other than
        // 127.0.0.1 (e.g. 127.0.0.53) is rewritten too; confirm this is intended.
        if tgt.IPAddress == "0.0.0.0" || strings.HasPrefix(tgt.IPAddress, "127") {
            tgt.IPAddress = local4
        } else if tgt.IPAddress == "::" {
            tgt.IPAddress = local6
        }

        tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.Port)

        hash, err := calcHash(tgt)
        if err != nil {
            // Best effort: a target that cannot be hashed cannot be tracked.
            continue
        }

        tgt.hash = hash
        tgt.Tags().Merge(d.Tags())

        targets = append(targets, tgt)
    }
    if err := sc.Err(); err != nil {
        return nil, fmt.Errorf("read local-listeners output: %w", err)
    }

    // order: TCP, TCP6, UDP, UDP6; then port; then localhost before others.
    // The comparator must be a strict weak ordering: two local (or two
    // non-local) addresses on the same protocol/port compare as equal.
    sort.SliceStable(targets, func(i, j int) bool {
        a, b := &targets[i], &targets[j]
        if a.Protocol != b.Protocol {
            return a.Protocol < b.Protocol
        }

        p1, _ := strconv.Atoi(a.Port)
        p2, _ := strconv.Atoi(b.Port)
        if p1 != p2 {
            return p1 < p2
        }

        return isLocal(a.IPAddress) && !isLocal(b.IPAddress)
    })

    seen := make(map[string]bool)
    tgts := make([]model.Target, 0, len(targets))

    for _, tgt := range targets {
        tgt := tgt

        proto := strings.TrimSuffix(tgt.Protocol, "6")
        key := tgt.Protocol + ":" + tgt.Address
        keyLocal4 := proto + ":" + net.JoinHostPort(local4, tgt.Port)
        keyLocal6 := proto + "6:" + net.JoinHostPort(local6, tgt.Port)

        // Filter targets that accept conns on any (0.0.0.0) and additionally on each
        // individual network interface (a.b.c.d): create a target only for localhost.
        // This relies on the sort above placing localhost first for each protocol/port.
        if seen[key] || seen[keyLocal4] || seen[keyLocal6] {
            continue
        }
        seen[key] = true

        tgts = append(tgts, &tgt)
    }

    return tgts, nil
}

// localListenersExec implements localListeners by running the external
// local-listeners binary.
type localListenersExec struct {
    // binPath is the path of the local-listeners executable.
    binPath string
    // timeout bounds a single run of the binary.
    timeout time.Duration
}

// discover runs the local-listeners binary and returns its standard output.
//
// The run is bounded by ctx and by e.timeout. Errors are wrapped with %w, so
// callers can detect context errors with errors.Is. When the binary exits with
// a failure status, the stderr output it produced is appended to the message.
func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) {
    execCtx, cancel := context.WithTimeout(ctx, e.timeout)
    defer cancel()

    // TCPv4/6 and UDPv4 sockets in LISTEN state
    // https://github.com/netdata/netdata/blob/master/src/collectors/plugins.d/local_listeners.c
    args := []string{
        "no-udp6",
        "no-local",
        "no-inbound",
        "no-outbound",
        "no-namespaces",
    }

    cmd := exec.CommandContext(execCtx, e.binPath, args...)

    bs, err := cmd.Output()
    if err != nil {
        // Output captures stderr into ExitError.Stderr; surface it for diagnosis.
        var exitErr *exec.ExitError
        if errors.As(err, &exitErr) {
            if stderr := bytes.TrimSpace(exitErr.Stderr); len(stderr) > 0 {
                return nil, fmt.Errorf("error on executing '%s': %w: %s", cmd, err, stderr)
            }
        }
        return nil, fmt.Errorf("error on executing '%s': %w", cmd, err)
    }

    return bs, nil
}

// extractComm returns the executable name of a process command line: the base
// name of its first space-separated token, e.g.
// "/usr/bin/docker-proxy -proto tcp" -> "docker-proxy" and
// "/usr/sbin/nginx" -> "nginx". Leading spaces are ignored, and a trailing ':'
// is dropped so rewritten process titles like "nginx: master process ..."
// yield "nginx".
func extractComm(cmdLine string) string {
    cmdLine = strings.TrimLeft(cmdLine, " ")
    if i := strings.IndexByte(cmdLine, ' '); i >= 0 {
        cmdLine = cmdLine[:i]
    }
    _, comm := filepath.Split(cmdLine)
    return strings.TrimSuffix(comm, ":")
}

// calcHash returns the structural hash of obj as computed by
// hashstructure.Hash with default (nil) options, along with any error it reports.
func calcHash(obj any) (uint64, error) {
    hash, err := hashstructure.Hash(obj, nil)
    return hash, err
}