netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/modules/activemq/collect.go

Summary

Maintainability
C
1 day
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package activemq

import (
    "fmt"
    "strings"
)

const (
    // keyQueues and keyTopics are the entity-type tags passed as typ to
    // addQueueTopicCharts and removeQueueTopicCharts, where they are
    // formatted into chart/dimension IDs and used as the chart family.
    keyQueues   = "queues"
    keyTopics   = "topics"
    // keyAdvisory is the substring that marks a queue or topic name to be
    // skipped entirely during collection (see processQueues/processTopics).
    keyAdvisory = "Advisory"
)

// nameReplacer converts a queue/topic name into the form used in metric keys
// and chart IDs: every "." becomes "_" and every space is dropped.
var nameReplacer = strings.NewReplacer(".", "_", " ", "")

// collect fetches the current queue and topic lists from the API client and
// flattens their statistics into a single metrics map.
//
// If either request fails, the error is returned as-is and no metrics are
// produced.
func (a *ActiveMQ) collect() (map[string]int64, error) {
    qs, err := a.apiClient.getQueues()
    if err != nil {
        return nil, err
    }

    ts, err := a.apiClient.getTopics()
    if err != nil {
        return nil, err
    }

    mx := make(map[string]int64)
    a.processQueues(qs, mx)
    a.processTopics(ts, mx)

    return mx, nil
}

// processQueues writes per-queue metrics into metrics and keeps the set of
// tracked queues (a.activeQueues) and their charts in sync with queues.Items.
//
// For every queue in queues.Items:
//   - names containing keyAdvisory are ignored;
//   - a queue seen for the first time is tracked (and gets charts) only if it
//     passes filterQueues and the number of tracked queues is still below
//     a.MaxQueues (a.MaxQueues == 0 disables the limit);
//   - tracked queues get consumers/enqueued/dequeued/unprocessed metrics,
//     where unprocessed is enqueued minus dequeued.
//
// Tracked queues that are absent from queues.Items are untracked and their
// charts are marked for removal.
func (a *ActiveMQ) processQueues(queues *queues, metrics map[string]int64) {
    var (
        seen    = make(map[string]bool)
        skipped int
    )

    for _, q := range queues.Items {
        if strings.Contains(q.Name, keyAdvisory) {
            continue
        }

        if !a.activeQueues[q.Name] {
            // Filter first so that the "skipped" counter only covers queues
            // that would have been collected if not for the limit.
            if !a.filterQueues(q.Name) {
                continue
            }

            // The size of the tracked set is checked live: it grows as new
            // queues are admitted within this very loop. Using >= keeps the
            // set at no more than MaxQueues entries. Slots of vanished queues
            // are only freed after this loop, so they become available on the
            // next collection cycle.
            if a.MaxQueues != 0 && len(a.activeQueues) >= a.MaxQueues {
                skipped++
                continue
            }

            a.activeQueues[q.Name] = true
            a.addQueueTopicCharts(q.Name, keyQueues)
        }

        prefix := "queues_" + nameReplacer.Replace(q.Name)

        metrics[prefix+"_consumers"] = q.Stats.ConsumerCount
        metrics[prefix+"_enqueued"] = q.Stats.EnqueueCount
        metrics[prefix+"_dequeued"] = q.Stats.DequeueCount
        metrics[prefix+"_unprocessed"] = q.Stats.EnqueueCount - q.Stats.DequeueCount

        seen[q.Name] = true
    }

    // Drop queues that were tracked before but did not show up this time.
    for name := range a.activeQueues {
        if !seen[name] {
            delete(a.activeQueues, name)
            a.removeQueueTopicCharts(name, keyQueues)
        }
    }

    if skipped > 0 {
        a.Debugf("%d queues were unprocessed due to max_queues limit (%d)", skipped, a.MaxQueues)
    }
}

// processTopics writes per-topic metrics into metrics and keeps the set of
// tracked topics (a.activeTopics) and their charts in sync with topics.Items.
//
// For every topic in topics.Items:
//   - names containing keyAdvisory are ignored;
//   - a topic seen for the first time is tracked (and gets charts) only if it
//     passes filterTopics and the number of tracked topics is still below
//     a.MaxTopics (a.MaxTopics == 0 disables the limit);
//   - tracked topics get consumers/enqueued/dequeued/unprocessed metrics,
//     where unprocessed is enqueued minus dequeued.
//
// Tracked topics that are absent from topics.Items are untracked and their
// charts are marked for removal.
func (a *ActiveMQ) processTopics(topics *topics, metrics map[string]int64) {
    var (
        seen    = make(map[string]bool)
        skipped int
    )

    for _, t := range topics.Items {
        if strings.Contains(t.Name, keyAdvisory) {
            continue
        }

        if !a.activeTopics[t.Name] {
            // Filter first so that the "skipped" counter only covers topics
            // that would have been collected if not for the limit.
            if !a.filterTopics(t.Name) {
                continue
            }

            // The size of the tracked set is checked live: it grows as new
            // topics are admitted within this very loop. Using >= keeps the
            // set at no more than MaxTopics entries. Slots of vanished topics
            // are only freed after this loop, so they become available on the
            // next collection cycle.
            if a.MaxTopics != 0 && len(a.activeTopics) >= a.MaxTopics {
                skipped++
                continue
            }

            a.activeTopics[t.Name] = true
            a.addQueueTopicCharts(t.Name, keyTopics)
        }

        prefix := "topics_" + nameReplacer.Replace(t.Name)

        metrics[prefix+"_consumers"] = t.Stats.ConsumerCount
        metrics[prefix+"_enqueued"] = t.Stats.EnqueueCount
        metrics[prefix+"_dequeued"] = t.Stats.DequeueCount
        metrics[prefix+"_unprocessed"] = t.Stats.EnqueueCount - t.Stats.DequeueCount

        seen[t.Name] = true
    }

    // Drop topics that were tracked before but did not show up this time.
    for name := range a.activeTopics {
        if !seen[name] {
            // TODO: delete after timeout?
            delete(a.activeTopics, name)
            a.removeQueueTopicCharts(name, keyTopics)
        }
    }

    if skipped > 0 {
        a.Debugf("%d topics were unprocessed due to max_topics limit (%d)", skipped, a.MaxTopics)
    }
}

// filterQueues reports whether the queue name in line should be collected.
// With no queues filter configured every name is accepted; otherwise the
// decision is delegated to the filter's MatchString.
func (a *ActiveMQ) filterQueues(line string) bool {
    if f := a.queuesFilter; f != nil {
        return f.MatchString(line)
    }
    return true
}

// filterTopics reports whether the topic name in line should be collected.
// With no topics filter configured every name is accepted; otherwise the
// decision is delegated to the filter's MatchString.
func (a *ActiveMQ) filterTopics(line string) bool {
    if f := a.topicsFilter; f != nil {
        return f.MatchString(line)
    }
    return true
}

// addQueueTopicCharts instantiates the package-level chart templates for one
// queue or topic and registers them with a.charts.
//
// name is the raw queue/topic name (used verbatim in chart titles); typ is
// the entity type (keyQueues or keyTopics), which is formatted into chart and
// dimension IDs together with the sanitized name and is also used as the
// chart family.
//
// A failure to register the charts is logged instead of being silently
// dropped; collection continues either way.
func (a *ActiveMQ) addQueueTopicCharts(name, typ string) {
    rname := nameReplacer.Replace(name)

    // Work on a copy so the templates stay reusable for other entities.
    // The local is deliberately not called "charts" to avoid shadowing the
    // package-level template collection.
    cs := charts.Copy()

    for _, chart := range *cs {
        chart.ID = fmt.Sprintf(chart.ID, typ, rname)
        chart.Title = fmt.Sprintf(chart.Title, name)
        chart.Fam = typ

        for _, dim := range chart.Dims {
            dim.ID = fmt.Sprintf(dim.ID, typ, rname)
        }
    }

    if err := a.charts.Add(*cs...); err != nil {
        a.Warningf("adding charts for %s '%s': %v", typ, name, err)
    }
}

// removeQueueTopicCharts marks the three charts belonging to one queue or
// topic ("messages", "unprocessed_messages" and "consumers") as removed and
// not created.
//
// name is the raw queue/topic name and typ the entity type (keyQueues or
// keyTopics); chart IDs are rebuilt as "<typ>_<sanitized name>_<suffix>".
// A chart that is not registered is skipped rather than dereferenced, since
// registration errors in addQueueTopicCharts do not stop tracking of the
// entity.
func (a *ActiveMQ) removeQueueTopicCharts(name, typ string) {
    rname := nameReplacer.Replace(name)

    for _, suffix := range []string{"messages", "unprocessed_messages", "consumers"} {
        id := fmt.Sprintf("%s_%s_%s", typ, rname, suffix)

        chart := a.charts.Get(id)
        if chart == nil {
            a.Debugf("chart '%s' is not registered, nothing to remove", id)
            continue
        }

        chart.MarkRemove()
        chart.MarkNotCreated()
    }
}