netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/modules/cassandra/collect.go

Summary

Maintainability
C
1 day
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package cassandra

import (
    "errors"
    "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"
    "strings"
)

const (
    suffixCount = "_count"
    suffixValue = "_value"
)

// collect scrapes the Prometheus endpoint and converts the scraped series
// into netdata metrics. The very first successful scrape is validated to
// make sure the endpoint really exposes Cassandra metrics.
func (c *Cassandra) collect() (map[string]int64, error) {
    pms, err := c.prom.ScrapeSeries()
    if err != nil {
        return nil, err
    }

    // One-time sanity check: bail out if this doesn't look like Cassandra.
    if c.validateMetrics && !isCassandraMetrics(pms) {
        return nil, errors.New("collected metrics aren't Cassandra metrics")
    }
    c.validateMetrics = false

    mx := make(map[string]int64)

    c.resetMetrics()
    c.collectMetrics(pms)
    c.processMetric(mx)

    return mx, nil
}

// resetMetrics replaces the accumulated metrics with a fresh zeroed set,
// carrying over per-thread-pool identity (name and whether charts were
// already created) so charts aren't re-added on every collection cycle.
func (c *Cassandra) resetMetrics() {
    fresh := newCassandraMetrics()
    for key, pool := range c.mx.threadPools {
        fresh.threadPools[key] = &threadPoolMetrics{
            name:      pool.name,
            hasCharts: pool.hasCharts,
        }
    }
    c.mx = fresh
}

// processMetric flattens the collected metrics (c.mx) into the mx map that
// is returned to the framework. It also derives cache hit-ratio and
// utilization metrics, and creates charts for thread pools seen for the
// first time (side effect via addThreadPoolCharts).
func (c *Cassandra) processMetric(mx map[string]int64) {
    // Client request counters and latency totals, split by Read/Write scope.
    c.mx.clientReqTotalLatencyReads.write(mx, "client_request_total_latency_reads")
    c.mx.clientReqTotalLatencyWrites.write(mx, "client_request_total_latency_writes")
    c.mx.clientReqLatencyReads.write(mx, "client_request_latency_reads")
    c.mx.clientReqLatencyWrites.write(mx, "client_request_latency_writes")
    c.mx.clientReqTimeoutsReads.write(mx, "client_request_timeouts_reads")
    c.mx.clientReqTimeoutsWrites.write(mx, "client_request_timeouts_writes")
    c.mx.clientReqUnavailablesReads.write(mx, "client_request_unavailables_reads")
    c.mx.clientReqUnavailablesWrites.write(mx, "client_request_unavailables_writes")
    c.mx.clientReqFailuresReads.write(mx, "client_request_failures_reads")
    c.mx.clientReqFailuresWrites.write(mx, "client_request_failures_writes")

    // Latency percentiles (p50..p999) for read and write requests.
    c.mx.clientReqReadLatencyP50.write(mx, "client_request_read_latency_p50")
    c.mx.clientReqReadLatencyP75.write(mx, "client_request_read_latency_p75")
    c.mx.clientReqReadLatencyP95.write(mx, "client_request_read_latency_p95")
    c.mx.clientReqReadLatencyP98.write(mx, "client_request_read_latency_p98")
    c.mx.clientReqReadLatencyP99.write(mx, "client_request_read_latency_p99")
    c.mx.clientReqReadLatencyP999.write(mx, "client_request_read_latency_p999")
    c.mx.clientReqWriteLatencyP50.write(mx, "client_request_write_latency_p50")
    c.mx.clientReqWriteLatencyP75.write(mx, "client_request_write_latency_p75")
    c.mx.clientReqWriteLatencyP95.write(mx, "client_request_write_latency_p95")
    c.mx.clientReqWriteLatencyP98.write(mx, "client_request_write_latency_p98")
    c.mx.clientReqWriteLatencyP99.write(mx, "client_request_write_latency_p99")
    c.mx.clientReqWriteLatencyP999.write(mx, "client_request_write_latency_p999")

    // Row cache: raw values plus derived hit ratio / utilization.
    // Ratios are emitted only when both operands were actually collected,
    // as a percentage scaled by 1000 (3 decimal places of precision).
    c.mx.rowCacheHits.write(mx, "row_cache_hits")
    c.mx.rowCacheMisses.write(mx, "row_cache_misses")
    c.mx.rowCacheSize.write(mx, "row_cache_size")
    if c.mx.rowCacheHits.isSet && c.mx.rowCacheMisses.isSet {
        if s := c.mx.rowCacheHits.value + c.mx.rowCacheMisses.value; s > 0 {
            mx["row_cache_hit_ratio"] = int64((c.mx.rowCacheHits.value * 100 / s) * 1000)
        } else {
            // no requests yet: avoid division by zero, report 0%
            mx["row_cache_hit_ratio"] = 0
        }
    }
    if c.mx.rowCacheCapacity.isSet && c.mx.rowCacheSize.isSet {
        if s := c.mx.rowCacheCapacity.value; s > 0 {
            mx["row_cache_utilization"] = int64((c.mx.rowCacheSize.value * 100 / s) * 1000)
        } else {
            mx["row_cache_utilization"] = 0
        }
    }

    // Key cache: same derivations as the row cache above.
    c.mx.keyCacheHits.write(mx, "key_cache_hits")
    c.mx.keyCacheMisses.write(mx, "key_cache_misses")
    c.mx.keyCacheSize.write(mx, "key_cache_size")
    if c.mx.keyCacheHits.isSet && c.mx.keyCacheMisses.isSet {
        if s := c.mx.keyCacheHits.value + c.mx.keyCacheMisses.value; s > 0 {
            mx["key_cache_hit_ratio"] = int64((c.mx.keyCacheHits.value * 100 / s) * 1000)
        } else {
            mx["key_cache_hit_ratio"] = 0
        }
    }
    if c.mx.keyCacheCapacity.isSet && c.mx.keyCacheSize.isSet {
        if s := c.mx.keyCacheCapacity.value; s > 0 {
            mx["key_cache_utilization"] = int64((c.mx.keyCacheSize.value * 100 / s) * 1000)
        } else {
            mx["key_cache_utilization"] = 0
        }
    }

    // write1k scales by 1000 — presumably to keep fractional precision in
    // the integer map; confirm against metricValue.write1k.
    c.mx.droppedMessages.write1k(mx, "dropped_messages")

    c.mx.storageLoad.write(mx, "storage_load")
    c.mx.storageExceptions.write(mx, "storage_exceptions")

    c.mx.compactionBytesCompacted.write(mx, "compaction_bytes_compacted")
    c.mx.compactionPendingTasks.write(mx, "compaction_pending_tasks")
    c.mx.compactionCompletedTasks.write(mx, "compaction_completed_tasks")

    // JVM memory and garbage collection (ParNew / CMS) metrics.
    c.mx.jvmMemoryHeapUsed.write(mx, "jvm_memory_heap_used")
    c.mx.jvmMemoryNonHeapUsed.write(mx, "jvm_memory_nonheap_used")
    c.mx.jvmGCParNewCount.write(mx, "jvm_gc_parnew_count")
    c.mx.jvmGCParNewTime.write1k(mx, "jvm_gc_parnew_time")
    c.mx.jvmGCCMSCount.write(mx, "jvm_gc_cms_count")
    c.mx.jvmGCCMSTime.write1k(mx, "jvm_gc_cms_time")

    // Per-thread-pool task metrics; charts are created lazily the first
    // time a pool is seen (hasCharts survives resetMetrics).
    for _, p := range c.mx.threadPools {
        if !p.hasCharts {
            p.hasCharts = true
            c.addThreadPoolCharts(p)
        }

        px := "thread_pool_" + p.name + "_"
        p.activeTasks.write(mx, px+"active_tasks")
        p.pendingTasks.write(mx, px+"pending_tasks")
        p.blockedTasks.write(mx, px+"blocked_tasks")
        p.totalBlockedTasks.write(mx, px+"total_blocked_tasks")
    }
}

// collectMetrics dispatches the scraped series to every metric-family
// collector in turn.
func (c *Cassandra) collectMetrics(pms prometheus.Series) {
    for _, collect := range []func(prometheus.Series){
        c.collectClientRequestMetrics,
        c.collectDroppedMessagesMetrics,
        c.collectThreadPoolsMetrics,
        c.collectStorageMetrics,
        c.collectCacheMetrics,
        c.collectJVMMetrics,
        c.collectCompactionMetrics,
    } {
        collect(pms)
    }
}

// collectClientRequestMetrics gathers client request counters (total
// latency, latency events, timeouts, unavailables, failures) and latency
// percentiles, each split by the Read/Write scope label.
func (c *Cassandra) collectClientRequestMetrics(pms prometheus.Series) {
    const metric = "org_apache_cassandra_metrics_clientrequest"

    for _, pm := range pms.FindByName(metric + suffixCount) {
        var read, write *metricValue

        // Route the sample to its read/write destination pair by name.
        switch pm.Labels.Get("name") {
        case "TotalLatency":
            read, write = &c.mx.clientReqTotalLatencyReads, &c.mx.clientReqTotalLatencyWrites
        case "Latency":
            read, write = &c.mx.clientReqLatencyReads, &c.mx.clientReqLatencyWrites
        case "Timeouts":
            read, write = &c.mx.clientReqTimeoutsReads, &c.mx.clientReqTimeoutsWrites
        case "Unavailables":
            read, write = &c.mx.clientReqUnavailablesReads, &c.mx.clientReqUnavailablesWrites
        case "Failures":
            read, write = &c.mx.clientReqFailuresReads, &c.mx.clientReqFailuresWrites
        default:
            continue
        }

        switch pm.Labels.Get("scope") {
        case "Read":
            read.add(pm.Value)
        case "Write":
            write.add(pm.Value)
        }
    }

    for _, pm := range pms.FindByNames(
        metric+"_50thpercentile",
        metric+"_75thpercentile",
        metric+"_95thpercentile",
        metric+"_98thpercentile",
        metric+"_99thpercentile",
        metric+"_999thpercentile",
    ) {
        // Only the "Latency" histogram percentiles are of interest.
        if pm.Labels.Get("name") != "Latency" {
            continue
        }

        var read, write *metricValue

        switch {
        case strings.HasSuffix(pm.Name(), "_50thpercentile"):
            read, write = &c.mx.clientReqReadLatencyP50, &c.mx.clientReqWriteLatencyP50
        case strings.HasSuffix(pm.Name(), "_75thpercentile"):
            read, write = &c.mx.clientReqReadLatencyP75, &c.mx.clientReqWriteLatencyP75
        case strings.HasSuffix(pm.Name(), "_95thpercentile"):
            read, write = &c.mx.clientReqReadLatencyP95, &c.mx.clientReqWriteLatencyP95
        case strings.HasSuffix(pm.Name(), "_98thpercentile"):
            read, write = &c.mx.clientReqReadLatencyP98, &c.mx.clientReqWriteLatencyP98
        case strings.HasSuffix(pm.Name(), "_99thpercentile"):
            read, write = &c.mx.clientReqReadLatencyP99, &c.mx.clientReqWriteLatencyP99
        case strings.HasSuffix(pm.Name(), "_999thpercentile"):
            read, write = &c.mx.clientReqReadLatencyP999, &c.mx.clientReqWriteLatencyP999
        default:
            continue
        }

        switch pm.Labels.Get("scope") {
        case "Read":
            read.add(pm.Value)
        case "Write":
            write.add(pm.Value)
        }
    }
}

// collectCacheMetrics gathers key-cache and row-cache hit/miss counters
// and capacity/size gauges.
func (c *Cassandra) collectCacheMetrics(pms prometheus.Series) {
    const metric = "org_apache_cassandra_metrics_cache"

    for _, pm := range pms.FindByName(metric + suffixCount) {
        var hits, misses *metricValue

        // The scope label selects which cache the sample belongs to.
        switch pm.Labels.Get("scope") {
        case "KeyCache":
            hits, misses = &c.mx.keyCacheHits, &c.mx.keyCacheMisses
        case "RowCache":
            hits, misses = &c.mx.rowCacheHits, &c.mx.rowCacheMisses
        default:
            continue
        }

        switch pm.Labels.Get("name") {
        case "Hits":
            hits.add(pm.Value)
        case "Misses":
            misses.add(pm.Value)
        }
    }

    for _, pm := range pms.FindByName(metric + suffixValue) {
        var capacity, size *metricValue

        switch pm.Labels.Get("scope") {
        case "KeyCache":
            capacity, size = &c.mx.keyCacheCapacity, &c.mx.keyCacheSize
        case "RowCache":
            capacity, size = &c.mx.rowCacheCapacity, &c.mx.rowCacheSize
        default:
            continue
        }

        switch pm.Labels.Get("name") {
        case "Capacity":
            capacity.add(pm.Value)
        case "Size":
            size.add(pm.Value)
        }
    }
}

// collectThreadPoolsMetrics gathers per-thread-pool gauges (active and
// pending tasks) and counters (currently blocked and total blocked tasks).
// Pools are created lazily on first sight via getThreadPoolMetrics; the
// "scope" label carries the pool name.
func (c *Cassandra) collectThreadPoolsMetrics(pms prometheus.Series) {
    const metric = "org_apache_cassandra_metrics_threadpools"

    for _, pm := range pms.FindByName(metric + suffixValue) {
        pool := c.getThreadPoolMetrics(pm.Labels.Get("scope"))

        switch pm.Labels.Get("name") {
        case "ActiveTasks":
            pool.activeTasks.add(pm.Value)
        case "PendingTasks":
            pool.pendingTasks.add(pm.Value)
        }
    }
    for _, pm := range pms.FindByName(metric + suffixCount) {
        pool := c.getThreadPoolMetrics(pm.Labels.Get("scope"))

        switch pm.Labels.Get("name") {
        // Fix: the "CompletedTasks" counter was previously folded into
        // totalBlockedTasks as well, double-counting and grossly inflating
        // the total-blocked-tasks metric. CompletedTasks has no matching
        // thread-pool field/chart here, so it is intentionally ignored.
        case "TotalBlockedTasks":
            pool.totalBlockedTasks.add(pm.Value)
        case "CurrentlyBlockedTasks":
            pool.blockedTasks.add(pm.Value)
        }
    }
}

// collectStorageMetrics gathers storage-service metrics: on-disk load and
// the number of internal exceptions.
func (c *Cassandra) collectStorageMetrics(pms prometheus.Series) {
    const metric = "org_apache_cassandra_metrics_storage"

    for _, pm := range pms.FindByName(metric + suffixCount) {
        switch pm.Labels.Get("name") {
        case "Load":
            c.mx.storageLoad.add(pm.Value)
        case "Exceptions":
            c.mx.storageExceptions.add(pm.Value)
        }
    }
}

// collectDroppedMessagesMetrics sums all dropped-message counters (across
// every message type) into a single metric.
func (c *Cassandra) collectDroppedMessagesMetrics(pms prometheus.Series) {
    const metric = "org_apache_cassandra_metrics_droppedmessage"

    for _, sample := range pms.FindByName(metric + suffixCount) {
        c.mx.droppedMessages.add(sample.Value)
    }
}

// collectJVMMetrics gathers JVM heap/non-heap memory usage and garbage
// collection counts and cumulative times for the ParNew and
// ConcurrentMarkSweep collectors.
func (c *Cassandra) collectJVMMetrics(pms prometheus.Series) {
    const (
        metricMemUsed = "jvm_memory_bytes_used"
        metricGC      = "jvm_gc_collection_seconds"
    )

    for _, pm := range pms.FindByName(metricMemUsed) {
        switch pm.Labels.Get("area") {
        case "heap":
            c.mx.jvmMemoryHeapUsed.add(pm.Value)
        case "nonheap":
            c.mx.jvmMemoryNonHeapUsed.add(pm.Value)
        }
    }

    for _, pm := range pms.FindByName(metricGC + suffixCount) {
        switch pm.Labels.Get("gc") {
        case "ParNew":
            c.mx.jvmGCParNewCount.add(pm.Value)
        case "ConcurrentMarkSweep":
            c.mx.jvmGCCMSCount.add(pm.Value)
        }
    }

    for _, pm := range pms.FindByName(metricGC + "_sum") {
        switch pm.Labels.Get("gc") {
        case "ParNew":
            c.mx.jvmGCParNewTime.add(pm.Value)
        case "ConcurrentMarkSweep":
            c.mx.jvmGCCMSTime.add(pm.Value)
        }
    }
}

// collectCompactionMetrics gathers compaction task gauges (completed and
// pending) and the bytes-compacted counter.
func (c *Cassandra) collectCompactionMetrics(pms prometheus.Series) {
    const metric = "org_apache_cassandra_metrics_compaction"

    for _, pm := range pms.FindByName(metric + suffixValue) {
        switch pm.Labels.Get("name") {
        case "CompletedTasks":
            c.mx.compactionCompletedTasks.add(pm.Value)
        case "PendingTasks":
            c.mx.compactionPendingTasks.add(pm.Value)
        }
    }
    for _, pm := range pms.FindByName(metric + suffixCount) {
        if pm.Labels.Get("name") == "BytesCompacted" {
            c.mx.compactionBytesCompacted.add(pm.Value)
        }
    }
}

// getThreadPoolMetrics returns the metrics holder for the named thread
// pool, creating and registering a new one on first sight.
func (c *Cassandra) getThreadPoolMetrics(name string) *threadPoolMetrics {
    if pool, ok := c.mx.threadPools[name]; ok {
        return pool
    }
    pool := &threadPoolMetrics{name: name}
    c.mx.threadPools[name] = pool
    return pool
}

// isCassandraMetrics reports whether any scraped series carries the
// Cassandra JMX-exporter metric name prefix.
func isCassandraMetrics(pms prometheus.Series) bool {
    const prefix = "org_apache_cassandra_metrics"
    for _, pm := range pms {
        if strings.HasPrefix(pm.Name(), prefix) {
            return true
        }
    }
    return false
}