src/go/plugin/go.d/agent/jobmgr/dyncfg.go
// SPDX-License-Identifier: GPL-3.0-or-later
package jobmgr
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"reflect"
"strconv"
"strings"
"time"
"unicode"
"github.com/netdata/netdata/go/plugins/logger"
"github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
"github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions"
"gopkg.in/yaml.v2"
)
type dyncfgStatus int
const (
_ dyncfgStatus = iota
dyncfgAccepted
dyncfgRunning
dyncfgFailed
dyncfgIncomplete
dyncfgDisabled
)
func (s dyncfgStatus) String() string {
switch s {
case dyncfgAccepted:
return "accepted"
case dyncfgRunning:
return "running"
case dyncfgFailed:
return "failed"
case dyncfgIncomplete:
return "incomplete"
case dyncfgDisabled:
return "disabled"
default:
return "unknown"
}
}
const (
dyncfgIDPrefix = "go.d:collector:"
dyncfgPath = "/collectors/jobs"
)
func dyncfgModID(name string) string {
return fmt.Sprintf("%s%s", dyncfgIDPrefix, name)
}
func dyncfgJobID(cfg confgroup.Config) string {
return fmt.Sprintf("%s%s:%s", dyncfgIDPrefix, cfg.Module(), cfg.Name())
}
func dyncfgModCmds() string {
return "add schema enable disable test userconfig"
}
func dyncfgJobCmds(cfg confgroup.Config) string {
cmds := "schema get enable disable update restart test userconfig"
if isDyncfg(cfg) {
cmds += " remove"
}
return cmds
}
func (m *Manager) dyncfgModuleCreate(name string) {
id := dyncfgModID(name)
path := dyncfgPath
cmds := dyncfgModCmds()
typ := "template"
src := "internal"
m.api.CONFIGCREATE(id, dyncfgAccepted.String(), typ, path, src, src, cmds)
}
func (m *Manager) dyncfgJobCreate(cfg confgroup.Config, status dyncfgStatus) {
id := dyncfgJobID(cfg)
path := dyncfgPath
cmds := dyncfgJobCmds(cfg)
typ := "job"
m.api.CONFIGCREATE(id, status.String(), typ, path, cfg.SourceType(), cfg.Source(), cmds)
}
func (m *Manager) dyncfgJobRemove(cfg confgroup.Config) {
m.api.CONFIGDELETE(dyncfgJobID(cfg))
}
func (m *Manager) dyncfgJobStatus(cfg confgroup.Config, status dyncfgStatus) {
m.api.CONFIGSTATUS(dyncfgJobID(cfg), status.String())
}
func (m *Manager) dyncfgConfig(fn functions.Function) {
if len(fn.Args) < 2 {
m.Warningf("dyncfg: %s: missing required arguments, want 3 got %d", fn.Name, len(fn.Args))
m.dyncfgRespf(fn, 400, "Missing required arguments. Need at least 2, but got %d.", len(fn.Args))
return
}
select {
case <-m.ctx.Done():
m.dyncfgRespf(fn, 503, "Job manager is shutting down.")
default:
}
//m.Infof("QQ FN: '%s'", fn)
action := strings.ToLower(fn.Args[1])
switch action {
case "userconfig":
m.dyncfgConfigUserconfig(fn)
return
case "test":
m.dyncfgConfigTest(fn)
return
case "schema":
m.dyncfgConfigSchema(fn)
return
}
select {
case <-m.ctx.Done():
m.dyncfgRespf(fn, 503, "Job manager is shutting down.")
case m.dyncfgCh <- fn:
}
}
func (m *Manager) dyncfgConfigExec(fn functions.Function) {
action := strings.ToLower(fn.Args[1])
switch action {
case "test":
m.dyncfgConfigTest(fn)
case "schema":
m.dyncfgConfigSchema(fn)
case "get":
m.dyncfgConfigGet(fn)
case "restart":
m.dyncfgConfigRestart(fn)
case "enable":
m.dyncfgConfigEnable(fn)
case "disable":
m.dyncfgConfigDisable(fn)
case "add":
m.dyncfgConfigAdd(fn)
case "remove":
m.dyncfgConfigRemove(fn)
case "update":
m.dyncfgConfigUpdate(fn)
default:
m.Warningf("dyncfg: function '%s' not implemented", fn.String())
m.dyncfgRespf(fn, 501, "Function '%s' is not implemented.", fn.Name)
}
}
func (m *Manager) dyncfgConfigUserconfig(fn functions.Function) {
id := fn.Args[0]
jn := "test"
if len(fn.Args) > 2 {
jn = fn.Args[2]
}
mn, ok := extractModuleName(id)
if !ok {
m.Warningf("dyncfg: userconfig: could not extract module and job from id (%s)", id)
m.dyncfgRespf(fn, 400,
"Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
return
}
creator, ok := m.Modules.Lookup(mn)
if !ok {
m.Warningf("dyncfg: userconfig: module %s not found", mn)
m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn)
return
}
if creator.Config == nil || creator.Config() == nil {
m.Warningf("dyncfg: userconfig: module %s: configuration not found", mn)
m.dyncfgRespf(fn, 500, "Module %s does not provide configuration.", mn)
return
}
bs, err := userConfigFromPayload(creator.Config(), jn, fn)
if err != nil {
m.Warningf("dyncfg: userconfig: module %s: failed to create config from payload: %v", mn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err)
}
m.dyncfgRespPayloadYAML(fn, string(bs))
}
func (m *Manager) dyncfgConfigTest(fn functions.Function) {
id := fn.Args[0]
mn, ok := extractModuleName(id)
if !ok {
m.Warningf("dyncfg: test: could not extract module and job from id (%s)", id)
m.dyncfgRespf(fn, 400,
"Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
return
}
jn := "test"
if len(fn.Args) > 2 {
jn = fn.Args[2]
}
if err := validateJobName(jn); err != nil {
m.Warningf("dyncfg: test: module %s: unacceptable job name '%s': %v", mn, jn, err)
m.dyncfgRespf(fn, 400, "Unacceptable job name '%s': %v.", jn, err)
return
}
creator, ok := m.Modules.Lookup(mn)
if !ok {
m.Warningf("dyncfg: test: module %s not found", mn)
m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn)
return
}
cfg, err := configFromPayload(fn)
if err != nil {
m.Warningf("dyncfg: test: module %s: failed to create config from payload: %v", mn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err)
return
}
if cfg.Vnode() != "" {
if _, ok := m.Vnodes.Lookup(cfg.Vnode()); !ok {
m.Warningf("dyncfg: test: module %s: vnode %s not found", mn, cfg.Vnode())
m.dyncfgRespf(fn, 400, "The specified vnode '%s' is not registered.", cfg.Vnode())
return
}
}
cfg.SetModule(mn)
cfg.SetName(jn)
job := creator.Create()
if err := applyConfig(cfg, job); err != nil {
m.Warningf("dyncfg: test: module %s: failed to apply config: %v", mn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
return
}
job.GetBase().Logger = logger.New().With(
slog.String("collector", cfg.Module()),
slog.String("job", cfg.Name()),
)
defer job.Cleanup()
if err := job.Init(); err != nil {
m.dyncfgRespf(fn, 422, "Job initialization failed: %v", err)
return
}
if err := job.Check(); err != nil {
m.dyncfgRespf(fn, 422, "Job check failed: %v", err)
return
}
m.dyncfgRespf(fn, 200, "")
}
func (m *Manager) dyncfgConfigSchema(fn functions.Function) {
id := fn.Args[0]
mn, ok := extractModuleName(id)
if !ok {
m.Warningf("dyncfg: schema: could not extract module from id (%s)", id)
m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
return
}
mod, ok := m.Modules.Lookup(mn)
if !ok {
m.Warningf("dyncfg: schema: module %s not found", mn)
m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn)
return
}
if mod.JobConfigSchema == "" {
m.Warningf("dyncfg: schema: module %s: schema not found", mn)
m.dyncfgRespf(fn, 500, "Module %s configuration schema not found.", mn)
return
}
m.dyncfgRespPayloadJSON(fn, mod.JobConfigSchema)
}
func (m *Manager) dyncfgConfigGet(fn functions.Function) {
id := fn.Args[0]
mn, jn, ok := extractModuleJobName(id)
if !ok {
m.Warningf("dyncfg: get: could not extract module and job from id (%s)", id)
m.dyncfgRespf(fn, 400,
"Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
return
}
creator, ok := m.Modules.Lookup(mn)
if !ok {
m.Warningf("dyncfg: get: module %s not found", mn)
m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn)
return
}
ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
if !ok {
m.Warningf("dyncfg: get: module %s job %s not found", mn, jn)
m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
return
}
mod := creator.Create()
if err := applyConfig(ecfg.cfg, mod); err != nil {
m.Warningf("dyncfg: get: module %s job %s failed to apply config: %v", mn, jn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
return
}
conf := mod.Configuration()
if conf == nil {
m.Warningf("dyncfg: get: module %s: configuration not found", mn)
m.dyncfgRespf(fn, 500, "Module %s does not provide configuration.", mn)
return
}
bs, err := json.Marshal(conf)
if err != nil {
m.Warningf("dyncfg: get: module %s job %s failed to json marshal config: %v", mn, jn, err)
m.dyncfgRespf(fn, 500, "Failed to convert configuration into JSON: %v.", err)
return
}
m.dyncfgRespPayloadJSON(fn, string(bs))
}
func (m *Manager) dyncfgConfigRestart(fn functions.Function) {
id := fn.Args[0]
mn, jn, ok := extractModuleJobName(id)
if !ok {
m.Warningf("dyncfg: restart: could not extract module from id (%s)", id)
m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
return
}
ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
if !ok {
m.Warningf("dyncfg: restart: module %s job %s not found", mn, jn)
m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
return
}
job, err := m.createCollectorJob(ecfg.cfg)
if err != nil {
m.Warningf("dyncfg: restart: module %s job %s: failed to apply config: %v", mn, jn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
switch ecfg.status {
case dyncfgAccepted, dyncfgDisabled:
m.Warningf("dyncfg: restart: module %s job %s: restarting not allowed in '%s' state", mn, jn, ecfg.status)
m.dyncfgRespf(fn, 405, "Restarting data collection job is not allowed in '%s' state.", ecfg.status)
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
case dyncfgRunning:
m.FileStatus.Remove(ecfg.cfg)
m.FileLock.Unlock(ecfg.cfg.FullName())
m.stopRunningJob(ecfg.cfg.FullName())
default:
}
if err := job.AutoDetection(); err != nil {
job.Cleanup()
ecfg.status = dyncfgFailed
m.dyncfgRespf(fn, 422, "Job restart failed: %v", err)
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
if ok, err := m.FileLock.Lock(ecfg.cfg.FullName()); !ok && err == nil {
job.Cleanup()
ecfg.status = dyncfgFailed
m.dyncfgRespf(fn, 500, "Job restart failed: cannot filelock.")
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
ecfg.status = dyncfgRunning
if isDyncfg(ecfg.cfg) {
m.FileStatus.Save(ecfg.cfg, ecfg.status.String())
}
m.startRunningJob(job)
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
}
func (m *Manager) dyncfgConfigEnable(fn functions.Function) {
id := fn.Args[0]
mn, jn, ok := extractModuleJobName(id)
if !ok {
m.Warningf("dyncfg: enable: could not extract module and job from id (%s)", id)
m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
return
}
ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
if !ok {
m.Warningf("dyncfg: enable: module %s job %s not found", mn, jn)
m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
return
}
if ecfg.cfg.FullName() == m.waitCfgOnOff {
m.waitCfgOnOff = ""
}
switch ecfg.status {
case dyncfgAccepted, dyncfgDisabled, dyncfgFailed:
case dyncfgRunning:
// non-dyncfg update triggers enable/disable
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
default:
m.Warningf("dyncfg: enable: module %s job %s: enabling not allowed in %s state", mn, jn, ecfg.status)
m.dyncfgRespf(fn, 405, "Enabling data collection job is not allowed in '%s' state.", ecfg.status)
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
job, err := m.createCollectorJob(ecfg.cfg)
if err != nil {
ecfg.status = dyncfgFailed
m.Warningf("dyncfg: enable: module %s job %s: failed to apply config: %v", mn, jn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
if err := job.AutoDetection(); err != nil {
job.Cleanup()
ecfg.status = dyncfgFailed
m.dyncfgRespf(fn, 200, "Job enable failed: %v.", err)
if isStock(ecfg.cfg) {
m.exposedConfigs.remove(ecfg.cfg)
m.dyncfgJobRemove(ecfg.cfg)
} else {
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
}
if job.RetryAutoDetection() && !isDyncfg(ecfg.cfg) {
m.Infof("%s[%s] job detection failed, will retry in %d seconds",
ecfg.cfg.Module(), ecfg.cfg.Name(), job.AutoDetectionEvery())
ctx, cancel := context.WithCancel(m.ctx)
m.retryingTasks.add(ecfg.cfg, &retryTask{cancel: cancel})
go runRetryTask(ctx, m.addCh, ecfg.cfg)
}
return
}
if ok, err := m.FileLock.Lock(ecfg.cfg.FullName()); !ok && err == nil {
job.Cleanup()
ecfg.status = dyncfgFailed
m.dyncfgRespf(fn, 500, "Job enable failed: can not filelock.")
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
ecfg.status = dyncfgRunning
if isDyncfg(ecfg.cfg) {
m.FileStatus.Save(ecfg.cfg, ecfg.status.String())
}
m.startRunningJob(job)
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
}
func (m *Manager) dyncfgConfigDisable(fn functions.Function) {
id := fn.Args[0]
mn, jn, ok := extractModuleJobName(id)
if !ok {
m.Warningf("dyncfg: disable: could not extract module from id (%s)", id)
m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
return
}
ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
if !ok {
m.Warningf("dyncfg: disable: module %s job %s not found", mn, jn)
m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
return
}
if ecfg.cfg.FullName() == m.waitCfgOnOff {
m.waitCfgOnOff = ""
}
switch ecfg.status {
case dyncfgDisabled:
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
case dyncfgRunning:
m.stopRunningJob(ecfg.cfg.FullName())
if isDyncfg(ecfg.cfg) {
m.FileStatus.Remove(ecfg.cfg)
}
m.FileLock.Unlock(ecfg.cfg.FullName())
default:
}
ecfg.status = dyncfgDisabled
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
}
func (m *Manager) dyncfgConfigAdd(fn functions.Function) {
if len(fn.Args) < 3 {
m.Warningf("dyncfg: add: missing required arguments, want 3 got %d", len(fn.Args))
m.dyncfgRespf(fn, 400, "Missing required arguments. Need at least 3, but got %d.", len(fn.Args))
return
}
id := fn.Args[0]
jn := fn.Args[2]
mn, ok := extractModuleName(id)
if !ok {
m.Warningf("dyncfg: add: could not extract module from id (%s)", id)
m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
return
}
if len(fn.Payload) == 0 {
m.Warningf("dyncfg: add: module %s job %s missing configuration payload.", mn, jn)
m.dyncfgRespf(fn, 400, "Missing configuration payload.")
return
}
if err := validateJobName(jn); err != nil {
m.Warningf("dyncfg: add: module %s: unacceptable job name '%s': %v", mn, jn, err)
m.dyncfgRespf(fn, 400, "Unacceptable job name '%s': %v.", jn, err)
return
}
cfg, err := configFromPayload(fn)
if err != nil {
m.Warningf("dyncfg: add: module %s job %s: failed to create config from payload: %v", mn, jn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err)
return
}
m.dyncfgSetConfigMeta(cfg, mn, jn)
if _, err := m.createCollectorJob(cfg); err != nil {
m.Warningf("dyncfg: add: module %s job %s: failed to apply config: %v", mn, jn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
return
}
if ecfg, ok := m.exposedConfigs.lookup(cfg); ok {
if scfg, ok := m.seenConfigs.lookup(ecfg.cfg); ok && isDyncfg(scfg.cfg) {
m.seenConfigs.remove(ecfg.cfg)
}
m.exposedConfigs.remove(ecfg.cfg)
m.stopRunningJob(ecfg.cfg.FullName())
}
scfg := &seenConfig{cfg: cfg, status: dyncfgAccepted}
ecfg := scfg
m.seenConfigs.add(scfg)
m.exposedConfigs.add(ecfg)
m.dyncfgRespf(fn, 202, "")
m.dyncfgJobCreate(ecfg.cfg, ecfg.status)
}
func (m *Manager) dyncfgConfigRemove(fn functions.Function) {
id := fn.Args[0]
mn, jn, ok := extractModuleJobName(id)
if !ok {
m.Warningf("dyncfg: remove: could not extract module and job from id (%s)", id)
m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
return
}
ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
if !ok {
m.Warningf("dyncfg: remove: module %s job %s not found", mn, jn)
m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
return
}
if !isDyncfg(ecfg.cfg) {
m.Warningf("dyncfg: remove: module %s job %s: can not remove jobs of type %s", mn, jn, ecfg.cfg.SourceType())
m.dyncfgRespf(fn, 405, "Removing jobs of type '%s' is not supported. Only 'dyncfg' jobs can be removed.", ecfg.cfg.SourceType())
return
}
m.seenConfigs.remove(ecfg.cfg)
m.exposedConfigs.remove(ecfg.cfg)
m.stopRunningJob(ecfg.cfg.FullName())
m.FileLock.Unlock(ecfg.cfg.FullName())
m.FileStatus.Remove(ecfg.cfg)
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobRemove(ecfg.cfg)
}
func (m *Manager) dyncfgConfigUpdate(fn functions.Function) {
id := fn.Args[0]
mn, jn, ok := extractModuleJobName(id)
if !ok {
m.Warningf("dyncfg: update: could not extract module from id (%s)", id)
m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
return
}
ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
if !ok {
m.Warningf("dyncfg: update: module %s job %s not found", mn, jn)
m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
return
}
cfg, err := configFromPayload(fn)
if err != nil {
m.Warningf("dyncfg: update: module %s: failed to create config from payload: %v", mn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err)
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
m.dyncfgSetConfigMeta(cfg, mn, jn)
if ecfg.status == dyncfgRunning && ecfg.cfg.UID() == cfg.UID() {
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
job, err := m.createCollectorJob(cfg)
if err != nil {
m.Warningf("dyncfg: update: module %s job %s: failed to apply config: %v", mn, jn, err)
m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
if ecfg.status == dyncfgAccepted {
m.Warningf("dyncfg: update: module %s job %s: updating not allowed in %s", mn, jn, ecfg.status)
m.dyncfgRespf(fn, 403, "Updating data collection job is not allowed in '%s' state.", ecfg.status)
m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
return
}
m.exposedConfigs.remove(ecfg.cfg)
m.stopRunningJob(ecfg.cfg.FullName())
scfg := &seenConfig{cfg: cfg, status: dyncfgAccepted}
m.seenConfigs.add(scfg)
m.exposedConfigs.add(scfg)
if isDyncfg(ecfg.cfg) {
m.seenConfigs.remove(ecfg.cfg)
} else {
// needed to update meta. There is no other way, unfortunately, but to send "create"
defer m.dyncfgJobCreate(scfg.cfg, scfg.status)
}
if ecfg.status == dyncfgDisabled {
scfg.status = dyncfgDisabled
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobStatus(cfg, scfg.status)
return
}
if err := job.AutoDetection(); err != nil {
job.Cleanup()
scfg.status = dyncfgFailed
m.dyncfgRespf(fn, 200, "Job update failed: %v", err)
m.dyncfgJobStatus(scfg.cfg, scfg.status)
return
}
if ok, err := m.FileLock.Lock(scfg.cfg.FullName()); !ok && err == nil {
job.Cleanup()
scfg.status = dyncfgFailed
m.dyncfgRespf(fn, 500, "Job update failed: cannot create file lock.")
m.dyncfgJobStatus(scfg.cfg, scfg.status)
return
}
scfg.status = dyncfgRunning
m.startRunningJob(job)
m.dyncfgRespf(fn, 200, "")
m.dyncfgJobStatus(scfg.cfg, scfg.status)
}
func (m *Manager) dyncfgSetConfigMeta(cfg confgroup.Config, module, name string) {
cfg.SetProvider("dyncfg")
cfg.SetSource(fmt.Sprintf("type=dyncfg,module=%s,job=%s", module, name))
cfg.SetSourceType("dyncfg")
cfg.SetModule(module)
cfg.SetName(name)
if def, ok := m.ConfigDefaults.Lookup(module); ok {
cfg.ApplyDefaults(def)
}
}
func (m *Manager) dyncfgRespPayloadJSON(fn functions.Function, payload string) {
m.dyncfgRespPayload(fn, payload, "application/json")
}
func (m *Manager) dyncfgRespPayloadYAML(fn functions.Function, payload string) {
m.dyncfgRespPayload(fn, payload, "application/yaml")
}
func (m *Manager) dyncfgRespPayload(fn functions.Function, payload string, contentType string) {
ts := strconv.FormatInt(time.Now().Unix(), 10)
m.api.FUNCRESULT(fn.UID, contentType, payload, "200", ts)
}
func (m *Manager) dyncfgRespf(fn functions.Function, code int, msgf string, a ...any) {
if fn.UID == "" {
return
}
bs, _ := json.Marshal(struct {
Status int `json:"status"`
Message string `json:"message"`
}{
Status: code,
Message: fmt.Sprintf(msgf, a...),
})
ts := strconv.FormatInt(time.Now().Unix(), 10)
m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts)
}
func userConfigFromPayload(cfg any, jobName string, fn functions.Function) ([]byte, error) {
if v := reflect.ValueOf(cfg); v.Kind() != reflect.Ptr || v.IsNil() {
return nil, fmt.Errorf("invalid config: expected a pointer to a struct, got a %s", v.Type())
}
if fn.ContentType == "application/json" {
if err := json.Unmarshal(fn.Payload, cfg); err != nil {
return nil, err
}
} else {
if err := yaml.Unmarshal(fn.Payload, cfg); err != nil {
return nil, err
}
}
bs, err := yaml.Marshal(cfg)
if err != nil {
return nil, err
}
var yms yaml.MapSlice
if err := yaml.Unmarshal(bs, &yms); err != nil {
return nil, err
}
yms = append([]yaml.MapItem{{Key: "name", Value: jobName}}, yms...)
v := map[string]any{
"jobs": []any{yms},
}
bs, err = yaml.Marshal(v)
if err != nil {
return nil, err
}
return bs, nil
}
func configFromPayload(fn functions.Function) (confgroup.Config, error) {
var cfg confgroup.Config
if fn.ContentType == "application/json" {
if err := json.Unmarshal(fn.Payload, &cfg); err != nil {
return nil, err
}
return cfg.Clone()
}
if err := yaml.Unmarshal(fn.Payload, &cfg); err != nil {
return nil, err
}
return cfg, nil
}
func extractModuleJobName(id string) (mn string, jn string, ok bool) {
if mn, ok = extractModuleName(id); !ok {
return "", "", false
}
if jn, ok = extractJobName(id); !ok {
return "", "", false
}
return mn, jn, true
}
func extractModuleName(id string) (string, bool) {
id = strings.TrimPrefix(id, dyncfgIDPrefix)
i := strings.IndexByte(id, ':')
if i == -1 {
return id, id != ""
}
return id[:i], true
}
func extractJobName(id string) (string, bool) {
i := strings.LastIndexByte(id, ':')
if i == -1 {
return "", false
}
return id[i+1:], true
}
func validateJobName(jobName string) error {
for _, r := range jobName {
if unicode.IsSpace(r) {
return errors.New("contains spaces")
}
switch r {
case '.', ':':
return fmt.Errorf("contains '%c'", r)
}
}
return nil
}