netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go

Summary

Maintainability
A
0 mins
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package sd

import (
    "context"
    "fmt"
    "log/slog"
    "sync"

    "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
    "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/pipeline"
    "github.com/netdata/netdata/go/go.d.plugin/logger"
    "github.com/netdata/netdata/go/go.d.plugin/pkg/multipath"

    "gopkg.in/yaml.v2"
)

type Config struct {
    ConfigDefaults confgroup.Registry
    ConfDir        multipath.MultiPath
}

func NewServiceDiscovery(cfg Config) (*ServiceDiscovery, error) {
    log := logger.New().With(
        slog.String("component", "service discovery"),
    )

    d := &ServiceDiscovery{
        Logger:         log,
        confProv:       newConfFileReader(log, cfg.ConfDir),
        configDefaults: cfg.ConfigDefaults,
        newPipeline: func(config pipeline.Config) (sdPipeline, error) {
            return pipeline.New(config)
        },
        pipelines: make(map[string]func()),
    }

    return d, nil
}

type (
    ServiceDiscovery struct {
        *logger.Logger

        confProv confFileProvider

        configDefaults confgroup.Registry
        newPipeline    func(config pipeline.Config) (sdPipeline, error)
        pipelines      map[string]func()
    }
    sdPipeline interface {
        Run(ctx context.Context, in chan<- []*confgroup.Group)
    }
    confFileProvider interface {
        run(ctx context.Context)
        configs() chan confFile
    }
)

func (d *ServiceDiscovery) String() string {
    return "service discovery"
}

func (d *ServiceDiscovery) Run(ctx context.Context, in chan<- []*confgroup.Group) {
    d.Info("instance is started")
    defer func() { d.cleanup(); d.Info("instance is stopped") }()

    var wg sync.WaitGroup

    wg.Add(1)
    go func() { defer wg.Done(); d.confProv.run(ctx) }()

    wg.Add(1)
    go func() { defer wg.Done(); d.run(ctx, in) }()

    wg.Wait()
    <-ctx.Done()
}

func (d *ServiceDiscovery) run(ctx context.Context, in chan<- []*confgroup.Group) {
    for {
        select {
        case <-ctx.Done():
            return
        case cfg := <-d.confProv.configs():
            if cfg.source == "" {
                continue
            }
            if len(cfg.content) == 0 {
                d.removePipeline(cfg)
            } else {
                d.addPipeline(ctx, cfg, in)
            }
        }
    }
}

func (d *ServiceDiscovery) removePipeline(conf confFile) {
    if stop, ok := d.pipelines[conf.source]; ok {
        d.Infof("received an empty config, stopping the pipeline ('%s')", conf.source)
        delete(d.pipelines, conf.source)
        stop()
    }
}

func (d *ServiceDiscovery) addPipeline(ctx context.Context, conf confFile, in chan<- []*confgroup.Group) {
    var cfg pipeline.Config

    if err := yaml.Unmarshal(conf.content, &cfg); err != nil {
        d.Error(err)
        return
    }

    if cfg.Disabled {
        d.Infof("pipeline config is disabled '%s' (%s)", cfg.Name, cfg.Source)
        return
    }

    cfg.Source = fmt.Sprintf("file=%s", conf.source)
    cfg.ConfigDefaults = d.configDefaults

    pl, err := d.newPipeline(cfg)
    if err != nil {
        d.Error(err)
        return
    }

    if stop, ok := d.pipelines[conf.source]; ok {
        stop()
    }

    var wg sync.WaitGroup
    plCtx, cancel := context.WithCancel(ctx)

    wg.Add(1)
    go func() { defer wg.Done(); pl.Run(plCtx, in) }()

    stop := func() { cancel(); wg.Wait() }
    d.pipelines[conf.source] = stop
}

func (d *ServiceDiscovery) cleanup() {
    for _, stop := range d.pipelines {
        stop()
    }
}