netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/agent/functions/manager.go

Summary

Maintainability
A
0 mins
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package functions

import (
    "bufio"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi"
    "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter"
    "github.com/netdata/netdata/go/go.d.plugin/logger"

    "github.com/mattn/go-isatty"
    "github.com/muesli/cancelreader"
)

// isTerminal is true when stdout or stdin is attached to a terminal.
// Manager.Run does not read function requests from Input in that case.
var isTerminal = func() bool {
    if isatty.IsTerminal(os.Stdout.Fd()) {
        return true
    }
    return isatty.IsTerminal(os.Stdin.Fd())
}()

// NewManager returns a Manager that takes its input from os.Stdin, talks to
// netdata through an API built on safewriter.Stdout, and starts with an empty
// function registry.
func NewManager() *Manager {
    log := logger.New().With(slog.String("component", "functions manager"))

    mgr := &Manager{
        Logger:           log,
        Input:            os.Stdin,
        api:              netdataapi.New(safewriter.Stdout),
        mux:              new(sync.Mutex),
        FunctionRegistry: make(map[string]func(Function)),
    }

    return mgr
}

// Manager reads function requests from Input, one line at a time, and
// dispatches each request to the handler registered under the function's name.
type Manager struct {
    // Logger is embedded so the manager can log directly (m.Info, m.Warningf, ...).
    *logger.Logger

    // Input is the stream function requests are read from; NewManager sets it to os.Stdin.
    Input            io.Reader
    // api is used to send function results back (see respf).
    api              *netdataapi.API
    // mux is held by lookupFunction while it reads FunctionRegistry.
    mux              *sync.Mutex
    // FunctionRegistry maps a function name to its handler.
    FunctionRegistry map[string]func(Function)
}

// Run serves function requests until ctx is done.
//
// Unless isTerminal is set, requests are read from Input through a cancelable
// reader, so a read that is blocked waiting for data is interrupted as soon as
// ctx is cancelled. When isTerminal is set nothing is read at all.
//
// Run returns once ctx is done, or immediately if the cancelable reader cannot
// be created.
func (m *Manager) Run(ctx context.Context) {
    m.Info("instance is started")
    defer func() { m.Info("instance is stopped") }()

    if !isTerminal {
        r, err := cancelreader.NewReader(m.Input)
        if err != nil {
            m.Errorf("fail to create cancel reader: %v", err)
            return
        }

        // Unblock the pending read when the context is cancelled.
        go func() { <-ctx.Done(); r.Cancel() }()

        var wg sync.WaitGroup

        wg.Add(1)
        go func() { defer wg.Done(); m.run(r) }()

        // run returns when the input is exhausted, fails, or is cancelled.
        wg.Wait()
        _ = r.Close()
    }

    // The input may end before the context does; keep the instance alive
    // until shutdown is requested.
    <-ctx.Done()
}

// run reads r line by line, parses every "FUNCTION ..." and
// "FUNCTION_PAYLOAD ..." line into a Function and calls the handler registered
// for its name. Empty lines are skipped; any other line is logged and ignored.
// Requests for unregistered or nil handlers are answered with a 501 result.
//
// run returns when r is exhausted or a read fails. A read failure other than
// the cancellation requested by Run is logged.
func (m *Manager) run(r io.Reader) {
    sc := bufio.NewScanner(r)

    for sc.Scan() {
        text := sc.Text()

        var fn *Function
        var err error

        // FIXME:  if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears,
        // we need to discard the current one and switch to the new one
        switch {
        case strings.HasPrefix(text, "FUNCTION "):
            fn, err = parseFunction(text)
        case strings.HasPrefix(text, "FUNCTION_PAYLOAD "):
            // The payload spans further lines, so the parser consumes them
            // from the same scanner.
            fn, err = parseFunctionWithPayload(text, sc)
        case text == "":
            continue
        default:
            m.Warningf("unexpected line: '%s'", text)
            continue
        }

        if err != nil {
            m.Warningf("parse function: %v ('%s')", err, text)
            continue
        }

        function, ok := m.lookupFunction(fn.Name)
        if !ok {
            m.Infof("skipping execution of '%s': unregistered function", fn.Name)
            m.respf(fn, 501, "unregistered function: %s", fn.Name)
            continue
        }
        if function == nil {
            m.Warningf("skipping execution of '%s': nil function registered", fn.Name)
            m.respf(fn, 501, "nil function: %s", fn.Name)
            continue
        }

        function(*fn)
    }

    // Scan also stops on a read failure or on a line that exceeds the
    // scanner's token limit. Without this check the loop would end silently
    // and no further function requests would be served.
    // The scanner returns the reader's error as is, so a direct comparison is
    // enough to filter out the cancellation that Run triggers on shutdown.
    if err := sc.Err(); err != nil && err != cancelreader.ErrCanceled {
        m.Errorf("reading input: %v", err)
    }
}

// lookupFunction returns the handler stored in FunctionRegistry under name and
// whether such an entry exists. The registry is read while holding m.mux.
func (m *Manager) lookupFunction(name string) (func(Function), bool) {
    m.mux.Lock()
    handler, found := m.FunctionRegistry[name]
    m.mux.Unlock()

    return handler, found
}

// respf sends a JSON result for fn consisting of a status code and a message.
// The message is built from msgf and a with fmt.Sprintf; code is used both in
// the JSON body and as the result code passed to FUNCRESULT, together with the
// current Unix time in seconds.
func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) {
    type response struct {
        Status  int    `json:"status"`
        Message string `json:"message"`
    }

    // The payload holds only an int and a string, so the marshalling error is ignored.
    payload, _ := json.Marshal(response{
        Status:  code,
        Message: fmt.Sprintf(msgf, a...),
    })
    now := strconv.FormatInt(time.Now().Unix(), 10)

    m.api.FUNCRESULT(fn.UID, "application/json", string(payload), strconv.Itoa(code), now)
}