firehol/netdata

View on GitHub
src/go/plugin/go.d/modules/beanstalk/client.go

Summary

Maintainability
A
1 hr
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package beanstalk

import (
    "errors"
    "fmt"
    "strconv"
    "strings"

    "github.com/netdata/netdata/go/plugins/logger"
    "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/socket"

    "gopkg.in/yaml.v2"
)

type beanstalkConn interface {
    connect() error
    disconnect() error
    queryStats() (*beanstalkdStats, error)
    queryListTubes() ([]string, error)
    queryStatsTube(string) (*tubeStats, error)
}

// https://github.com/beanstalkd/beanstalkd/blob/91c54fc05dc759ef27459ce4383934e1a4f2fb4b/doc/protocol.txt#L553
type beanstalkdStats struct {
    CurrentJobsUrgent     int64   `yaml:"current-jobs-urgent" stm:"current-jobs-urgent"`
    CurrentJobsReady      int64   `yaml:"current-jobs-ready" stm:"current-jobs-ready"`
    CurrentJobsReserved   int64   `yaml:"current-jobs-reserved" stm:"current-jobs-reserved"`
    CurrentJobsDelayed    int64   `yaml:"current-jobs-delayed" stm:"current-jobs-delayed"`
    CurrentJobsBuried     int64   `yaml:"current-jobs-buried" stm:"current-jobs-buried"`
    CmdPut                int64   `yaml:"cmd-put" stm:"cmd-put"`
    CmdPeek               int64   `yaml:"cmd-peek" stm:"cmd-peek"`
    CmdPeekReady          int64   `yaml:"cmd-peek-ready" stm:"cmd-peek-ready"`
    CmdPeekDelayed        int64   `yaml:"cmd-peek-delayed" stm:"cmd-peek-delayed"`
    CmdPeekBuried         int64   `yaml:"cmd-peek-buried" stm:"cmd-peek-buried"`
    CmdReserve            int64   `yaml:"cmd-reserve" stm:"cmd-reserve"`
    CmdReserveWithTimeout int64   `yaml:"cmd-reserve-with-timeout" stm:"cmd-reserve-with-timeout"`
    CmdTouch              int64   `yaml:"cmd-touch" stm:"cmd-touch"`
    CmdUse                int64   `yaml:"cmd-use" stm:"cmd-use"`
    CmdWatch              int64   `yaml:"cmd-watch" stm:"cmd-watch"`
    CmdIgnore             int64   `yaml:"cmd-ignore" stm:"cmd-ignore"`
    CmdDelete             int64   `yaml:"cmd-delete" stm:"cmd-delete"`
    CmdRelease            int64   `yaml:"cmd-release" stm:"cmd-release"`
    CmdBury               int64   `yaml:"cmd-bury" stm:"cmd-bury"`
    CmdKick               int64   `yaml:"cmd-kick" stm:"cmd-kick"`
    CmdStats              int64   `yaml:"cmd-stats" stm:"cmd-stats"`
    CmdStatsJob           int64   `yaml:"cmd-stats-job" stm:"cmd-stats-job"`
    CmdStatsTube          int64   `yaml:"cmd-stats-tube" stm:"cmd-stats-tube"`
    CmdListTubes          int64   `yaml:"cmd-list-tubes" stm:"cmd-list-tubes"`
    CmdListTubeUsed       int64   `yaml:"cmd-list-tube-used" stm:"cmd-list-tube-used"`
    CmdListTubesWatched   int64   `yaml:"cmd-list-tubes-watched" stm:"cmd-list-tubes-watched"`
    CmdPauseTube          int64   `yaml:"cmd-pause-tube" stm:"cmd-pause-tube"`
    JobTimeouts           int64   `yaml:"job-timeouts" stm:"job-timeouts"`
    TotalJobs             int64   `yaml:"total-jobs" stm:"total-jobs"`
    CurrentTubes          int64   `yaml:"current-tubes" stm:"current-tubes"`
    CurrentConnections    int64   `yaml:"current-connections" stm:"current-connections"`
    CurrentProducers      int64   `yaml:"current-producers" stm:"current-producers"`
    CurrentWorkers        int64   `yaml:"current-workers" stm:"current-workers"`
    CurrentWaiting        int64   `yaml:"current-waiting" stm:"current-waiting"`
    TotalConnections      int64   `yaml:"total-connections" stm:"total-connections"`
    RusageUtime           float64 `yaml:"rusage-utime" stm:"rusage-utime,1000,1"`
    RusageStime           float64 `yaml:"rusage-stime" stm:"rusage-stime,1000,1"`
    Uptime                int64   `yaml:"uptime" stm:"uptime"`
    BinlogRecordsWritten  int64   `yaml:"binlog-records-written" stm:"binlog-records-written"`
    BinlogRecordsMigrated int64   `yaml:"binlog-records-migrated" stm:"binlog-records-migrated"`
}

// https://github.com/beanstalkd/beanstalkd/blob/91c54fc05dc759ef27459ce4383934e1a4f2fb4b/doc/protocol.txt#L497
type tubeStats struct {
    Name                string  `yaml:"name"`
    CurrentJobsUrgent   int64   `yaml:"current-jobs-urgent" stm:"current-jobs-urgent"`
    CurrentJobsReady    int64   `yaml:"current-jobs-ready" stm:"current-jobs-ready"`
    CurrentJobsReserved int64   `yaml:"current-jobs-reserved" stm:"current-jobs-reserved"`
    CurrentJobsDelayed  int64   `yaml:"current-jobs-delayed" stm:"current-jobs-delayed"`
    CurrentJobsBuried   int64   `yaml:"current-jobs-buried" stm:"current-jobs-buried"`
    TotalJobs           int64   `yaml:"total-jobs" stm:"total-jobs"`
    CurrentUsing        int64   `yaml:"current-using" stm:"current-using"`
    CurrentWaiting      int64   `yaml:"current-waiting" stm:"current-waiting"`
    CurrentWatching     int64   `yaml:"current-watching" stm:"current-watching"`
    Pause               float64 `yaml:"pause" stm:"pause"`
    CmdDelete           int64   `yaml:"cmd-delete" stm:"cmd-delete"`
    CmdPauseTube        int64   `yaml:"cmd-pause-tube" stm:"cmd-pause-tube"`
    PauseTimeLeft       float64 `yaml:"pause-time-left" stm:"pause-time-left"`
}

func newBeanstalkConn(conf Config, log *logger.Logger) beanstalkConn {
    return &beanstalkClient{
        Logger: log,
        client: socket.New(socket.Config{
            Address:        conf.Address,
            ConnectTimeout: conf.Timeout.Duration(),
            ReadTimeout:    conf.Timeout.Duration(),
            WriteTimeout:   conf.Timeout.Duration(),
            TLSConf:        nil,
        }),
    }
}

const (
    cmdQuit      = "quit"
    cmdStats     = "stats"
    cmdListTubes = "list-tubes"
    cmdStatsTube = "stats-tube"
)

type beanstalkClient struct {
    *logger.Logger

    client socket.Client
}

func (c *beanstalkClient) connect() error {
    return c.client.Connect()
}

func (c *beanstalkClient) disconnect() error {
    _, _, _ = c.query(cmdQuit)
    return c.client.Disconnect()
}

func (c *beanstalkClient) queryStats() (*beanstalkdStats, error) {
    cmd := cmdStats

    resp, data, err := c.query(cmd)
    if err != nil {
        return nil, err
    }
    if resp != "OK" {
        return nil, fmt.Errorf("command '%s' bad response: %s", cmd, resp)
    }

    var stats beanstalkdStats

    if err := yaml.Unmarshal(data, &stats); err != nil {
        return nil, err
    }

    return &stats, nil
}

func (c *beanstalkClient) queryListTubes() ([]string, error) {
    cmd := cmdListTubes

    resp, data, err := c.query(cmd)
    if err != nil {
        return nil, err
    }
    if resp != "OK" {
        return nil, fmt.Errorf("command '%s' bad response: %s", cmd, resp)
    }

    var tubes []string

    if err := yaml.Unmarshal(data, &tubes); err != nil {
        return nil, err
    }

    return tubes, nil
}

func (c *beanstalkClient) queryStatsTube(tubeName string) (*tubeStats, error) {
    cmd := fmt.Sprintf("%s %s", cmdStatsTube, tubeName)

    resp, data, err := c.query(cmd)
    if err != nil {
        return nil, err
    }
    if resp == "NOT_FOUND" {
        return nil, nil
    }
    if resp != "OK" {
        return nil, fmt.Errorf("command '%s' bad response: %s", cmd, resp)
    }

    var stats tubeStats
    if err := yaml.Unmarshal(data, &stats); err != nil {
        return nil, err
    }

    return &stats, nil
}

func (c *beanstalkClient) query(command string) (string, []byte, error) {
    var resp string
    var length int
    var body []byte
    var err error

    c.Debugf("executing command: %s", command)

    const limitReadLines = 1000
    var num int

    clientErr := c.client.Command(command+"\r\n", func(line []byte) bool {
        if resp == "" {
            s := string(line)
            c.Debugf("command '%s' response: '%s'", command, s)

            resp, length, err = parseResponseLine(s)
            if err != nil {
                err = fmt.Errorf("command '%s' line '%s': %v", command, s, err)
            }
            return err == nil && resp == "OK"
        }

        if num++; num >= limitReadLines {
            err = fmt.Errorf("command '%s': read line limit exceeded (%d)", command, limitReadLines)
            return false
        }

        body = append(body, line...)
        body = append(body, '\n')

        return len(body) < length
    })
    if clientErr != nil {
        return "", nil, fmt.Errorf("command '%s' client error: %v", command, clientErr)
    }
    if err != nil {
        return "", nil, err
    }

    return resp, body, nil
}

func parseResponseLine(line string) (string, int, error) {
    parts := strings.Fields(line)
    if len(parts) == 0 {
        return "", 0, errors.New("empty response")
    }

    resp := parts[0]

    if resp != "OK" {
        return resp, 0, nil
    }

    if len(parts) < 2 {
        return "", 0, errors.New("missing bytes count")
    }

    length, err := strconv.Atoi(parts[1])
    if err != nil {
        return "", 0, errors.New("invalid bytes count")
    }

    return resp, length, nil
}