netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/modules/pulsar/charts.go

Summary

Maintainability
C
7 hrs
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package pulsar

import (
    "fmt"
    "strings"

    "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"

    "github.com/netdata/netdata/go/go.d.plugin/agent/module"
)

// Short aliases for the chart types of the agent's module package, so the
// chart definitions in this file can be written without the package qualifier.
type (
    Charts = module.Charts
    Chart  = module.Chart
    Dims   = module.Dims
    Dim    = module.Dim
    Opts   = module.Opts
)

// summaryCharts holds a copy of every summary chart template defined below
// (the charts in the "ns summary" family).
//
// Entries tagged "optional" are exactly the summary charts that adjustCharts
// removes when the metrics backing them are not present in the scraped series.
var summaryCharts = Charts{
    sumBrokerComponentsChart.Copy(),

    sumMessagesRateChart.Copy(),
    sumThroughputRateChart.Copy(),

    sumStorageSizeChart.Copy(),
    sumStorageOperationsRateChart.Copy(), // optional
    sumMsgBacklogSizeChart.Copy(),
    sumStorageWriteLatencyChart.Copy(),
    sumEntrySizeChart.Copy(),

    sumSubsDelayedChart.Copy(),
    sumSubsMsgRateRedeliverChart.Copy(),    // optional
    sumSubsBlockedOnUnackedMsgChart.Copy(), // optional

    sumReplicationRateChart.Copy(),           // optional
    sumReplicationThroughputRateChart.Copy(), // optional
    sumReplicationBacklogChart.Copy(),        // optional
}

// Summary chart templates. Every chart in this group belongs to the
// "ns summary" family and sets StoreFirst. Dimension IDs are the names of the
// metrics the dimension reads.
//
// These templates are also the input of toNamespaceChart, which derives the
// per-namespace variants from them.
//
// NOTE(review): the Div values used below (1000, 1024, 1024 * 1000, 60) are not
// explained in this file; presumably they undo scaling applied when the metrics
// are collected and convert bytes to KiB — confirm against the collection code.
var (
    // Number of broker components, one stacked dimension per component kind.
    // toNamespaceChart removes the "pulsar_namespaces_count" dimension from the
    // per-namespace variant of this chart.
    sumBrokerComponentsChart = Chart{
        ID:    "broker_components",
        Title: "Broker Components",
        Units: "components",
        Fam:   "ns summary",
        Ctx:   "pulsar.broker_components",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: "pulsar_namespaces_count", Name: "namespaces"},
            {ID: metricPulsarTopicsCount, Name: "topics"},
            {ID: metricPulsarSubscriptionsCount, Name: "subscriptions"},
            {ID: metricPulsarProducersCount, Name: "producers"},
            {ID: metricPulsarConsumersCount, Name: "consumers"},
        },
    }
    // Publish and dispatch message rates. The dispatch dimension is multiplied
    // by -1, so it is reported with the opposite sign of publish.
    sumMessagesRateChart = Chart{
        ID:    "messages_rate",
        Title: "Messages Rate",
        Units: "messages/s",
        Fam:   "ns summary",
        Ctx:   "pulsar.messages_rate",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarRateIn, Name: "publish", Div: 1000},
            {ID: metricPulsarRateOut, Name: "dispatch", Mul: -1, Div: 1000},
        },
    }
    // Publish and dispatch throughput in KiB/s, drawn as an area chart.
    sumThroughputRateChart = Chart{
        ID:    "throughput_rate",
        Title: "Throughput Rate",
        Units: "KiB/s",
        Fam:   "ns summary",
        Ctx:   "pulsar.throughput_rate",
        Type:  module.Area,
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarThroughputIn, Name: "publish", Div: 1024 * 1000},
            {ID: metricPulsarThroughputOut, Name: "dispatch", Mul: -1, Div: 1024 * 1000},
        },
    }
    // Storage size in KiB.
    sumStorageSizeChart = Chart{
        ID:    "storage_size",
        Title: "Storage Size",
        Units: "KiB",
        Fam:   "ns summary",
        Ctx:   "pulsar.storage_size",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarStorageSize, Name: "used", Div: 1024},
        },
    }
    // Storage read/write operation rates. Optional: adjustCharts removes this
    // chart when the storage read rate metric is missing or has no "namespace"
    // label.
    sumStorageOperationsRateChart = Chart{
        ID:    "storage_operations_rate",
        Title: "Storage Read/Write Operations Rate",
        Units: "message batches/s",
        Fam:   "ns summary",
        Ctx:   "pulsar.storage_operations_rate",
        Type:  module.Area,
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarStorageReadRate, Name: "read", Div: 1000},
            {ID: metricPulsarStorageWriteRate, Name: "write", Mul: -1, Div: 1000},
        },
    }
    // Message backlog size.
    sumMsgBacklogSizeChart = Chart{
        ID:    "msg_backlog",
        Title: "Messages Backlog Size",
        Units: "messages",
        Fam:   "ns summary",
        Ctx:   "pulsar.msg_backlog",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarMsgBacklog, Name: "backlog"},
        },
    }
    // Storage write latency, one stacked dimension per latency bucket.
    // NOTE(review): Div: 60 presumably converts a per-minute count into a
    // per-second rate (units are "entries/s") — confirm.
    sumStorageWriteLatencyChart = Chart{
        ID:    "storage_write_latency",
        Title: "Storage Write Latency",
        Units: "entries/s",
        Fam:   "ns summary",
        Ctx:   "pulsar.storage_write_latency",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: "pulsar_storage_write_latency_le_0_5", Name: "<=0.5ms", Div: 60},
            {ID: "pulsar_storage_write_latency_le_1", Name: "<=1ms", Div: 60},
            {ID: "pulsar_storage_write_latency_le_5", Name: "<=5ms", Div: 60},
            {ID: "pulsar_storage_write_latency_le_10", Name: "<=10ms", Div: 60},
            {ID: "pulsar_storage_write_latency_le_20", Name: "<=20ms", Div: 60},
            {ID: "pulsar_storage_write_latency_le_50", Name: "<=50ms", Div: 60},
            {ID: "pulsar_storage_write_latency_le_100", Name: "<=100ms", Div: 60},
            {ID: "pulsar_storage_write_latency_le_200", Name: "<=200ms", Div: 60},
            {ID: "pulsar_storage_write_latency_le_1000", Name: "<=1s", Div: 60},
            {ID: "pulsar_storage_write_latency_overflow", Name: ">1s", Div: 60},
        },
    }
    // Entry size distribution, one stacked dimension per size bucket.
    sumEntrySizeChart = Chart{
        ID:    "entry_size",
        Title: "Entry Size",
        Units: "entries/s",
        Fam:   "ns summary",
        Ctx:   "pulsar.entry_size",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: "pulsar_entry_size_le_128", Name: "<=128B", Div: 60},
            {ID: "pulsar_entry_size_le_512", Name: "<=512B", Div: 60},
            {ID: "pulsar_entry_size_le_1_kb", Name: "<=1KB", Div: 60},
            {ID: "pulsar_entry_size_le_2_kb", Name: "<=2KB", Div: 60},
            {ID: "pulsar_entry_size_le_4_kb", Name: "<=4KB", Div: 60},
            {ID: "pulsar_entry_size_le_16_kb", Name: "<=16KB", Div: 60},
            {ID: "pulsar_entry_size_le_100_kb", Name: "<=100KB", Div: 60},
            {ID: "pulsar_entry_size_le_1_mb", Name: "<=1MB", Div: 60},
            {ID: "pulsar_entry_size_le_overflow", Name: ">1MB", Div: 60},
        },
    }
    // Message batches delayed for dispatching.
    sumSubsDelayedChart = Chart{
        ID:    "subscription_delayed",
        Title: "Subscriptions Delayed for Dispatching",
        Units: "message batches",
        Fam:   "ns summary",
        Ctx:   "pulsar.subscription_delayed",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarSubscriptionDelayed, Name: "delayed"},
        },
    }
    // Redelivered message rate. Optional: removed by adjustCharts when the
    // redeliver rate metric is missing.
    sumSubsMsgRateRedeliverChart = Chart{
        ID:    "subscription_msg_rate_redeliver",
        Title: "Subscriptions Redelivered Message Rate",
        Units: "messages/s",
        Fam:   "ns summary",
        Ctx:   "pulsar.subscription_msg_rate_redeliver",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarSubscriptionMsgRateRedeliver, Name: "redelivered", Div: 1000},
        },
    }
    // Subscriptions blocked on unacked messages. Optional: adjustCharts removes
    // it together with the redeliver rate chart (same presence check).
    sumSubsBlockedOnUnackedMsgChart = Chart{
        ID:    "subscription_blocked_on_unacked_messages",
        Title: "Subscriptions Blocked On Unacked Messages",
        Units: "subscriptions",
        Fam:   "ns summary",
        Ctx:   "pulsar.subscription_blocked_on_unacked_messages",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarSubscriptionBlockedOnUnackedMessages, Name: "blocked"},
        },
    }
    // Replication charts. Optional: adjustCharts removes all three when the
    // replication backlog metric is missing.
    sumReplicationRateChart = Chart{
        ID:    "replication_rate",
        Title: "Replication Rate",
        Units: "messages/s",
        Fam:   "ns summary",
        Ctx:   "pulsar.replication_rate",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarReplicationRateIn, Name: "in", Div: 1000},
            {ID: metricPulsarReplicationRateOut, Name: "out", Mul: -1, Div: 1000},
        },
    }
    sumReplicationThroughputRateChart = Chart{
        ID:    "replication_throughput_rate",
        Title: "Replication Throughput Rate",
        Units: "KiB/s",
        Fam:   "ns summary",
        Ctx:   "pulsar.replication_throughput_rate",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarReplicationThroughputIn, Name: "in", Div: 1024 * 1000},
            {ID: metricPulsarReplicationThroughputOut, Name: "out", Mul: -1, Div: 1024 * 1000},
        },
    }
    sumReplicationBacklogChart = Chart{
        ID:    "replication_backlog",
        Title: "Replication Backlog",
        Units: "messages",
        Fam:   "ns summary",
        Ctx:   "pulsar.replication_backlog",
        Opts:  Opts{StoreFirst: true},
        Dims: Dims{
            {ID: metricPulsarReplicationBacklog, Name: "backlog"},
        },
    }
)

// namespaceCharts holds a copy of every per-namespace chart template: the ns*
// charts derived from the summary charts by toNamespaceChart, interleaved with
// the topic* charts that break the same metric down by topic.
//
// Entries tagged "optional" are the ones adjustCharts removes (via
// removeNamespaceChart) when the metrics backing them are not present.
var namespaceCharts = Charts{
    nsBrokerComponentsChart.Copy(),
    topicProducersChart.Copy(),
    topicSubscriptionsChart.Copy(),
    topicConsumersChart.Copy(),

    nsMessagesRateChart.Copy(),
    topicMessagesRateInChart.Copy(),
    topicMessagesRateOutChart.Copy(),
    nsThroughputRateCharts.Copy(),
    topicThroughputRateInChart.Copy(),
    topicThroughputRateOutChart.Copy(),

    nsStorageSizeChart.Copy(),
    topicStorageSizeChart.Copy(),
    nsStorageOperationsChart.Copy(),   // optional
    topicStorageReadRateChart.Copy(),  // optional
    topicStorageWriteRateChart.Copy(), // optional
    nsMsgBacklogSizeChart.Copy(),
    topicMsgBacklogSizeChart.Copy(),
    nsStorageWriteLatencyChart.Copy(),
    nsEntrySizeChart.Copy(),

    nsSubsDelayedChart.Copy(),
    topicSubsDelayedChart.Copy(),
    nsSubsMsgRateRedeliverChart.Copy(),       // optional
    topicSubsMsgRateRedeliverChart.Copy(),    // optional
    nsSubsBlockedOnUnackedMsgChart.Copy(),    // optional
    topicSubsBlockedOnUnackedMsgChart.Copy(), // optional

    nsReplicationRateChart.Copy(),                 // optional
    topicReplicationRateInChart.Copy(),            // optional
    topicReplicationRateOutChart.Copy(),           // optional
    nsReplicationThroughputChart.Copy(),           // optional
    topicReplicationThroughputRateInChart.Copy(),  // optional
    topicReplicationThroughputRateOutChart.Copy(), // optional
    nsReplicationBacklogChart.Copy(),              // optional
    topicReplicationBacklogChart.Copy(),           // optional
}

// toNamespaceChart builds the per-namespace template of a summary chart.
//
// It works on a copy of chart and returns it with:
//   - the "pulsar_namespaces_count" dimension dropped, if chart is the broker
//     components chart;
//   - "_namespace_%s" appended to the ID and the family set to "ns %s";
//   - "namespace_" inserted right after the first '.' of the context;
//   - "_%s" appended to every dimension ID.
//
// The "%s" placeholders are left in place for the caller to fill in later.
func toNamespaceChart(chart Chart) Chart {
    nsChart := *chart.Copy()

    if nsChart.ID == sumBrokerComponentsChart.ID {
        // Best effort: the result of RemoveDim is intentionally ignored.
        _ = nsChart.RemoveDim("pulsar_namespaces_count")
    }

    nsChart.ID += "_namespace_%s"
    nsChart.Fam = "ns %s"

    // pulsar.messages_rate => pulsar.namespace_messages_rate
    if dot := strings.Index(nsChart.Ctx, "."); dot > 0 {
        prefix, rest := nsChart.Ctx[:dot+1], nsChart.Ctx[dot+1:]
        nsChart.Ctx = prefix + "namespace_" + rest
    }

    for _, d := range nsChart.Dims {
        d.ID += "_%s"
    }

    return nsChart
}

// Per-namespace chart templates. The "%s" placeholder in IDs and families
// stands for the namespace name (see the fmt.Sprintf calls in
// addNamespaceCharts and addTopicToCharts).
var (
    // ns* charts: per-namespace variants of the summary charts, derived by
    // toNamespaceChart.
    nsBrokerComponentsChart        = toNamespaceChart(sumBrokerComponentsChart)
    nsMessagesRateChart            = toNamespaceChart(sumMessagesRateChart)
    nsThroughputRateCharts         = toNamespaceChart(sumThroughputRateChart)
    nsStorageSizeChart             = toNamespaceChart(sumStorageSizeChart)
    nsStorageOperationsChart       = toNamespaceChart(sumStorageOperationsRateChart)
    nsMsgBacklogSizeChart          = toNamespaceChart(sumMsgBacklogSizeChart)
    nsStorageWriteLatencyChart     = toNamespaceChart(sumStorageWriteLatencyChart)
    nsEntrySizeChart               = toNamespaceChart(sumEntrySizeChart)
    nsSubsDelayedChart             = toNamespaceChart(sumSubsDelayedChart)
    nsSubsMsgRateRedeliverChart    = toNamespaceChart(sumSubsMsgRateRedeliverChart)
    nsSubsBlockedOnUnackedMsgChart = toNamespaceChart(sumSubsBlockedOnUnackedMsgChart)
    nsReplicationRateChart         = toNamespaceChart(sumReplicationRateChart)
    nsReplicationThroughputChart   = toNamespaceChart(sumReplicationThroughputRateChart)
    nsReplicationBacklogChart      = toNamespaceChart(sumReplicationBacklogChart)

    // topic* charts: stacked charts declared without dimensions. One dimension
    // per topic is added by addTopicToCharts and marked for removal by
    // removeTopicFromCharts; topicChartsMapping ties each chart ID to the
    // metric its dimensions read.
    topicProducersChart = Chart{
        ID:    "topic_producers_namespace_%s",
        Title: "Topic Producers",
        Units: "producers",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_producers",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicSubscriptionsChart = Chart{
        ID:    "topic_subscriptions_namespace_%s",
        Title: "Topic Subscriptions",
        Units: "subscriptions",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_subscriptions",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicConsumersChart = Chart{
        ID:    "topic_consumers_namespace_%s",
        Title: "Topic Consumers",
        Units: "consumers",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_consumers",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicMessagesRateInChart = Chart{
        ID:    "topic_messages_rate_in_namespace_%s",
        Title: "Topic Publish Messages Rate",
        Units: "publishes/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_messages_rate_in",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicMessagesRateOutChart = Chart{
        ID:    "topic_messages_rate_out_namespace_%s",
        Title: "Topic Dispatch Messages Rate",
        Units: "dispatches/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_messages_rate_out",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicThroughputRateInChart = Chart{
        ID:    "topic_throughput_rate_in_namespace_%s",
        Title: "Topic Publish Throughput Rate",
        Units: "KiB/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_throughput_rate_in",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicThroughputRateOutChart = Chart{
        ID:    "topic_throughput_rate_out_namespace_%s",
        Title: "Topic Dispatch Throughput Rate",
        Units: "KiB/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_throughput_rate_out",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicStorageSizeChart = Chart{
        ID:    "topic_storage_size_namespace_%s",
        Title: "Topic Storage Size",
        Units: "KiB",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_storage_size",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicStorageReadRateChart = Chart{
        ID:    "topic_storage_read_rate_namespace_%s",
        Title: "Topic Storage Read Rate",
        Units: "message batches/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_storage_read_rate",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicStorageWriteRateChart = Chart{
        ID:    "topic_storage_write_rate_namespace_%s",
        Title: "Topic Storage Write Rate",
        Units: "message batches/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_storage_write_rate",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicMsgBacklogSizeChart = Chart{
        ID:    "topic_msg_backlog_namespace_%s",
        Title: "Topic Messages Backlog Size",
        Units: "messages",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_msg_backlog",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicSubsDelayedChart = Chart{
        ID:    "topic_subscription_delayed_namespace_%s",
        Title: "Topic Subscriptions Delayed for Dispatching",
        Units: "message batches",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_subscription_delayed",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicSubsMsgRateRedeliverChart = Chart{
        ID:    "topic_subscription_msg_rate_redeliver_namespace_%s",
        Title: "Topic Subscriptions Redelivered Message Rate",
        Units: "messages/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_subscription_msg_rate_redeliver",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicSubsBlockedOnUnackedMsgChart = Chart{
        ID:    "topic_subscription_blocked_on_unacked_messages_namespace_%s",
        Title: "Topic Subscriptions Blocked On Unacked Messages",
        Units: "blocked subscriptions",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_subscription_blocked_on_unacked_messages",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicReplicationRateInChart = Chart{
        ID:    "topic_replication_rate_in_namespace_%s",
        Title: "Topic Replication Rate From Remote Cluster",
        Units: "messages/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_replication_rate_in",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    // NOTE(review): unlike every other topic chart, this ID has no "topic_"
    // prefix (its context does). It does not collide with the ID of
    // nsReplicationRateChart ("replication_rate_namespace_%s"), so it works as
    // is; renaming it would change a user-visible chart ID — confirm before
    // changing.
    topicReplicationRateOutChart = Chart{
        ID:    "replication_rate_out_namespace_%s",
        Title: "Topic Replication Rate To Remote Cluster",
        Units: "messages/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_replication_rate_out",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicReplicationThroughputRateInChart = Chart{
        ID:    "topic_replication_throughput_rate_in_namespace_%s",
        Title: "Topic Replication Throughput Rate From Remote Cluster",
        Units: "KiB/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_replication_throughput_rate_in",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicReplicationThroughputRateOutChart = Chart{
        ID:    "topic_replication_throughput_rate_out_namespace_%s",
        Title: "Topic Replication Throughput Rate To Remote Cluster",
        Units: "KiB/s",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_replication_throughput_rate_out",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
    topicReplicationBacklogChart = Chart{
        ID:    "topic_replication_backlog_namespace_%s",
        Title: "Topic Replication Backlog",
        Units: "messages",
        Fam:   "ns %s",
        Ctx:   "pulsar.topic_replication_backlog",
        Type:  module.Stacked,
        Opts:  Opts{StoreFirst: true},
    }
)

// adjustCharts removes the optional charts whose backing metrics are absent
// from the scraped series pms. Three independent checks are made:
//
//   - storage operations: the storage read rate metric is missing, or its first
//     series has an empty "namespace" label;
//   - subscription redeliver/blocked: the redeliver rate metric is missing;
//   - replication: the replication backlog metric is missing.
//
// For each failed check the matching summary charts, namespace charts and topic
// charts are removed, and the topic charts are deleted from topicChartsMapping.
func (p *Pulsar) adjustCharts(pms prometheus.Series) {
    // drop removes, in this order: the summary charts, the namespace charts,
    // the topic charts, and finally the topic charts' mapping entries.
    drop := func(summaryIDs, nsIDs, topicIDs []string) {
        for _, id := range summaryIDs {
            p.removeSummaryChart(id)
        }
        for _, id := range nsIDs {
            p.removeNamespaceChart(id)
        }
        for _, id := range topicIDs {
            p.removeNamespaceChart(id)
        }
        for _, id := range topicIDs {
            delete(p.topicChartsMapping, id)
        }
    }

    readRate := pms.FindByName(metricPulsarStorageReadRate)
    if readRate.Len() == 0 || readRate[0].Labels.Get("namespace") == "" {
        drop(
            []string{sumStorageOperationsRateChart.ID},
            []string{nsStorageOperationsChart.ID},
            []string{
                topicStorageReadRateChart.ID,
                topicStorageWriteRateChart.ID,
            },
        )
    }

    redeliver := pms.FindByName(metricPulsarSubscriptionMsgRateRedeliver)
    if redeliver.Len() == 0 {
        drop(
            []string{
                sumSubsMsgRateRedeliverChart.ID,
                sumSubsBlockedOnUnackedMsgChart.ID,
            },
            []string{
                nsSubsMsgRateRedeliverChart.ID,
                nsSubsBlockedOnUnackedMsgChart.ID,
            },
            []string{
                topicSubsMsgRateRedeliverChart.ID,
                topicSubsBlockedOnUnackedMsgChart.ID,
            },
        )
    }

    replBacklog := pms.FindByName(metricPulsarReplicationBacklog)
    if replBacklog.Len() == 0 {
        drop(
            []string{
                sumReplicationRateChart.ID,
                sumReplicationThroughputRateChart.ID,
                sumReplicationBacklogChart.ID,
            },
            []string{
                nsReplicationRateChart.ID,
                nsReplicationThroughputChart.ID,
                nsReplicationBacklogChart.ID,
            },
            []string{
                topicReplicationRateInChart.ID,
                topicReplicationRateOutChart.ID,
                topicReplicationThroughputRateInChart.ID,
                topicReplicationThroughputRateOutChart.ID,
                topicReplicationBacklogChart.ID,
            },
        )
    }
}

// removeSummaryChart removes the chart with the given ID from the collector's
// charts (p.Charts()). A failure is logged as a warning, not returned.
func (p *Pulsar) removeSummaryChart(chartID string) {
    err := p.Charts().Remove(chartID)
    if err == nil {
        return
    }
    p.Warning(err)
}

// removeNamespaceChart removes the chart template with the given ID from
// p.nsCharts. A failure is logged as a warning, not returned.
func (p *Pulsar) removeNamespaceChart(chartID string) {
    err := p.nsCharts.Remove(chartID)
    if err == nil {
        return
    }
    p.Warning(err)
}

// updateCharts reconciles the charts with the namespaces and topics seen in the
// current collection (p.curCache) against those already known (p.cache).
//
// NOTE: order is important. Namespace charts are added before topics, because
// addTopicToCharts looks up the namespace's charts; topics are removed before
// namespaces, because removeTopicFromCharts needs those charts to still exist.
func (p *Pulsar) updateCharts() {
    // 1. New namespaces: remember them and create their charts.
    for ns := range p.curCache.namespaces {
        if p.cache.namespaces[ns] {
            continue
        }
        p.cache.namespaces[ns] = true
        p.addNamespaceCharts(ns)
    }

    // 2. New topics: remember them and add their dimensions.
    for top := range p.curCache.topics {
        if p.cache.topics[top] {
            continue
        }
        p.cache.topics[top] = true
        p.addTopicToCharts(top)
    }

    // 3. Topics no longer reported: forget them and drop their dimensions.
    for top := range p.cache.topics {
        if !p.curCache.topics[top] {
            delete(p.cache.topics, top)
            p.removeTopicFromCharts(top)
        }
    }

    // 4. Namespaces no longer reported: forget them and drop their charts.
    for ns := range p.cache.namespaces {
        if !p.curCache.namespaces[ns] {
            delete(p.cache.namespaces, ns)
            p.removeNamespaceFromCharts(ns)
        }
    }
}

// addNamespaceCharts instantiates the namespace chart templates for ns and adds
// them to the collector's charts.
//
// It copies p.nsCharts, substitutes ns.name into the ID and family of every
// chart and into the ID of every dimension, then adds the result. A failure to
// add is logged as a warning.
func (p *Pulsar) addNamespaceCharts(ns namespace) {
    instances := p.nsCharts.Copy()

    for _, c := range *instances {
        c.ID, c.Fam = fmt.Sprintf(c.ID, ns.name), fmt.Sprintf(c.Fam, ns.name)
        for _, d := range c.Dims {
            d.ID = fmt.Sprintf(d.ID, ns.name)
        }
    }

    err := p.Charts().Add(*instances...)
    if err != nil {
        p.Warning(err)
    }
}

// removeNamespaceFromCharts marks every chart belonging to ns for removal.
//
// For each template in p.nsCharts it formats the chart ID with ns.name and
// looks the chart up in the collector's charts; a chart that cannot be found is
// reported with a warning.
func (p *Pulsar) removeNamespaceFromCharts(ns namespace) {
    for _, tmpl := range *p.nsCharts {
        id := fmt.Sprintf(tmpl.ID, ns.name)

        found := p.Charts().Get(id)
        if found == nil {
            p.Warningf("could not remove namespace chart '%s'", id)
            continue
        }
        found.MarkRemove()
    }
}

// addTopicToCharts adds one dimension for top to every topic chart of its
// namespace.
//
// For each (chart ID template, metric) pair in p.topicChartsMapping the chart
// is looked up by the template formatted with top.namespace. A missing chart is
// reported with a warning and skipped. The new dimension has the ID
// "<metric>_<topic name>", the short topic name as its display name, and a
// divisor that depends on the metric. The chart is marked as not created after
// the attempt, whether or not adding the dimension succeeded.
func (p *Pulsar) addTopicToCharts(top topic) {
    for tmplID, metric := range p.topicChartsMapping {
        chartID := fmt.Sprintf(tmplID, top.namespace)

        chart := p.Charts().Get(chartID)
        if chart == nil {
            p.Warningf("could not add topic '%s' to chart '%s': chart not found", top.name, chartID)
            continue
        }

        dim := &Dim{
            ID:   metric + "_" + top.name,
            Name: extractTopicName(top),
        }

        // Same divisors as the corresponding summary chart dimensions;
        // metrics not listed keep the zero value.
        switch metric {
        case metricPulsarStorageSize:
            dim.Div = 1024
        case metricPulsarRateIn,
            metricPulsarRateOut,
            metricPulsarStorageWriteRate,
            metricPulsarStorageReadRate,
            metricPulsarSubscriptionMsgRateRedeliver,
            metricPulsarReplicationRateIn,
            metricPulsarReplicationRateOut:
            dim.Div = 1000
        case metricPulsarThroughputIn,
            metricPulsarThroughputOut,
            metricPulsarReplicationThroughputIn,
            metricPulsarReplicationThroughputOut:
            dim.Div = 1024 * 1000
        }

        err := chart.AddDim(dim)
        if err != nil {
            p.Warning(err)
        }
        chart.MarkNotCreated()
    }
}

// removeTopicFromCharts marks the dimension of top for removal in every topic
// chart of its namespace.
//
// For each (chart ID template, metric) pair in p.topicChartsMapping the chart
// is looked up by the template formatted with top.namespace. A missing chart is
// reported with a warning and skipped. The dimension "<metric>_<topic name>" is
// then marked for removal and the chart is marked as not created.
func (p *Pulsar) removeTopicFromCharts(top topic) {
    for tmplID, metric := range p.topicChartsMapping {
        chartID := fmt.Sprintf(tmplID, top.namespace)

        chart := p.Charts().Get(chartID)
        if chart == nil {
            p.Warningf("could not remove topic '%s' from chart '%s': chart not found", top.name, chartID)
            continue
        }

        dimID := metric + "_" + top.name
        err := chart.MarkDimRemove(dimID, true)
        if err != nil {
            p.Warning(err)
        }
        chart.MarkNotCreated()
    }
}

// topicChartsMapping returns a new map from each topic chart's ID template to
// the metric that chart's per-topic dimensions read.
func topicChartsMapping() map[string]string {
    pairs := []struct {
        chartID string
        metric  string
    }{
        {topicSubscriptionsChart.ID, metricPulsarSubscriptionsCount},
        {topicProducersChart.ID, metricPulsarProducersCount},
        {topicConsumersChart.ID, metricPulsarConsumersCount},
        {topicMessagesRateInChart.ID, metricPulsarRateIn},
        {topicMessagesRateOutChart.ID, metricPulsarRateOut},
        {topicThroughputRateInChart.ID, metricPulsarThroughputIn},
        {topicThroughputRateOutChart.ID, metricPulsarThroughputOut},
        {topicStorageSizeChart.ID, metricPulsarStorageSize},
        {topicStorageReadRateChart.ID, metricPulsarStorageReadRate},
        {topicStorageWriteRateChart.ID, metricPulsarStorageWriteRate},
        {topicMsgBacklogSizeChart.ID, metricPulsarMsgBacklog},
        {topicSubsDelayedChart.ID, metricPulsarSubscriptionDelayed},
        {topicSubsMsgRateRedeliverChart.ID, metricPulsarSubscriptionMsgRateRedeliver},
        {topicSubsBlockedOnUnackedMsgChart.ID, metricPulsarSubscriptionBlockedOnUnackedMessages},
        {topicReplicationRateInChart.ID, metricPulsarReplicationRateIn},
        {topicReplicationRateOutChart.ID, metricPulsarReplicationRateOut},
        {topicReplicationThroughputRateInChart.ID, metricPulsarReplicationThroughputIn},
        {topicReplicationThroughputRateOutChart.ID, metricPulsarReplicationThroughputOut},
        {topicReplicationBacklogChart.ID, metricPulsarReplicationBacklog},
    }

    mapping := make(map[string]string, len(pairs))
    for _, pair := range pairs {
        mapping[pair.chartID] = pair.metric
    }
    return mapping
}

// extractTopicName returns a short display name for a topic: the first
// character of the full name, a colon, and everything after the last '/'.
//
//	persistent://sample/ns1/demo-1 => p:demo-1
//
// A name without a '/' past its first character is returned unchanged.
func extractTopicName(top topic) string {
    slash := strings.LastIndex(top.name, "/")
    if slash <= 0 {
        return top.name
    }
    scheme, short := top.name[:1], top.name[slash+1:]
    return scheme + ":" + short
}