// src/go/collectors/go.d.plugin/agent/discovery/manager.go
// SPDX-License-Identifier: GPL-3.0-or-later
package discovery
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"
"github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
"github.com/netdata/netdata/go/go.d.plugin/agent/discovery/dummy"
"github.com/netdata/netdata/go/go.d.plugin/agent/discovery/file"
"github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd"
"github.com/netdata/netdata/go/go.d.plugin/logger"
)
// NewManager validates cfg, builds a Manager with its default settings and
// registers the discoverers that cfg enables.
//
// It returns an error when validateConfig rejects cfg or when
// registerDiscoverers fails (a discoverer constructor returned an error, or
// no discoverer ended up registered). The underlying error is wrapped with
// %w, so callers can inspect it with errors.Is / errors.As.
func NewManager(cfg Config) (*Manager, error) {
	if err := validateConfig(cfg); err != nil {
		return nil, fmt.Errorf("discovery manager config validation: %w", err)
	}

	mgr := &Manager{
		Logger: logger.New().With(
			slog.String("component", "discovery manager"),
		),
		// Capacity 1: a single pending "there is something to send" signal is
		// enough, extra triggers are dropped by triggerSend.
		send:        make(chan struct{}, 1),
		sendEvery:   time.Second * 2, // timeout to aggregate changes
		discoverers: make([]discoverer, 0),
		mux:         &sync.RWMutex{},
		cache:       newCache(),
	}

	if err := mgr.registerDiscoverers(cfg); err != nil {
		return nil, fmt.Errorf("discovery manager initialization: %w", err)
	}

	return mgr, nil
}
// discoverer is a source of configuration groups that the Manager can run.
// Manager.runDiscoverer starts Run in its own goroutine and consumes every
// batch of groups that the implementation sends on in.
type discoverer interface {
// Run publishes discovered groups to in. It receives the context the
// manager was started with; presumably implementations return once ctx is
// cancelled (verify per implementation).
Run(ctx context.Context, in chan<- []*confgroup.Group)
}
// Manager runs the registered discoverers, collects the groups they report
// into a cache and forwards the cached groups to a consumer channel.
type Manager struct {
// Embedded logger; provides the Info/Infof methods used by the manager.
*logger.Logger
// discoverers holds the discoverers added by registerDiscoverers; Run
// starts each of them.
discoverers []discoverer
// send signals that the cache has changes to forward. triggerSend writes
// to it without blocking; sendLoop and mustSend read from it.
send chan struct{}
// sendEvery is the ticker period sendLoop uses between send attempts.
sendEvery time.Duration
// mux is locked around every cache access in runDiscoverer, mustSend and
// trySend.
mux *sync.RWMutex
// cache stores the groups received from discoverers until they are sent.
cache *cache
}
// String implements fmt.Stringer: a fixed prefix followed by the
// default-formatted list of registered discoverers.
func (m *Manager) String() string {
	const prefix = "discovery manager: "
	return prefix + fmt.Sprint(m.discoverers)
}
// Run starts one goroutine per registered discoverer plus the send loop, and
// blocks until all of them have returned and ctx is cancelled. Groups are
// delivered to in by the send loop.
func (m *Manager) Run(ctx context.Context, in chan<- []*confgroup.Group) {
	m.Info("instance is started")
	defer func() { m.Info("instance is stopped") }()

	// One slot per discoverer and one for the send loop.
	var workers sync.WaitGroup
	workers.Add(len(m.discoverers) + 1)

	for i := range m.discoverers {
		disc := m.discoverers[i]
		go func() {
			defer workers.Done()
			m.runDiscoverer(ctx, disc)
		}()
	}

	go func() {
		defer workers.Done()
		m.sendLoop(ctx, in)
	}()

	workers.Wait()
	// sendLoop only returns on cancellation, so this does not block in
	// practice; it is kept so Run never returns before ctx is done.
	<-ctx.Done()
}
// registerDiscoverers creates every discoverer that cfg enables and appends
// it to m.discoverers, in the order file, dummy, service discovery.
//
// It stops at and returns the first constructor error. It also returns an
// error when cfg enables no discoverer at all.
func (m *Manager) registerDiscoverers(cfg Config) error {
	// cfg is a by-value copy, so the Registry assignments below stay local.
	useFile := len(cfg.File.Read) != 0 || len(cfg.File.Watch) != 0
	useDummy := len(cfg.Dummy.Names) != 0
	useSD := len(cfg.SD.ConfDir) != 0

	if useFile {
		cfg.File.Registry = cfg.Registry
		fileDisc, err := file.NewDiscovery(cfg.File)
		if err != nil {
			return err
		}
		m.discoverers = append(m.discoverers, fileDisc)
	}

	if useDummy {
		cfg.Dummy.Registry = cfg.Registry
		dummyDisc, err := dummy.NewDiscovery(cfg.Dummy)
		if err != nil {
			return err
		}
		m.discoverers = append(m.discoverers, dummyDisc)
	}

	if useSD {
		cfg.SD.ConfigDefaults = cfg.Registry
		sdDisc, err := sd.NewServiceDiscovery(cfg.SD)
		if err != nil {
			return err
		}
		m.discoverers = append(m.discoverers, sdDisc)
	}

	if len(m.discoverers) == 0 {
		return errors.New("zero registered discoverers")
	}

	m.Infof("registered discoverers: %v", m.discoverers)

	return nil
}
// runDiscoverer launches d in its own goroutine and feeds every batch of
// groups it reports into the cache, flagging after each batch that there is
// something to send. It returns when ctx is cancelled or when the updates
// channel is closed.
func (m *Manager) runDiscoverer(ctx context.Context, d discoverer) {
	ch := make(chan []*confgroup.Group)
	go d.Run(ctx, ch)

	// store applies one batch under the lock and raises the send signal.
	store := func(batch []*confgroup.Group) {
		m.mux.Lock()
		defer m.mux.Unlock()

		m.cache.update(batch)
		m.triggerSend()
	}

	for {
		select {
		case <-ctx.Done():
			return
		case batch, open := <-ch:
			if !open {
				return
			}
			store(batch)
		}
	}
}
// sendLoop delivers cached groups to in. The first delivery is a blocking
// one (mustSend); after that, on every tick of a sendEvery ticker, it
// attempts a non-blocking delivery if a send signal is pending. It returns
// when ctx is cancelled.
func (m *Manager) sendLoop(ctx context.Context, in chan<- []*confgroup.Group) {
	m.mustSend(ctx, in)

	ticker := time.NewTicker(m.sendEvery)
	defer ticker.Stop()

	for {
		// Wait for the next tick, or stop on cancellation.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}

		// Only try to send if something was flagged since the last attempt.
		select {
		case <-m.send:
			m.trySend(in)
		default:
		}
	}
}
// mustSend waits for a send signal, then takes the cached groups (resetting
// the cache) and blocks until they are written to in. Cancelling ctx aborts
// either wait; groups already taken from the cache are then not delivered.
func (m *Manager) mustSend(ctx context.Context, in chan<- []*confgroup.Group) {
	select {
	case <-m.send:
	case <-ctx.Done():
		return
	}

	// Take the groups and reset under the lock, but send outside of it so
	// discoverers are not blocked while the consumer is busy.
	m.mux.Lock()
	pending := m.cache.groups()
	m.cache.reset()
	m.mux.Unlock()

	select {
	case in <- pending:
	case <-ctx.Done():
	}
}
// trySend attempts a non-blocking delivery of the cached groups to in while
// holding the lock. On success the cache is reset; if the consumer is not
// ready, the send signal is raised again so a later tick retries.
func (m *Manager) trySend(in chan<- []*confgroup.Group) {
	m.mux.Lock()
	defer m.mux.Unlock()

	pending := m.cache.groups()

	select {
	case in <- pending:
		m.cache.reset()
	default:
		m.triggerSend()
	}
}
// triggerSend raises the send signal without blocking. If the send channel
// cannot accept the value right now, the call is a no-op.
func (m *Manager) triggerSend() {
	var signal struct{}

	select {
	case m.send <- signal:
	default:
		// Channel not ready to accept; nothing to do.
	}
}