src/go/collectors/go.d.plugin/agent/agent.go
// SPDX-License-Identifier: GPL-3.0-or-later
package agent
import (
"context"
"io"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
"github.com/netdata/netdata/go/go.d.plugin/agent/discovery"
"github.com/netdata/netdata/go/go.d.plugin/agent/filelock"
"github.com/netdata/netdata/go/go.d.plugin/agent/filestatus"
"github.com/netdata/netdata/go/go.d.plugin/agent/functions"
"github.com/netdata/netdata/go/go.d.plugin/agent/jobmgr"
"github.com/netdata/netdata/go/go.d.plugin/agent/module"
"github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi"
"github.com/netdata/netdata/go/go.d.plugin/agent/safewriter"
"github.com/netdata/netdata/go/go.d.plugin/agent/vnodes"
"github.com/netdata/netdata/go/go.d.plugin/logger"
"github.com/netdata/netdata/go/go.d.plugin/pkg/multipath"
"github.com/mattn/go-isatty"
)
var (
	// isTerminal reports whether the process's standard output is attached to
	// a terminal. It is computed once, at package initialization. When true,
	// keepAlive does nothing, the state file is not used, and run exits the
	// process on its early-return paths.
	isTerminal = isatty.IsTerminal(os.Stdout.Fd())
)
// Config is an Agent configuration.
//
// It is the input to New, which builds an Agent from it.
type Config struct {
	// Name is the plugin name; it ends up as the job manager's PluginName.
	Name string

	// ConfDir lists plugin configuration directories.
	// NOTE(review): presumably searched by loadPluginConfig — confirm.
	ConfDir []string

	// ModulesConfDir lists module (collector) configuration directories.
	// NOTE(review): presumably consumed when building discovery config — confirm.
	ModulesConfDir []string

	// ModulesConfSDDir lists service-discovery configuration directories.
	// NOTE(review): inferred from the name — confirm against discovery setup.
	ModulesConfSDDir []string

	// ModulesConfWatchPath is stored by New as Agent.ModulesSDConfPath.
	ModulesConfWatchPath []string

	// VnodesConfDir lists virtual-node configuration directories.
	// NOTE(review): presumably read by setupVnodeRegistry — confirm.
	VnodesConfDir []string

	// StateFile is the path of the file-status state file. run only uses it
	// when it is non-empty and stdout is not a terminal.
	StateFile string

	// LockDir, when non-empty, is passed to filelock.New to create the job
	// manager's file lock.
	LockDir string

	// ModuleRegistry is the module registry intended for the Agent.
	ModuleRegistry module.Registry

	// RunModule is copied into Agent.RunModule.
	// NOTE(review): not referenced by the code in this file; presumably
	// restricts the agent to a single module — confirm.
	RunModule string

	// MinUpdateEvery is copied into Agent.MinUpdateEvery.
	// NOTE(review): not referenced by the code in this file; presumably a
	// lower bound for job update intervals (seconds?) — confirm.
	MinUpdateEvery int
}
// Agent represents orchestrator.
//
// Run supervises plugin instances. Each instance, started by run, loads the
// plugin configuration and wires the discovery, functions and job managers
// together.
type Agent struct {
	// Logger supplies the logging methods used throughout the agent
	// (Info, Infof, Warningf, Error, Errorf).
	*logger.Logger

	// Name is the plugin name, passed to the job manager as PluginName.
	Name string

	// ConfDir holds the plugin configuration directories.
	// NOTE(review): presumably read by loadPluginConfig — confirm.
	ConfDir multipath.MultiPath

	// ModulesConfDir holds module (collector) configuration directories.
	// NOTE(review): presumably read when building discovery config — confirm.
	ModulesConfDir multipath.MultiPath

	// ModulesConfSDDir holds service-discovery configuration directories.
	// NOTE(review): inferred from the name — confirm.
	ModulesConfSDDir multipath.MultiPath

	// ModulesSDConfPath is filled from Config.ModulesConfWatchPath by New.
	ModulesSDConfPath []string

	// VnodesConfDir holds virtual-node configuration directories.
	// NOTE(review): presumably read by setupVnodeRegistry — confirm.
	VnodesConfDir multipath.MultiPath

	// StateFile is the file-status state file path; used only when non-empty
	// and stdout is not a terminal.
	StateFile string

	// LockDir, when non-empty, is the directory given to filelock.New for the
	// job manager's file lock.
	LockDir string

	// RunModule is not referenced by the code in this file.
	// NOTE(review): presumably selects a single module to run — confirm.
	RunModule string

	// MinUpdateEvery is not referenced by the code in this file.
	// NOTE(review): presumably a lower bound on job update intervals — confirm.
	MinUpdateEvery int

	// ModuleRegistry is the module registry for this Agent.
	ModuleRegistry module.Registry

	// Out is the writer handed to the job manager for its output.
	Out io.Writer

	// api is used by the agent itself to send DISABLE and EMPTYLINE.
	// NOTE(review): New binds it to safewriter.Stdout independently of Out,
	// so reassigning Out does not redirect these writes.
	api *netdataapi.API
}
// New creates a new Agent from cfg.
//
// Most Config fields are copied as-is; Config.ModulesConfWatchPath is stored
// as Agent.ModulesSDConfPath. Config.ModuleRegistry is used when set, and
// module.DefaultRegistry otherwise. Both the job-manager output (Out) and the
// agent's own netdata API writes go to safewriter.Stdout.
func New(cfg Config) *Agent {
	registry := cfg.ModuleRegistry
	if registry == nil {
		// Fall back to the default so callers that leave the field unset keep
		// the previous behavior.
		registry = module.DefaultRegistry
	}

	return &Agent{
		Logger: logger.New().With(
			slog.String("component", "agent"),
		),
		Name:              cfg.Name,
		ConfDir:           cfg.ConfDir,
		ModulesConfDir:    cfg.ModulesConfDir,
		ModulesConfSDDir:  cfg.ModulesConfSDDir,
		ModulesSDConfPath: cfg.ModulesConfWatchPath,
		VnodesConfDir:     cfg.VnodesConfDir,
		StateFile:         cfg.StateFile,
		LockDir:           cfg.LockDir,
		RunModule:         cfg.RunModule,
		MinUpdateEvery:    cfg.MinUpdateEvery,
		ModuleRegistry:    registry,
		Out:               safewriter.Stdout,
		api:               netdataapi.New(safewriter.Stdout),
	}
}
// Run starts the Agent. It launches keepAlive in its own goroutine and then
// enters serve, which never returns; the process ends via os.Exit.
func (a *Agent) Run() {
	go a.keepAlive()
	serve(a)
}

// serve repeatedly starts an agent instance (a.run) and blocks until SIGHUP,
// SIGINT or SIGTERM arrives. On SIGHUP the current instance is cancelled and,
// after a one-second pause, a fresh one is started. On SIGINT/SIGTERM it calls
// module.DontObsoleteCharts, cancels the instance and exits with status 0. If
// a cancelled instance does not finish within 10 seconds, the process exits
// immediately (status 0).
func serve(a *Agent) {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	const stopTimeout = 10 * time.Second

	var instances sync.WaitGroup

	for {
		ctx, cancel := context.WithCancel(context.Background())

		instances.Add(1)
		go func() {
			defer instances.Done()
			a.run(ctx)
		}()

		sig := <-signals
		reload := sig == syscall.SIGHUP
		if reload {
			a.Infof("received %s signal (%d). Restarting running instance", sig, sig)
		} else {
			a.Infof("received %s signal (%d). Terminating...", sig, sig)
			module.DontObsoleteCharts()
		}

		cancel()

		// Wait for the instance to finish, but not forever.
		stopped := make(chan struct{})
		go func() {
			instances.Wait()
			close(stopped)
		}()

		timer := time.NewTimer(stopTimeout)
		select {
		case <-stopped:
			timer.Stop()
		case <-timer.C:
			a.Errorf("stopping all goroutines timed out after %s. Exiting...", stopTimeout)
			os.Exit(0)
		}

		if !reload {
			os.Exit(0)
		}

		time.Sleep(time.Second)
	}
}
// run executes one plugin instance and blocks until ctx is cancelled.
//
// It loads the plugin config and the enabled modules. It then creates the
// functions, job and discovery managers, plus the file lock and file-status
// manager when configured, and runs them concurrently. Discovered config
// groups travel from the discovery manager to the job manager over the
// unbuffered channel `in`.
//
// Early exits: if the plugin is disabled, no modules are enabled, or the
// discovery manager cannot be created, run returns immediately. When stdout
// is a terminal, it instead exits the process with status 0. In the first two
// cases DISABLE is sent through the netdata API before returning.
func (a *Agent) run(ctx context.Context) {
	a.Info("instance is started")
	defer func() { a.Info("instance is stopped") }()

	// disable sends DISABLE through the netdata API. A failed write is logged
	// instead of being silently discarded; there is no other recovery here.
	disable := func() {
		if err := a.api.DISABLE(); err != nil {
			a.Warningf("couldn't send DISABLE: %v", err)
		}
	}

	cfg := a.loadPluginConfig()
	a.Infof("using config: %s", cfg.String())

	if !cfg.Enabled {
		a.Info("plugin is disabled in the configuration file, exiting...")
		if isTerminal {
			os.Exit(0)
		}
		disable()
		return
	}

	enabledModules := a.loadEnabledModules(cfg)
	if len(enabledModules) == 0 {
		a.Info("no modules to run")
		if isTerminal {
			os.Exit(0)
		}
		disable()
		return
	}

	discCfg := a.buildDiscoveryConf(enabledModules)

	discMgr, err := discovery.NewManager(discCfg)
	if err != nil {
		a.Error(err)
		if isTerminal {
			os.Exit(0)
		}
		return
	}

	fnMgr := functions.NewManager()

	jobMgr := jobmgr.New()
	jobMgr.PluginName = a.Name
	jobMgr.Out = a.Out
	jobMgr.Modules = enabledModules
	jobMgr.ConfigDefaults = discCfg.Registry
	jobMgr.FnReg = fnMgr

	// vnodes.Disabled is package-level state that outlives this instance
	// (serve calls run again on SIGHUP). It is therefore set explicitly in
	// both branches. Otherwise, once set to true, vnodes would stay disabled
	// after a reload even if vnode configs now exist.
	if reg := a.setupVnodeRegistry(); reg == nil || reg.Len() == 0 {
		vnodes.Disabled = true
	} else {
		vnodes.Disabled = false
		jobMgr.Vnodes = reg
	}

	if a.LockDir != "" {
		jobMgr.FileLock = filelock.New(a.LockDir)
	}

	// File status is tracked only when stdout is not a terminal and a state
	// file path is configured. A state file that fails to load is only a
	// warning.
	var fsMgr *filestatus.Manager
	if !isTerminal && a.StateFile != "" {
		fsMgr = filestatus.NewManager(a.StateFile)
		jobMgr.FileStatus = fsMgr
		if store, err := filestatus.LoadStore(a.StateFile); err != nil {
			a.Warningf("couldn't load state file: %v", err)
		} else {
			jobMgr.FileStatusStore = store
		}
	}

	in := make(chan []*confgroup.Group)

	var wg sync.WaitGroup

	wg.Add(1)
	go func() { defer wg.Done(); fnMgr.Run(ctx) }()

	wg.Add(1)
	go func() { defer wg.Done(); jobMgr.Run(ctx, in) }()

	wg.Add(1)
	go func() { defer wg.Done(); discMgr.Run(ctx, in) }()

	if fsMgr != nil {
		wg.Add(1)
		go func() { defer wg.Done(); fsMgr.Run(ctx) }()
	}

	wg.Wait()
	// Even if every component returned on its own, keep the instance alive
	// until it is explicitly cancelled.
	<-ctx.Done()
}
// keepAlive calls a.api.EMPTYLINE once per second, presumably as a heartbeat
// to the parent process. It does nothing when stdout is a terminal. Failed
// writes are logged; after three consecutive failures the process exits with
// status 0. A successful write resets the failure count.
func (a *Agent) keepAlive() {
	if isTerminal {
		return
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	const maxFailures = 3
	failures := 0

	for range ticker.C {
		err := a.api.EMPTYLINE()
		if err == nil {
			failures = 0
			continue
		}

		a.Infof("keepAlive: %v", err)
		failures++
		if failures == maxFailures {
			a.Info("too many keepAlive errors. Terminating...")
			os.Exit(0)
		}
	}
}