src/collectors/plugins.d/pluginsd_internals.h
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_PLUGINSD_INTERNALS_H
#define NETDATA_PLUGINSD_INTERNALS_H
#include "pluginsd_parser.h"
#include "pluginsd_functions.h"
#include "pluginsd_dyncfg.h"
#include "pluginsd_replication.h"
#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING)
#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD)
PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg);
ssize_t send_to_plugin(const char *txt, void *data);
static inline RRDHOST *pluginsd_require_scope_host(PARSER *parser, const char *cmd) {
RRDHOST *host = parser->user.host;
if(unlikely(!host))
netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
return host;
}
static inline RRDSET *pluginsd_require_scope_chart(PARSER *parser, const char *cmd, const char *parent_cmd) {
RRDSET *st = parser->user.st;
if(unlikely(!st))
netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
return st;
}
static inline RRDSET *pluginsd_get_scope_chart(PARSER *parser) {
return parser->user.st;
}
static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
if(parser->user.st && !parser->user.v2.locked_data_collection) {
spinlock_lock(&parser->user.st->data_collection_lock);
parser->user.v2.locked_data_collection = true;
}
}
static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
if(parser->user.st && parser->user.v2.locked_data_collection) {
spinlock_unlock(&parser->user.st->data_collection_lock);
parser->user.v2.locked_data_collection = false;
return true;
}
return false;
}
static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) {
if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
if(stale)
netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
rrdhost_hostname(parser->user.st->rrdhost),
rrdset_id(parser->user.st),
keyword);
}
if(unlikely(parser->user.v2.ml_locked)) {
ml_chart_update_end(parser->user.st);
parser->user.v2.ml_locked = false;
if(stale)
netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
rrdhost_hostname(parser->user.st->rrdhost),
rrdset_id(parser->user.st),
keyword);
}
}
static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) {
pluginsd_unlock_previous_scope_chart(parser, keyword, true);
if(parser->user.cleanup_slots && parser->user.st)
rrdset_pluginsd_receive_unslot(parser->user.st);
parser->user.st = NULL;
parser->user.cleanup_slots = false;
}
static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) {
RRDSET *old_st = parser->user.st;
pid_t old_collector_tid = (old_st) ? old_st->pluginsd.collector_tid : 0;
pid_t my_collector_tid = gettid_cached();
if(unlikely(old_collector_tid)) {
if(old_collector_tid != my_collector_tid) {
nd_log_limit_static_global_var(erl, 1, 0);
nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
"PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
keyword ? keyword : "UNKNOWN",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
my_collector_tid, old_collector_tid);
return false;
}
old_st->pluginsd.collector_tid = 0;
}
st->pluginsd.collector_tid = my_collector_tid;
pluginsd_clear_scope_chart(parser, keyword);
st->pluginsd.pos = 0;
parser->user.st = st;
parser->user.cleanup_slots = false;
return true;
}
static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) {
size_t wanted_size = st->pluginsd.size;
if(slot >= 1) {
st->pluginsd.dims_with_slots = true;
wanted_size = slot;
}
else {
st->pluginsd.dims_with_slots = false;
wanted_size = dictionary_entries(st->rrddim_root_index);
}
if(wanted_size > st->pluginsd.size) {
st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim));
// initialize the empty slots
for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) {
st->pluginsd.prd_array[i].rda = NULL;
st->pluginsd.prd_array[i].rd = NULL;
st->pluginsd.prd_array[i].id = NULL;
}
st->pluginsd.size = wanted_size;
}
if(st->pluginsd.dims_with_slots) {
struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1];
if(prd->rd != rd) {
prd->rda = rrddim_find_and_acquire(st, string2str(rd->id));
prd->rd = rrddim_acquired_to_rrddim(prd->rda);
prd->id = string2str(prd->rd->id);
}
if(obsolete)
parser->user.cleanup_slots = true;
}
}
static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) {
if (unlikely(!dimension || !*dimension)) {
netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
rrdhost_hostname(host), rrdset_id(st), cmd);
return NULL;
}
if (unlikely(!st->pluginsd.size)) {
netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.",
rrdhost_hostname(host), rrdset_id(st), cmd);
return NULL;
}
struct pluginsd_rrddim *prd;
RRDDIM *rd;
if(likely(st->pluginsd.dims_with_slots)) {
// caching with slots
if(unlikely(slot < 1 || slot > st->pluginsd.size)) {
netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.",
rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size);
return NULL;
}
prd = &st->pluginsd.prd_array[slot - 1];
rd = prd->rd;
if(likely(rd)) {
#ifdef NETDATA_INTERNAL_CHECKS
if(strcmp(prd->id, dimension) != 0) {
ssize_t t;
for(t = 0; t < st->pluginsd.size ;t++) {
if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0)
break;
}
if(t >= st->pluginsd.size)
t = -1;
internal_fatal(true,
"PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', "
"the right slot is %zd",
dimension, slot, prd->id, t);
}
#endif
return rd;
}
}
else {
// caching without slots
if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
st->pluginsd.pos = 0;
prd = &st->pluginsd.prd_array[st->pluginsd.pos++];
rd = prd->rd;
if(likely(rd)) {
const char *id = prd->id;
if(strcmp(id, dimension) == 0) {
// we found it cached
return rd;
}
else {
// the cached one is not good for us
rrddim_acquired_release(prd->rda);
prd->rda = NULL;
prd->rd = NULL;
prd->id = NULL;
}
}
}
// we need to find the dimension and set it to prd
RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
if (unlikely(!rda)) {
netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
return NULL;
}
prd->rda = rda;
prd->rd = rd = rrddim_acquired_to_rrddim(rda);
prd->id = string2str(rd->id);
return rd;
}
static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
if (unlikely(!chart || !*chart)) {
netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
rrdhost_hostname(host), cmd);
return NULL;
}
RRDSET *st = rrdset_find(host, chart);
if (unlikely(!st))
netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
rrdhost_hostname(host), chart, cmd);
return st;
}
static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) {
ssize_t slot = -1;
char *id = get_word(words, num_words, 1);
if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] &&
id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') {
slot = (ssize_t) str2ull_encoded(&id[5]);
if(slot < 0) slot = 0; // to make the caller increment its idx of the words
}
return slot;
}
static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) {
// clean possible old cached data
rrdset_pluginsd_receive_unslot(st);
if(unlikely(slot < 1 || slot >= INT32_MAX))
return;
RRDHOST *host = st->rrdhost;
if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) {
spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size;
size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2;
if(new_slots < (size_t)slot)
new_slots = slot;
host->rrdpush.receive.pluginsd_chart_slots.array =
reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *));
for(size_t i = old_slots; i < new_slots ;i++)
host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL;
host->rrdpush.receive.pluginsd_chart_slots.size = new_slots;
spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
}
host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st;
st->pluginsd.last_slot = (int32_t)slot - 1;
parser->user.cleanup_slots = obsolete;
}
static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) {
if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size))
return pluginsd_find_chart(host, id, keyword);
RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1];
if(!st) {
st = pluginsd_find_chart(host, id, keyword);
if(st)
pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE));
}
else {
internal_fatal(string_strcmp(st->id, id) != 0,
"PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'",
slot - 1, id, string2str(st->id));
}
return st;
}
static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
SN_FLAGS flags = SN_FLAG_NONE;
char c;
while ((c = *flags_str++)) {
switch (c) {
case 'A':
flags |= SN_FLAG_NOT_ANOMALOUS;
break;
case 'R':
flags |= SN_FLAG_RESET;
break;
case 'E':
flags = SN_EMPTY_SLOT;
return flags;
default:
internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
break;
}
}
return flags;
}
#endif //NETDATA_PLUGINSD_INTERNALS_H