firehol/netdata

View on GitHub
src/go/plugin/go.d/modules/hdfs/collect.go

Summary

Maintainability
A
2 hrs
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package hdfs

import (
    "encoding/json"
    "errors"
    "fmt"
    "strings"

    "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/stm"
    "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web"
)

func (h *HDFS) collect() (map[string]int64, error) {
    req, err := web.NewHTTPRequest(h.RequestConfig)
    if err != nil {
        return nil, fmt.Errorf("failed to create HTTP request: %v", err)
    }

    var raw rawJMX
    if err := web.DoHTTP(h.httpClient).RequestJSON(req, &raw); err != nil {
        return nil, err
    }

    if raw.isEmpty() {
        return nil, errors.New("empty response")
    }

    mx := h.collectRawJMX(raw)

    return stm.ToMap(mx), nil
}

func (h *HDFS) determineNodeType() (nodeType, error) {
    req, err := web.NewHTTPRequest(h.RequestConfig)
    if err != nil {
        return "", fmt.Errorf("failed to create HTTP request: %v", err)
    }

    var raw rawJMX
    if err := web.DoHTTP(h.httpClient).RequestJSON(req, &raw); err != nil {
        return "", err
    }

    if raw.isEmpty() {
        return "", errors.New("empty response")
    }

    jvm := raw.findJvm()
    if jvm == nil {
        return "", errors.New("couldn't find jvm in response")
    }

    v, ok := jvm["tag.ProcessName"]
    if !ok {
        return "", errors.New("couldn't find process name in JvmMetrics")
    }

    t := nodeType(strings.Trim(string(v), "\""))
    if t == nameNodeType || t == dataNodeType {
        return t, nil
    }
    return "", errors.New("unknown node type")
}

func (h *HDFS) collectRawJMX(raw rawJMX) *metrics {
    var mx metrics
    switch h.nodeType {
    default:
        panic(fmt.Sprintf("unsupported node type : '%s'", h.nodeType))
    case nameNodeType:
        h.collectNameNode(&mx, raw)
    case dataNodeType:
        h.collectDataNode(&mx, raw)
    }
    return &mx
}

func (h *HDFS) collectNameNode(mx *metrics, raw rawJMX) {
    if err := h.collectJVM(mx, raw); err != nil {
        h.Debugf("error on collecting jvm : %v", err)
    }

    if err := h.collectRPCActivity(mx, raw); err != nil {
        h.Debugf("error on collecting rpc activity : %v", err)
    }

    if err := h.collectFSNameSystem(mx, raw); err != nil {
        h.Debugf("error on collecting fs name system : %v", err)
    }
}

func (h *HDFS) collectDataNode(mx *metrics, raw rawJMX) {
    if err := h.collectJVM(mx, raw); err != nil {
        h.Debugf("error on collecting jvm : %v", err)
    }

    if err := h.collectRPCActivity(mx, raw); err != nil {
        h.Debugf("error on collecting rpc activity : %v", err)
    }

    if err := h.collectFSDatasetState(mx, raw); err != nil {
        h.Debugf("error on collecting fs dataset state : %v", err)
    }

    if err := h.collectDataNodeActivity(mx, raw); err != nil {
        h.Debugf("error on collecting datanode activity state : %v", err)
    }
}

func (h *HDFS) collectJVM(mx *metrics, raw rawJMX) error {
    v := raw.findJvm()
    if v == nil {
        return nil
    }

    var jvm jvmMetrics
    err := writeJSONTo(&jvm, v)
    if err != nil {
        return err
    }

    mx.Jvm = &jvm
    return nil
}

func (h *HDFS) collectRPCActivity(mx *metrics, raw rawJMX) error {
    v := raw.findRPCActivity()
    if v == nil {
        return nil
    }

    var rpc rpcActivityMetrics
    err := writeJSONTo(&rpc, v)
    if err != nil {
        return err
    }

    mx.Rpc = &rpc
    return nil
}

func (h *HDFS) collectFSNameSystem(mx *metrics, raw rawJMX) error {
    v := raw.findFSNameSystem()
    if v == nil {
        return nil
    }

    var fs fsNameSystemMetrics
    err := writeJSONTo(&fs, v)
    if err != nil {
        return err
    }

    fs.CapacityUsed = fs.CapacityDfsUsed + fs.CapacityUsedNonDFS

    mx.FSNameSystem = &fs
    return nil
}

func (h *HDFS) collectFSDatasetState(mx *metrics, raw rawJMX) error {
    v := raw.findFSDatasetState()
    if v == nil {
        return nil
    }

    var fs fsDatasetStateMetrics
    err := writeJSONTo(&fs, v)
    if err != nil {
        return err
    }

    fs.CapacityUsed = fs.Capacity - fs.Remaining
    fs.CapacityUsedNonDFS = fs.CapacityUsed - fs.DfsUsed

    mx.FSDatasetState = &fs
    return nil
}

func (h *HDFS) collectDataNodeActivity(mx *metrics, raw rawJMX) error {
    v := raw.findDataNodeActivity()
    if v == nil {
        return nil
    }

    var dna dataNodeActivityMetrics
    err := writeJSONTo(&dna, v)
    if err != nil {
        return err
    }

    mx.DataNodeActivity = &dna
    return nil
}

func writeJSONTo(dst, src any) error {
    b, err := json.Marshal(src)
    if err != nil {
        return err
    }
    return json.Unmarshal(b, dst)
}