netdata/netdata
src/go/collectors/go.d.plugin/agent/jobmgr/manager.go

// SPDX-License-Identifier: GPL-3.0-or-later

package jobmgr

import (
    "context"
    "fmt"
    "io"
    "log/slog"
    "os"
    "sync"
    "time"

    "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
    "github.com/netdata/netdata/go/go.d.plugin/agent/functions"
    "github.com/netdata/netdata/go/go.d.plugin/agent/module"
    "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi"
    "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter"
    "github.com/netdata/netdata/go/go.d.plugin/agent/ticker"
    "github.com/netdata/netdata/go/go.d.plugin/logger"

    "github.com/mattn/go-isatty"
    "gopkg.in/yaml.v2"
)

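// isTerminal is true when stdout or stdin is attached to a terminal, i.e. the plugin is run interactively.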
var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd())

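// New returns a Manager initialized with no-op dependencies, empty config caches,
// and the channels used by Run. Exported fields (PluginName, Modules, FileLock, etc.)
// are expected to be set by the caller before Run is invoked.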
func New() *Manager {
    mgr := &Manager{
        Logger: logger.New().With(
            slog.String("component", "job manager"),
        ),
        Out:             io.Discard,
        FileLock:        noop{},
        FileStatus:      noop{},
        FileStatusStore: noop{},
        Vnodes:          noop{},
        FnReg:           noop{},

        discoveredConfigs: newDiscoveredConfigsCache(),
        seenConfigs:       newSeenConfigCache(),
        exposedConfigs:    newExposedConfigCache(),
        runningJobs:       newRunningJobsCache(),
        retryingTasks:     newRetryingTasksCache(),

        started:  make(chan struct{}),
        api:      netdataapi.New(safewriter.Stdout),
        addCh:    make(chan confgroup.Config),
        rmCh:     make(chan confgroup.Config),
        dyncfgCh: make(chan functions.Function),
    }

    return mgr
}

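// Manager owns the lifecycle of collector jobs: it receives discovered config
// groups, reconciles them with dynamic configuration (dyncfg) commands coming
// from Netdata, and starts and stops the corresponding jobs.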
type Manager struct {
    *logger.Logger

    PluginName     string
    Out            io.Writer
    Modules        module.Registry
    ConfigDefaults confgroup.Registry

    FileLock        FileLocker
    FileStatus      FileStatus
    FileStatusStore FileStatusStore
    Vnodes          Vnodes
    FnReg           FunctionRegistry

    discoveredConfigs *discoveredConfigs
    seenConfigs       *seenConfigs
    exposedConfigs    *exposedConfigs
    retryingTasks     *retryingTasks
    runningJobs       *runningJobs

    ctx      context.Context
    started  chan struct{}
    api      dyncfgAPI
    addCh    chan confgroup.Config
    rmCh     chan confgroup.Config
    dyncfgCh chan functions.Function

    waitCfgOnOff string // block processing of discovered configs until "enable"/"disable" is received from Netdata
}

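// Run registers the dyncfg "config" function, announces the known modules, and then
// processes discovered config groups and dyncfg commands until ctx is cancelled.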
func (m *Manager) Run(ctx context.Context, in chan []*confgroup.Group) {
    m.Info("instance is started")
    defer func() { m.cleanup(); m.Info("instance is stopped") }()
    m.ctx = ctx

    m.FnReg.Register("config", m.dyncfgConfig)

    for name := range m.Modules {
        m.dyncfgModuleCreate(name)
    }

    var wg sync.WaitGroup

    wg.Add(1)
    go func() { defer wg.Done(); m.runProcessConfGroups(in) }()

    wg.Add(1)
    go func() { defer wg.Done(); m.run() }()

    wg.Add(1)
    go func() { defer wg.Done(); m.runNotifyRunningJobs() }()

    close(m.started)

    wg.Wait()
    <-m.ctx.Done()
}

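// runProcessConfGroups reads discovered config groups from in, updates the
// discovered-configs cache, and forwards the resulting additions and removals
// to the add/remove channels.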
func (m *Manager) runProcessConfGroups(in chan []*confgroup.Group) {
    for {
        select {
        case <-m.ctx.Done():
            return
        case groups := <-in:
            for _, gr := range groups {
                a, r := m.discoveredConfigs.add(gr)
                m.Debugf("received configs: %d/+%d/-%d ('%s')", len(gr.Configs), len(a), len(r), gr.Source)
                sendConfigs(m.ctx, m.rmCh, r...)
                sendConfigs(m.ctx, m.addCh, a...)
            }
        }
    }
}

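// run is the main reconciliation loop. While waiting for an "enable"/"disable"
// response for the last exposed config (waitCfgOnOff), it handles only dyncfg
// commands; otherwise it also processes config additions and removals.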
func (m *Manager) run() {
    for {
        if m.waitCfgOnOff != "" {
            select {
            case <-m.ctx.Done():
                return
            case fn := <-m.dyncfgCh:
                m.dyncfgConfigExec(fn)
            }
        } else {
            select {
            case <-m.ctx.Done():
                return
            case cfg := <-m.addCh:
                m.addConfig(cfg)
            case cfg := <-m.rmCh:
                m.removeConfig(cfg)
            case fn := <-m.dyncfgCh:
                m.dyncfgConfigExec(fn)
            }
        }
    }
}

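// addConfig registers a newly discovered config. If a config with the same name is
// already exposed, the one with the higher source-type priority wins (on a tie, a
// running exposed job is kept). The accepted config is exposed to Netdata and, unless
// running interactively, further processing blocks until Netdata enables or disables it.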
func (m *Manager) addConfig(cfg confgroup.Config) {
    if _, ok := m.Modules.Lookup(cfg.Module()); !ok {
        return
    }

    m.retryingTasks.remove(cfg)

    scfg, ok := m.seenConfigs.lookup(cfg)
    if !ok {
        scfg = &seenConfig{cfg: cfg}
        m.seenConfigs.add(scfg)
    }

    ecfg, ok := m.exposedConfigs.lookup(cfg)
    if !ok {
        scfg.status = dyncfgAccepted
        ecfg = scfg
        m.exposedConfigs.add(ecfg)
    } else {
        sp, ep := scfg.cfg.SourceTypePriority(), ecfg.cfg.SourceTypePriority()
        if ep > sp || (ep == sp && ecfg.status == dyncfgRunning) {
            m.retryingTasks.remove(cfg)
            return
        }
        if ecfg.status == dyncfgRunning {
            m.stopRunningJob(ecfg.cfg.FullName())
            m.FileLock.Unlock(ecfg.cfg.FullName())
            m.FileStatus.Remove(ecfg.cfg)
        }
        scfg.status = dyncfgAccepted
        m.exposedConfigs.add(scfg) // replace existing exposed
        ecfg = scfg
    }

    m.dyncfgJobCreate(ecfg.cfg, ecfg.status)

    if isTerminal || m.PluginName == "nodyncfg" { // FIXME: quick fix of TestAgent_Run (agent_test.go)
        m.dyncfgConfigEnable(functions.Function{Args: []string{dyncfgJobID(ecfg.cfg), "enable"}})
    } else {
        m.waitCfgOnOff = ecfg.cfg.FullName()
    }
}

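// removeConfig handles a config that is no longer discovered: it drops it from the
// caches, stops the running job (if any), releases the file lock, and removes the
// dyncfg job from Netdata unless it is a non-running stock config.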
func (m *Manager) removeConfig(cfg confgroup.Config) {
    m.retryingTasks.remove(cfg)

    scfg, ok := m.seenConfigs.lookup(cfg)
    if !ok {
        return
    }
    m.seenConfigs.remove(cfg)

    ecfg, ok := m.exposedConfigs.lookup(cfg)
    if !ok || scfg.cfg.UID() != ecfg.cfg.UID() {
        return
    }

    m.exposedConfigs.remove(cfg)
    m.stopRunningJob(cfg.FullName())
    m.FileLock.Unlock(cfg.FullName())
    m.FileStatus.Remove(cfg)

    if !isStock(cfg) || ecfg.status == dyncfgRunning {
        m.dyncfgJobRemove(cfg)
    }
}

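// runNotifyRunningJobs ticks once per second and propagates the clock tick to every running job.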
func (m *Manager) runNotifyRunningJobs() {
    tk := ticker.New(time.Second)
    defer tk.Stop()

    for {
        select {
        case <-m.ctx.Done():
            return
        case clock := <-tk.C:
            m.runningJobs.lock()
            m.runningJobs.forEach(func(_ string, job *module.Job) { job.Tick(clock) })
            m.runningJobs.unlock()
        }
    }
}

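// startRunningJob starts the job in its own goroutine, stopping any previously
// running job with the same full name first.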
func (m *Manager) startRunningJob(job *module.Job) {
    m.runningJobs.lock()
    defer m.runningJobs.unlock()

    if job, ok := m.runningJobs.lookup(job.FullName()); ok {
        job.Stop()
    }

    go job.Start()
    m.runningJobs.add(job.FullName(), job)
}

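// stopRunningJob stops the job with the given full name and removes it from the
// running-jobs cache, if present.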
func (m *Manager) stopRunningJob(name string) {
    m.runningJobs.lock()
    defer m.runningJobs.unlock()

    if job, ok := m.runningJobs.lookup(name); ok {
        job.Stop()
        m.runningJobs.remove(name)
    }
}

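// cleanup unregisters the dyncfg "config" function and stops all running jobs.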
func (m *Manager) cleanup() {
    m.FnReg.Unregister("config")

    m.runningJobs.lock()
    defer m.runningJobs.unlock()

    m.runningJobs.forEach(func(key string, job *module.Job) {
        job.Stop()
    })
}

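// createCollectorJob builds a module.Job for the given config: it looks up the module
// creator, resolves the optional virtual node, applies the config to a fresh module
// instance, and wraps it in a job carrying the plugin-level settings.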
func (m *Manager) createCollectorJob(cfg confgroup.Config) (*module.Job, error) {
    creator, ok := m.Modules[cfg.Module()]
    if !ok {
        return nil, fmt.Errorf("can not find %s module", cfg.Module())
    }

    var vnode struct {
        guid     string
        hostname string
        labels   map[string]string
    }

    if cfg.Vnode() != "" {
        n, ok := m.Vnodes.Lookup(cfg.Vnode())
        if !ok {
            return nil, fmt.Errorf("vnode '%s' is not found", cfg.Vnode())
        }

        vnode.guid = n.GUID
        vnode.hostname = n.Hostname
        vnode.labels = n.Labels
    }

    m.Debugf("creating %s[%s] job, config: %v", cfg.Module(), cfg.Name(), cfg)

    mod := creator.Create()

    if err := applyConfig(cfg, mod); err != nil {
        return nil, err
    }

    jobCfg := module.JobConfig{
        PluginName:      m.PluginName,
        Name:            cfg.Name(),
        ModuleName:      cfg.Module(),
        FullName:        cfg.FullName(),
        UpdateEvery:     cfg.UpdateEvery(),
        AutoDetectEvery: cfg.AutoDetectionRetry(),
        Priority:        cfg.Priority(),
        Labels:          makeLabels(cfg),
        IsStock:         cfg.SourceType() == "stock",
        Module:          mod,
        Out:             m.Out,
        VnodeGUID:       vnode.guid,
        VnodeHostname:   vnode.hostname,
        VnodeLabels:     vnode.labels,
    }

    job := module.NewJob(jobCfg)

    return job, nil
}

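// runRetryTask waits for the config's auto-detection retry interval and then
// re-sends the config to out, unless ctx is cancelled first.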
func runRetryTask(ctx context.Context, out chan<- confgroup.Config, cfg confgroup.Config) {
    t := time.NewTimer(time.Second * time.Duration(cfg.AutoDetectionRetry()))
    defer t.Stop()

    select {
    case <-ctx.Done():
    case <-t.C:
        sendConfigs(ctx, out, cfg)
    }
}

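// sendConfigs writes each config to out, aborting early if ctx is cancelled.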
func sendConfigs(ctx context.Context, out chan<- confgroup.Config, cfgs ...confgroup.Config) {
    for _, cfg := range cfgs {
        select {
        case <-ctx.Done():
            return
        case out <- cfg:
        }
    }
}

func isStock(cfg confgroup.Config) bool {
    return cfg.SourceType() == confgroup.TypeStock
}

func isDyncfg(cfg confgroup.Config) bool {
    return cfg.SourceType() == confgroup.TypeDyncfg
}

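// applyConfig copies the config into the module instance by round-tripping it
// through YAML (marshal the config, unmarshal into the module).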
func applyConfig(cfg confgroup.Config, module any) error {
    bs, err := yaml.Marshal(cfg)
    if err != nil {
        return err
    }
    return yaml.Unmarshal(bs, module)
}

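// makeLabels extracts the string key/value pairs from the config's labels,
// silently skipping entries that are not strings.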
func makeLabels(cfg confgroup.Config) map[string]string {
    labels := make(map[string]string)
    for name, value := range cfg.Labels() {
        n, ok1 := name.(string)
        v, ok2 := value.(string)
        if ok1 && ok2 {
            labels[n] = v
        }
    }
    return labels
}