src/database/sqlite/sqlite_aclk_alert.c
// SPDX-License-Identifier: GPL-3.0-or-later
#include "sqlite_functions.h"
#include "sqlite_aclk_alert.h"
#include "../../aclk/aclk_alarm_api.h"
// Duplicate the text of result column `param` of statement `res` into a newly
// allocated string (release it with freez()). Evaluates to NULL when the column
// reports zero bytes (SQL NULL or empty text). `param` is evaluated exactly once.
#define SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param)                                                                    \
    ({                                                                                                                 \
        int col_idx_ = (param);                                                                                        \
        char *dup_ = NULL;                                                                                             \
        if (sqlite3_column_bytes((res), col_idx_) != 0)                                                                \
            dup_ = strdupz((char *)sqlite3_column_text((res), col_idx_));                                              \
        dup_;                                                                                                          \
    })
#define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \
    "SELECT hld.unique_id FROM health_log hl, alert_hash ah, health_log_detail hld " \
    "WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \
    "AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL"

// Tell whether transition `unique_id` of host `host_id` comes from an alert definition
// that has neither a warning nor a critical expression (ah.warn and ah.crit are NULL).
// Returns false when the statement cannot be prepared, a bind fails, or no row matches.
static inline bool is_event_from_alert_variable_config(int64_t unique_id, nd_uuid_t *host_id)
{
    static __thread sqlite3_stmt *res = NULL;
    bool found = false;

    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, &res))
        return found;

    int param = 0;
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_id, sizeof(*host_id), SQLITE_STATIC));

    // all binds succeeded: param is non-zero at `done` only after a bind failure
    param = 0;
    if (sqlite3_step_monitored(res) == SQLITE_ROW)
        found = true;

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
    return found;
}
#define MAX_REMOVED_PERIOD 604800 // one week, in seconds

#define SQL_UPDATE_ALERT_VERSION_TRANSITION \
    "UPDATE alert_version SET unique_id = @unique_id WHERE health_log_id = @health_log_id"

// Point the alert_version row of `health_log_id` at transition `unique_id`, leaving its
// status and version untouched. Failures are only reported, never propagated.
static void update_alert_version_transition(int64_t health_log_id, int64_t unique_id)
{
    static __thread sqlite3_stmt *res = NULL;
    int param = 0;

    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION_TRANSITION, &res))
        return;

    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
    param = 0;

    if (sqlite3_step_monitored(res) != SQLITE_DONE)
        error_report("Failed to update alert_version to latest transition");

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
}
// Decide whether an alert transition still has to be sent to the cloud.
#define SQL_SELECT_LAST_ALERT_STATUS "SELECT status FROM alert_version WHERE health_log_id = @health_log_id "

// Returns true when the status last recorded in alert_version for `health_log_id` equals
// `status`, i.e. the cloud is already aware of it. Returns false when the status differs,
// no row exists, or binding fails. Returns true if the statement cannot be prepared.
static bool cloud_status_matches(int64_t health_log_id, RRDCALC_STATUS status)
{
    static __thread sqlite3_stmt *res = NULL;
    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_LAST_ALERT_STATUS, &res))
        return true;

    bool matches = false;
    int param = 0;
    // health_log_id is 64-bit: sqlite3_bind_int() would truncate it to 32 bits and
    // could look up the wrong alert_version row
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
    param = 0;

    int rc = sqlite3_step_monitored(res);
    if (likely(rc == SQLITE_ROW)) {
        RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
        matches = (current_status == status);
    }

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
    return matches;
}
#define SQL_QUEUE_ALERT_TO_CLOUD \
    "INSERT INTO aclk_queue (host_id, health_log_id, unique_id, date_created)" \
    " VALUES (@host_id, @health_log_id, @unique_id, UNIXEPOCH())" \
    " ON CONFLICT(host_id, health_log_id) DO UPDATE SET unique_id=excluded.unique_id, " \
    " date_created=excluded.date_created"

//
// Attempt to insert an alert to the submit queue to reach the cloud
//
// The alert will NOT be added in the submit queue if
// - Cloud is already aware of the alert status
// - The transition refers to a variable
//
// Returns:
//    0  the transition was queued (or its existing queue entry updated)
//    1  the cloud already has this status; only the alert_version transition was updated
//    2  the transition belongs to a definition without warn/crit expressions; nothing queued
//   -1  the statement could not be prepared, a parameter could not be bound,
//       or the insert did not complete
//
static int insert_alert_to_submit_queue(RRDHOST *host, int64_t health_log_id, uint32_t unique_id, RRDCALC_STATUS status)
{
    static __thread sqlite3_stmt *res = NULL;

    if (cloud_status_matches(health_log_id, status)) {
        update_alert_version_transition(health_log_id, unique_id);
        return 1;
    }

    if (is_event_from_alert_variable_config(unique_id, &host->host_id.uuid))
        return 2;

    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_QUEUE_ALERT_TO_CLOUD, &res))
        return -1;

    // pessimistic default: only a completed insert reports success, so callers
    // do not count failed inserts as queued
    int ret = -1;
    int param = 0;
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (int64_t) unique_id));
    param = 0;

    int rc = execute_insert(res);
    if (likely(rc == SQLITE_DONE))
        ret = 0;
    else
        error_report("Failed to insert alert in the submit queue %"PRIu32", rc = %d", unique_id, rc);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
    return ret;
}
#define SQL_DELETE_QUEUE_ALERT_TO_CLOUD \
    "DELETE FROM aclk_queue WHERE host_id = @host_id AND sequence_id BETWEEN @seq1 AND @seq2"

//
// Remove the submit-queue entries of `host` whose sequence_id lies in
// [first_seq_id, last_seq_id] (BETWEEN is inclusive), after they were sent to the cloud.
// Returns -1 when the statement cannot be prepared, otherwise 0 (bind and step
// failures are reported but still yield 0).
//
static int delete_alert_from_submit_queue(RRDHOST *host, int64_t first_seq_id, int64_t last_seq_id)
{
    static __thread sqlite3_stmt *res = NULL;
    int param = 0;

    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_QUEUE_ALERT_TO_CLOUD, &res))
        return -1;

    nd_uuid_t *host_uuid = &host->host_id.uuid;
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_uuid, sizeof(*host_uuid), SQLITE_STATIC));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, first_seq_id));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, last_seq_id));
    param = 0;

    if (sqlite3_step_monitored(res) != SQLITE_DONE)
        error_report("Failed to delete submitted to ACLK");

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
    return 0;
}
// Translate an agent RRDCALC_STATUS into the ALARM_STATUS_* value used in ACLK alarm
// messages. Any status without an explicit mapping becomes ALARM_STATUS_UNKNOWN.
int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
{
    int proto_status = ALARM_STATUS_UNKNOWN;

    if (status == RRDCALC_STATUS_REMOVED)
        proto_status = ALARM_STATUS_REMOVED;
    else if (status == RRDCALC_STATUS_UNDEFINED)
        proto_status = ALARM_STATUS_NOT_A_NUMBER;
    else if (status == RRDCALC_STATUS_CLEAR)
        proto_status = ALARM_STATUS_CLEAR;
    else if (status == RRDCALC_STATUS_WARNING)
        proto_status = ALARM_STATUS_WARNING;
    else if (status == RRDCALC_STATUS_CRITICAL)
        proto_status = ALARM_STATUS_CRITICAL;

    return proto_status;
}
// Return a newly allocated lowercase string form of the UUID stored as a blob in column
// `iCol` of the current row. Yields an empty string when the column is NULL or the blob
// is not exactly sizeof(nd_uuid_t) bytes, so a malformed row can never cause an
// out-of-bounds read.
static inline char *sqlite3_uuid_unparse_strdupz(sqlite3_stmt *res, int iCol) {
    char uuid_str[UUID_STR_LEN] = "";

    if (sqlite3_column_type(res, iCol) != SQLITE_NULL) {
        // SQLite recommends fetching the blob first and then asking for its size
        const void *blob = sqlite3_column_blob(res, iCol);
        if (blob && sqlite3_column_bytes(res, iCol) == (int)sizeof(nd_uuid_t))
            uuid_unparse_lower(*((nd_uuid_t *) blob), uuid_str);
    }

    return strdupz(uuid_str);
}
// Return a newly allocated copy of the text in column `iCol`, or of "" when the column is
// NULL. sqlite3_column_text() can also return NULL for a non-NULL value (e.g. on allocation
// failure); that case also yields "", so the result is never NULL.
static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) {
    const char *txt = (const char *)sqlite3_column_text(res, iCol);
    return strdupz(txt ? txt : "");
}
#define SQL_UPDATE_ALERT_VERSION \
    "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted)" \
    " VALUES (@health_log_id, @unique_id, @status, @version, UNIXEPOCH())" \
    " ON CONFLICT(health_log_id) DO UPDATE SET status = excluded.status, version = excluded.version, " \
    " unique_id=excluded.unique_id, date_submitted=excluded.date_submitted"

//
// Upsert the alert_version row of `health_log_id` after a transition was sent to the cloud:
// store transition `unique_id`, its `status` and `version`, and stamp date_submitted with
// the current epoch time. Failures are only reported.
//
static void sql_update_alert_version(int64_t health_log_id, int64_t unique_id, RRDCALC_STATUS status, uint64_t version)
{
    static __thread sqlite3_stmt *res = NULL;
    int param = 0;

    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION, &res))
        return;

    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, status));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, version));
    param = 0;

    if (sqlite3_step_monitored(res) != SQLITE_DONE)
        error_report("Failed to execute sql_update_alert_version");

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
}
#define SQL_SELECT_ALERT_TO_DUMMY \
    "SELECT aq.sequence_id, hld.unique_id, hld.when_key, hld.new_status, hld.health_log_id" \
    " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \
    " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \
    " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \
    " ORDER BY aq.sequence_id ASC"

//
// Commit every queued alert of `host` as if it had been sent to the cloud: each entry is
// recorded in alert_version (using hld.when_key as its version), then the whole processed
// sequence_id range is removed from aclk_queue. This is needed before a snapshot is sent,
// so the snapshot includes the latest transition of each alert.
//
static void commit_alert_events(RRDHOST *host)
{
    sqlite3_stmt *res = NULL;
    if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_DUMMY, &res))
        return;

    int64_t first_sequence_id = 0;
    int64_t last_sequence_id = 0;

    int param = 0;
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));
    param = 0;

    while (sqlite3_step_monitored(res) == SQLITE_ROW) {
        last_sequence_id = sqlite3_column_int64(res, 0);
        if (first_sequence_id == 0)
            first_sequence_id = last_sequence_id;

        // unique_id is a uint32_t elsewhere in this file; read it as 64-bit so values
        // above INT32_MAX are not wrapped to negative numbers
        int64_t unique_id = sqlite3_column_int64(res, 1);
        int64_t version = sqlite3_column_int64(res, 2);
        RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 3);
        int64_t health_log_id = sqlite3_column_int64(res, 4);
        sql_update_alert_version(health_log_id, unique_id, status, version);
    }

    if (first_sequence_id)
        delete_alert_from_submit_queue(host, first_sequence_id, last_sequence_id);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_FINALIZE(res);
}
// Result-column indices of SQL_SELECT_ALERT_TO_PUSH and SQL_GET_SNAPSHOT_ENTRIES, as read by
// health_alarm_log_populate(). The numbering must follow the SELECT lists exactly.
typedef enum {
    SEQUENCE_ID = 0,            // aq.sequence_id (constant 0 in snapshot queries)
    UNIQUE_ID = 1,              // hld.unique_id
    ALARM_ID = 2,               // hld.alarm_id
    CONFIG_HASH_ID = 3,         // hl.config_hash_id
    UPDATED_BY_ID = 4,          // hld.updated_by_id
    WHEN_KEY = 5,               // hld.when_key
    DURATION = 6,               // hld.duration
    NON_CLEAR_DURATION = 7,     // hld.non_clear_duration
    FLAGS = 8,                  // hld.flags
    EXEC_RUN_TIMESTAMP = 9,     // hld.exec_run_timestamp
    DELAY_UP_TO_TIMESTAMP = 10, // hld.delay_up_to_timestamp
    NAME = 11,                  // hl.name
    CHART = 12,                 // hl.chart
    EXEC = 13,                  // hl.exec
    RECIPIENT = 14,             // hl.recipient
    SOURCE = 15,                // ah.source
    UNITS = 16,                 // hl.units
    INFO = 17,                  // hld.info
    EXEC_CODE = 18,             // hld.exec_code
    NEW_STATUS = 19,            // hld.new_status
    OLD_STATUS = 20,            // hld.old_status
    DELAY = 21,                 // hld.delay
    NEW_VALUE = 22,             // hld.new_value
    OLD_VALUE = 23,             // hld.old_value
    LAST_REPEAT = 24,           // hld.last_repeat
    CHART_CONTEXT = 25,         // hl.chart_context
    TRANSITION_ID = 26,         // hld.transition_id
    ALARM_EVENT_ID = 27,        // hld.alarm_event_id
    CHART_NAME = 28,            // hl.chart_name
    SUMMARY = 29,               // hld.summary
    HEALTH_LOG_ID = 30,         // hld.health_log_id
    VERSION = 31                // hld.when_key (push query) / av.version (snapshot query)
} HealthLogDetails;
//
// Fill `alarm_log` from the current row of `res`, whose columns follow the HealthLogDetails
// order. String members are newly allocated here (the callers in this file release them
// with destroy_alarm_log_entry()). node_id and claim_id are not touched; the caller sets them.
// When `status` is not NULL it receives the row's new status.
//
void health_alarm_log_populate(
    struct alarm_log_entry *alarm_log,
    sqlite3_stmt *res,
    RRDHOST *host,
    RRDCALC_STATUS *status)
{
    char old_value_string[100 + 1];
    char new_value_string[100 + 1];

    RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, NEW_STATUS);
    if (status)
        *status = current_status;

    // ah.source may be NULL: fall back to a placeholder edit command
    char *source = (char *) sqlite3_column_text(res, SOURCE);
    alarm_log->command = source ? health_edit_command_from_source(source) : strdupz("UNKNOWN=0=UNKNOWN");

    alarm_log->chart = strdupz((char *) sqlite3_column_text(res, CHART));
    alarm_log->name = strdupz((char *) sqlite3_column_text(res, NAME));
    alarm_log->when = sqlite3_column_int64(res, WHEN_KEY);
    alarm_log->config_hash = sqlite3_uuid_unparse_strdupz(res, CONFIG_HASH_ID);
    alarm_log->utc_offset = host->utc_offset;
    alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
    // an empty exec column means the host's default health exec is used
    alarm_log->exec_path = sqlite3_column_bytes(res, EXEC) ?
        strdupz((char *)sqlite3_column_text(res, EXEC)) :
        strdupz((char *)string2str(host->health.health_default_exec));
    alarm_log->conf_source = source ? strdupz(source) : strdupz("");

    time_t duration = sqlite3_column_int64(res, DURATION);
    alarm_log->duration = (duration > 0) ? duration : 0;
    alarm_log->non_clear_duration = sqlite3_column_int64(res, NON_CLEAR_DURATION);
    alarm_log->status = rrdcalc_status_to_proto_enum(current_status);
    alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)sqlite3_column_int64(res, OLD_STATUS));
    alarm_log->delay = sqlite3_column_int64(res, DELAY);
    alarm_log->delay_up_to_timestamp = sqlite3_column_int64(res, DELAY_UP_TO_TIMESTAMP);
    alarm_log->last_repeat = sqlite3_column_int64(res, LAST_REPEAT);

    uint64_t flags = sqlite3_column_int64(res, FLAGS);
    char *recipient = (char *) sqlite3_column_text(res, RECIPIENT);
    alarm_log->silenced =
        ((flags & HEALTH_ENTRY_FLAG_SILENCED) || (recipient && !strncmp(recipient, "silent", 6))) ? 1 : 0;

    // sqlite3_column_type() is undefined once a value has been type-converted, so record
    // whether the values are NULL BEFORE sqlite3_column_double() reads (and may convert) them
    bool new_value_is_null = (sqlite3_column_type(res, NEW_VALUE) == SQLITE_NULL);
    bool old_value_is_null = (sqlite3_column_type(res, OLD_VALUE) == SQLITE_NULL);
    double value = sqlite3_column_double(res, NEW_VALUE);
    double old_value = sqlite3_column_double(res, OLD_VALUE);

    alarm_log->value_string =
        new_value_is_null ?
            strdupz((char *)"-") :
            strdupz((char *)format_value_and_unit(
                new_value_string, 100, value, (char *)sqlite3_column_text(res, UNITS), -1));
    alarm_log->old_value_string =
        old_value_is_null ?
            strdupz((char *)"-") :
            strdupz((char *)format_value_and_unit(
                old_value_string, 100, old_value, (char *)sqlite3_column_text(res, UNITS), -1));
    alarm_log->value = (!isnan(value)) ? (NETDATA_DOUBLE)value : 0;
    alarm_log->old_value = (!isnan(old_value)) ? (NETDATA_DOUBLE)old_value : 0;

    alarm_log->updated = (flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
    alarm_log->rendered_info = sqlite3_text_strdupz_empty(res, INFO);
    alarm_log->chart_context = sqlite3_text_strdupz_empty(res, CHART_CONTEXT);
    alarm_log->chart_name = sqlite3_text_strdupz_empty(res, CHART_NAME);
    alarm_log->transition_id = sqlite3_uuid_unparse_strdupz(res, TRANSITION_ID);
    alarm_log->event_id = sqlite3_column_int64(res, ALARM_EVENT_ID);
    alarm_log->version = sqlite3_column_int64(res, VERSION);
    alarm_log->summary = sqlite3_text_strdupz_empty(res, SUMMARY);
    alarm_log->health_log_id = sqlite3_column_int64(res, HEALTH_LOG_ID);
    alarm_log->unique_id = sqlite3_column_int64(res, UNIQUE_ID);
    alarm_log->alarm_id = sqlite3_column_int64(res, ALARM_ID);
    alarm_log->sequence_id = sqlite3_column_int64(res, SEQUENCE_ID);
}
#define SQL_SELECT_ALERT_TO_PUSH \
    "SELECT aq.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key," \
    " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name," \
    " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status," \
    " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id," \
    " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, hld.when_key" \
    " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \
    " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \
    " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \
    " ORDER BY aq.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES

//
// Send the next batch (at most ACLK_MAX_ALERT_UPDATES) of queued alert transitions of `host`
// to the cloud, in aclk_queue sequence order. Each sent transition is recorded in
// alert_version, the sent range is removed from aclk_queue, and
// RRDHOST_FLAG_ACLK_STREAM_ALERTS is set again so the next pass checks for more entries.
// Does nothing when the agent is not claimed, the host has no node id, or the host has no
// ACLK sync config.
//
static void aclk_push_alert_event(RRDHOST *host __maybe_unused)
{
    CLAIM_ID claim_id = claim_id_get();
    if (!claim_id_is_set(claim_id) || UUIDiszero(host->node_id))
        return;

    // guard before preparing anything: wc->alert_count is updated for every row sent
    struct aclk_sync_cfg_t *wc = host->aclk_config;
    if (unlikely(!wc))
        return;

    sqlite3_stmt *res = NULL;
    if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_PUSH, &res))
        return;

    int param = 0;
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));
    param = 0;

    char node_id_str[UUID_STR_LEN];
    uuid_unparse_lower(host->node_id.uuid, node_id_str);

    struct alarm_log_entry alarm_log;
    alarm_log.node_id = node_id_str;
    alarm_log.claim_id = claim_id.str;

    int64_t first_id = 0;
    int64_t last_id = 0;
    RRDCALC_STATUS status;

    while (sqlite3_step_monitored(res) == SQLITE_ROW) {
        health_alarm_log_populate(&alarm_log, res, host, &status);
        aclk_send_alarm_log_entry(&alarm_log);
        wc->alert_count++;

        last_id = alarm_log.sequence_id;
        if (first_id == 0)
            first_id = last_id;

        sql_update_alert_version(alarm_log.health_log_id, alarm_log.unique_id, status, alarm_log.version);
        destroy_alarm_log_entry(&alarm_log);
    }

    if (first_id) {
        // PRId64: int64_t is not `long` on every supported platform
        nd_log(
            NDLS_ACCESS,
            NDLP_DEBUG,
            "ACLK RES [%s (%s)]: ALERTS SENT from %" PRId64 " - %" PRId64,
            node_id_str,
            rrdhost_hostname(host),
            first_id,
            last_id);
        delete_alert_from_submit_queue(host, first_id, last_id);
        // Mark to do one more check
        rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
    }

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_FINALIZE(res);
}
#define SQL_DELETE_PROCESSED_ROWS "DELETE FROM alert_queue WHERE host_id = @host_id AND rowid = @row"

// Remove the processed alert_queue entry `row` that belongs to `host`.
// Failures are only reported.
static void delete_alert_from_pending_queue(RRDHOST *host, int64_t row)
{
    static __thread sqlite3_stmt *res = NULL;
    int param = 0;

    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_PROCESSED_ROWS, &res))
        return;

    nd_uuid_t *host_uuid = &host->host_id.uuid;
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_uuid, sizeof(*host_uuid), SQLITE_STATIC));
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, row));
    param = 0;

    int rc = sqlite3_step_monitored(res);
    if (unlikely(rc != SQLITE_DONE))
        error_report("Failed to delete processed rows, rc = %d", rc);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
}
#define SQL_REBUILD_HOST_ALERT_VERSION_TABLE \
    "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted) " \
    " SELECT hl.health_log_id, hld.unique_id, hld.new_status, hld.when_key, UNIXEPOCH() " \
    " FROM health_log hl, health_log_detail hld WHERE " \
    " hl.host_id = @host_id AND hld.health_log_id = hl.health_log_id AND hld.transition_id = hl.last_transition_id"

#define SQL_DELETE_HOST_ALERT_VERSION_TABLE \
    "DELETE FROM alert_version WHERE health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id)"

// Rebuild the alert_version rows of `host` in two steps:
//   1. delete every alert_version row that belongs to one of the host's health_log entries;
//   2. re-insert one row per health_log entry from the detail row matching its
//      last_transition_id, using when_key as the version.
// Step 2 is skipped when step 1 fails. Failures are only logged.
void rebuild_host_alert_version_table(RRDHOST *host)
{
    sqlite3_stmt *res = NULL;
    int param = 0;

    // step 1: wipe the existing versions
    if (!PREPARE_STATEMENT(db_meta, SQL_DELETE_HOST_ALERT_VERSION_TABLE, &res))
        return;

    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));
    param = 0;

    if (execute_insert(res) == SQLITE_DONE) {
        SQLITE_FINALIZE(res);

        // step 2: repopulate from the latest transition of every health_log entry
        if (!PREPARE_STATEMENT(db_meta, SQL_REBUILD_HOST_ALERT_VERSION_TABLE, &res))
            return;

        SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));
        param = 0;

        if (execute_insert(res) != SQLITE_DONE)
            netdata_log_error("Failed to rebuild the host alert version table");
    }
    else
        netdata_log_error("Failed to delete the host alert version table");

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_FINALIZE(res);
}
#define SQL_PROCESS_ALERT_PENDING_QUEUE \
    "SELECT health_log_id, unique_id, status, rowid" \
    " FROM alert_queue WHERE host_id = @host_id AND date_scheduled <= UNIXEPOCH() ORDER BY rowid ASC"

// Drain the due entries (date_scheduled <= now) of `host` from alert_queue in rowid order.
// When the host has an ACLK sync config, each entry is offered to the cloud submit queue;
// every processed entry is deleted from alert_queue either way.
// Returns true when at least one entry was queued for the cloud
// (insert_alert_to_submit_queue() returned 0).
bool process_alert_pending_queue(RRDHOST *host)
{
    static __thread sqlite3_stmt *res = NULL;
    int param = 0;
    int processed = 0;
    int queued = 0;

    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_PROCESS_ALERT_PENDING_QUEUE, &res))
        return false;

    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));
    param = 0;

    for (; sqlite3_step_monitored(res) == SQLITE_ROW; processed++) {
        int64_t health_log_id = sqlite3_column_int64(res, 0);
        uint32_t unique_id = (uint32_t) sqlite3_column_int64(res, 1);
        RRDCALC_STATUS new_status = sqlite3_column_int(res, 2);
        int64_t row = sqlite3_column_int64(res, 3);

        if (host->aclk_config && insert_alert_to_submit_queue(host, health_log_id, unique_id, new_status) == 0)
            queued++;

        delete_alert_from_pending_queue(host, row);
    }

    if (processed)
        nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Processed %d entries, queued %d", rrdhost_hostname(host), processed, queued);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
    return queued > 0;
}
// One alert-streaming pass over all hosts.
// Only hosts with RRDHOST_FLAG_ACLK_STREAM_ALERTS set and no pending context load are
// visited; the flag is cleared first and each branch decides whether to set it again.
//  - No ACLK sync config, streaming disabled, or archived host: drain the pending queue and
//    commit queued events locally (commit_alert_events), without sending them.
//  - Snapshot requested (send_snapshot != 0): keep the flag set. While send_snapshot == 1
//    the host is skipped (NOTE(review): presumably code outside this view advances it past 1
//    to delay the snapshot — confirm). Otherwise commit pending events, rebuild alert_version,
//    send the snapshot and reset send_snapshot.
//  - Otherwise: push the next batch of queued alert transitions.
void aclk_push_alert_events_for_all_hosts(void)
{
    RRDHOST *host;

    // Bail out if the ACLK service is shutting down
    if (!service_running(SERVICE_ACLK))
        return;

    dfe_start_reentrant(rrdhost_root_index, host) {
        if (!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS) ||
            rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
            continue;

        rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);

        struct aclk_sync_cfg_t *wc = host->aclk_config;
        if (!wc || false == wc->stream_alerts || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) {
            // not streaming to the cloud: keep the queues from growing
            (void)process_alert_pending_queue(host);
            commit_alert_events(host);
            continue;
        }

        if (wc->send_snapshot) {
            // revisit this host on the next pass
            rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
            if (wc->send_snapshot == 1)
                continue;
            // make alert_version reflect the latest transitions before snapshotting it
            (void)process_alert_pending_queue(host);
            commit_alert_events(host);
            rebuild_host_alert_version_table(host);
            send_alert_snapshot_to_cloud(host);
            wc->snapshot_count++;
            wc->send_snapshot = 0;
        }
        else
            aclk_push_alert_event(host);
    }
    dfe_done(host);
}
// Handle a request for the alert configuration identified by `config_hash` using
// localhost's ACLK sync config: log the request and forward it to aclk_push_alert_config().
// Ignored when config_hash is NULL or localhost has no ACLK sync config.
void aclk_send_alert_configuration(char *config_hash)
{
    struct aclk_sync_cfg_t *wc;

    if (unlikely(!config_hash) || unlikely(!(wc = localhost->aclk_config)))
        return;

    const char *hostname = wc->host ? rrdhost_hostname(wc->host) : "N/A";
    nd_log(NDLS_ACCESS, NDLP_DEBUG,
           "ACLK REQ [%s (%s)]: Request to send alert config %s.",
           wc->node_id,
           hostname,
           config_hash);

    aclk_push_alert_config(wc->node_id, config_hash);
}
#define SQL_SELECT_ALERT_CONFIG \
    "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
    "module, charts, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
    "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
    "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id"

//
// Look up the alert configuration with hash `config_hash` and send it to the cloud for the
// node `node_id`; log a warning when no such configuration exists.
// Takes ownership of both strings: node_id and config_hash are released with freez() on
// every return path, including the early ones (unknown node, malformed hash, prepare failure).
//
void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
{
    sqlite3_stmt *res = NULL;
    struct aclk_sync_cfg_t *wc;
    nd_uuid_t hash_uuid;
    struct aclk_alarm_configuration alarm_config;
    struct provide_alarm_configuration p_alarm_config;
    int param = 0;

    p_alarm_config.cfg_hash = NULL;

    RRDHOST *host = find_host_by_node_id(node_id);
    if (unlikely(!host || !(wc = host->aclk_config)))
        goto free_args;

    // validate the hash before preparing the statement so a bad hash leaks nothing
    if (unlikely(!config_hash || uuid_parse(config_hash, hash_uuid)))
        goto free_args;

    if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_CONFIG, &res))
        goto free_args;

    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &hash_uuid, sizeof(hash_uuid), SQLITE_STATIC));

    // from here on `param` is reused as the running result-column index
    param = 0;
    if (sqlite3_step_monitored(res) == SQLITE_ROW) {
        alarm_config.alarm = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.tmpl = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.on_chart = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.classification = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.type = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.component = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.os = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.hosts = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.plugin = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.module = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.charts = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.lookup = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.every = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.units = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.green = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.red = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.calculation_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.warning_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.critical_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.recipient = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.exec = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.delay = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.repeat = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.info = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.options = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
        alarm_config.host_labels = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 25

        alarm_config.p_db_lookup_dimensions = NULL;
        alarm_config.p_db_lookup_method = NULL;
        alarm_config.p_db_lookup_options = NULL;
        alarm_config.p_db_lookup_after = 0;
        alarm_config.p_db_lookup_before = 0;

        // column 29 (p_db_lookup_after) non-empty => the definition has a db lookup
        if (sqlite3_column_bytes(res, 29) > 0) {
            alarm_config.p_db_lookup_dimensions = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 26
            alarm_config.p_db_lookup_method = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);     // Current param 27
            if (param != 28)
                netdata_log_error("aclk_push_alert_config_event: Unexpected param number %d", param);

            BUFFER *tmp_buf = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
            rrdr_options_to_buffer(tmp_buf, sqlite3_column_int(res, 28));
            alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
            buffer_free(tmp_buf);

            alarm_config.p_db_lookup_after = sqlite3_column_int(res, 29);
            alarm_config.p_db_lookup_before = sqlite3_column_int(res, 30);
        }

        alarm_config.p_update_every = sqlite3_column_int(res, 31);
        alarm_config.chart_labels = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, 32);
        alarm_config.summary = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, 33);

        p_alarm_config.cfg_hash = strdupz((char *) config_hash);
        p_alarm_config.cfg = alarm_config;
    }

    param = 0;
    if (likely(p_alarm_config.cfg_hash)) {
        nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
        aclk_send_provide_alarm_cfg(&p_alarm_config);
        freez(p_alarm_config.cfg_hash);
        destroy_aclk_alarm_configuration(&alarm_config);
    }
    else
        nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_FINALIZE(res);

free_args:
    freez(config_hash);
    freez(node_id);
}
#define SQL_ALERT_VERSION_CALC \
    "SELECT SUM(version) FROM health_log hl, alert_version av" \
    " WHERE hl.host_id = @host_uuid AND hl.health_log_id = av.health_log_id AND av.status <> -2"

// Sum alert_version.version over the alerts of `host` whose status is not -2
// (presumably RRDCALC_STATUS_REMOVED — confirm). This is the local counterpart of the
// cloud's alert version. Returns 0 when nothing matches or on prepare/bind failure.
static uint64_t calculate_node_alert_version(RRDHOST *host)
{
    static __thread sqlite3_stmt *res = NULL;
    uint64_t total = 0;
    int param = 0;

    if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_ALERT_VERSION_CALC, &res))
        return total;

    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));
    param = 0;

    // aggregate query: the value of the last row returned wins
    for (int rc = sqlite3_step_monitored(res); rc == SQLITE_ROW; rc = sqlite3_step_monitored(res))
        total = (uint64_t)sqlite3_column_int64(res, 0);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_RESET(res);
    return total;
}
// Compare the node's local alert version with `cloud_version`. When they differ, request a
// snapshot (send_snapshot = 1) and flag the host for the next alert-streaming pass.
// Every call is counted in wc->checkpoint_count.
static void schedule_alert_snapshot_if_needed(struct aclk_sync_cfg_t *wc, uint64_t cloud_version)
{
    uint64_t local_version = calculate_node_alert_version(wc->host);

    // PRIu64, not %zu: size_t is narrower than uint64_t on 32-bit targets
    if (local_version != cloud_version) {
        nd_log(
            NDLS_ACCESS,
            NDLP_NOTICE,
            "Scheduling alert snapshot for host \"%s\", node \"%s\" (version: cloud %" PRIu64 ", local %" PRIu64 ")",
            rrdhost_hostname(wc->host),
            wc->node_id,
            cloud_version,
            local_version);
        wc->send_snapshot = 1;
        rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
    }
    else {
        nd_log(
            NDLS_ACCESS,
            NDLP_DEBUG,
            "Alert check on \"%s\", node \"%s\" (version: cloud %" PRIu64 ", local %" PRIu64 ")",
            rrdhost_hostname(wc->host),
            wc->node_id,
            cloud_version,
            local_version);
    }

    wc->checkpoint_count++;
}
// Handle a cloud request to send an alerts snapshot identified by `snapshot_uuid` for node
// `node_id`. A request repeating the currently remembered snapshot UUID is ignored; otherwise
// the UUID is remembered (the previously stored copy is freed) and a snapshot is scheduled.
// Requests with a NULL/malformed node id or a NULL snapshot UUID are ignored.
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
{
    nd_uuid_t node_uuid;

    if (unlikely(!node_id || !snapshot_uuid || uuid_parse(node_id, node_uuid)))
        return;

    struct aclk_sync_cfg_t *wc;
    RRDHOST *host = find_host_by_node_id(node_id);
    if (unlikely(!host || !(wc = host->aclk_config))) {
        nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
        return;
    }

    nd_log(NDLS_ACCESS, NDLP_DEBUG,
        "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
        node_id,
        wc->host ? rrdhost_hostname(wc->host) : "N/A",
        snapshot_uuid);

    if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid, snapshot_uuid))
        return;

    // release the UUID of the previous request instead of leaking it on every new request
    freez((void *)wc->alerts_snapshot_uuid);
    wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
    wc->send_snapshot = 1;
    rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
}
#define SQL_COUNT_SNAPSHOT_ENTRIES \
    "SELECT COUNT(1) FROM alert_version av, health_log hl " \
    "WHERE hl.host_id = @host_id AND hl.health_log_id = av.health_log_id AND av.status <> -2"

// Count the alert_version rows (status other than -2) of the host `host_uuid`, i.e. the
// number of entries a snapshot will carry. Returns 0 on prepare, bind or step failure.
static int calculate_alert_snapshot_entries(nd_uuid_t *host_uuid)
{
    sqlite3_stmt *res = NULL;
    int entries = 0;
    int param = 0;

    if (!PREPARE_STATEMENT(db_meta, SQL_COUNT_SNAPSHOT_ENTRIES, &res))
        return entries;

    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_uuid, sizeof(*host_uuid), SQLITE_STATIC));
    param = 0;

    if (sqlite3_step_monitored(res) != SQLITE_ROW)
        error_report("Failed to select snapshot count");
    else
        entries = sqlite3_column_int(res, 0);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_FINALIZE(res);
    return entries;
}
#define SQL_GET_SNAPSHOT_ENTRIES \
    " SELECT 0, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " \
    " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, " \
    " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status, " \
    " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, " \
    " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, av.version " \
    " FROM health_log hl, alert_hash ah, health_log_detail hld, alert_version av " \
    " WHERE hl.config_hash_id = ah.hash_id" \
    " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " \
    " AND hld.health_log_id = av.health_log_id AND av.unique_id = hld.unique_id AND av.status <> -2"

#define ALARM_EVENTS_PER_CHUNK 1000

//
// Send a full snapshot of the alert state of `host` to the cloud under a freshly generated
// snapshot UUID: one entry per alert_version row whose status is not -2, split into chunks
// of at most ALARM_EVENTS_PER_CHUNK entries. Nothing is sent when host is NULL, it has no
// ACLK sync config, the agent is not claimed, or there are no entries.
//
void send_alert_snapshot_to_cloud(RRDHOST *host __maybe_unused)
{
    // host must be checked before anything is read through it
    if (unlikely(!host)) {
        nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [N/A (N/A)]: Node id not found");
        return;
    }

    struct aclk_sync_cfg_t *wc = host->aclk_config;
    if (unlikely(!wc)) {
        nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [N/A (%s)]: Node id not found", rrdhost_hostname(host));
        return;
    }

    CLAIM_ID claim_id = claim_id_get();
    if (unlikely(!claim_id_is_set(claim_id)))
        return;

    // Check the database for this node to see how many alerts we will need to put in the snapshot
    int cnt = calculate_alert_snapshot_entries(&host->host_id.uuid);
    if (!cnt)
        return;

    sqlite3_stmt *res = NULL;
    if (!PREPARE_STATEMENT(db_meta, SQL_GET_SNAPSHOT_ENTRIES, &res))
        return;

    int param = 0;
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));
    param = 0;

    nd_uuid_t local_snapshot_uuid;
    char snapshot_uuid_str[UUID_STR_LEN];
    uuid_generate_random(local_snapshot_uuid);
    uuid_unparse_lower(local_snapshot_uuid, snapshot_uuid_str);
    char *snapshot_uuid = &snapshot_uuid_str[0];

    nd_log(NDLS_ACCESS, NDLP_DEBUG,
        "ACLK REQ [%s (%s)]: Sending %d alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(host),
        cnt, snapshot_uuid);

    uint32_t chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);

    alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
    struct alarm_snapshot alarm_snap;
    struct alarm_log_entry alarm_log;

    alarm_snap.node_id = wc->node_id;
    alarm_snap.claim_id = claim_id.str;
    alarm_snap.snapshot_uuid = snapshot_uuid;
    alarm_snap.chunks = chunks;
    alarm_snap.chunk = 1;

    alarm_log.node_id = wc->node_id;
    alarm_log.claim_id = claim_id.str;

    cnt = 0;             // entries in the chunk being built
    int total_count = 0; // entries in the whole snapshot
    uint64_t version = 0;

    while (sqlite3_step_monitored(res) == SQLITE_ROW) {
        if (!snapshot_proto) {
            // The count came from a separate query; if rows were added since, do not build
            // chunks beyond the number already announced in alarm_snap.chunks.
            if (alarm_snap.chunk > chunks)
                break;
            snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
        }

        cnt++;
        total_count++;

        health_alarm_log_populate(&alarm_log, res, host, NULL);
        add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
        version += alarm_log.version;

        if (cnt == ALARM_EVENTS_PER_CHUNK) {
            // NOTE(review): when ACLK is offline the chunk is not sent and not freed here —
            // confirm who owns it in that case
            if (aclk_online_for_alerts())
                aclk_send_alarm_snapshot(snapshot_proto);
            // never append to a chunk that was already handed off; the next row starts a new one
            snapshot_proto = NULL;
            cnt = 0;
            alarm_snap.chunk++;
        }

        destroy_alarm_log_entry(&alarm_log);
    }

    if (cnt)
        aclk_send_alarm_snapshot(snapshot_proto);

    nd_log(
        NDLS_ACCESS,
        NDLP_DEBUG,
        "ACLK REQ [%s (%s)]: Sent! %d alerts snapshot, snapshot_uuid %s (version = %" PRIu64 ")",
        wc->node_id,
        rrdhost_hostname(host),
        total_count,
        snapshot_uuid,
        version);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_FINALIZE(res);
}
// Start streaming alerts
void aclk_start_alert_streaming(char *node_id, uint64_t cloud_version)
{
nd_uuid_t node_uuid;
if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return;
struct aclk_sync_cfg_t *wc;
RRDHOST *host = find_host_by_node_id(node_id);
if (unlikely(!host || !(wc = host->aclk_config))) {
nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, invalid node.", node_id);
return;
}
if (unlikely(!host->health.health_enabled)) {
nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
return;
}
nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
schedule_alert_snapshot_if_needed(wc, cloud_version);
wc->stream_alerts = true;
}
// Do checkpoint alert version check: validate the request (node id well formed, agent
// claimed, claim_id equal to the agent's claim id when one is set), then compare the local
// alert version with `cloud_version` and schedule a snapshot when they differ.
void aclk_alert_version_check(char *node_id, char *claim_id, uint64_t cloud_version)
{
    nd_uuid_t node_uuid;
    if (unlikely(!node_id || !claim_id || !is_agent_claimed() || uuid_parse(node_id, node_uuid)))
        return;

    // claim_id is known to be non-NULL at this point
    CLAIM_ID agent_claim_id = claim_id_get();
    bool claim_mismatch = claim_id_is_set(agent_claim_id) && strcmp(agent_claim_id.str, claim_id) != 0;
    if (claim_mismatch) {
        nd_log(NDLS_ACCESS, NDLP_NOTICE,
            "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED WITH INVALID CLAIM ID",
            node_id);
        return;
    }

    RRDHOST *host = find_host_by_node_id(node_id);
    struct aclk_sync_cfg_t *wc = host ? host->aclk_config : NULL;
    if (wc)
        schedule_alert_snapshot_if_needed(wc, cloud_version);
    else
        nd_log(NDLS_ACCESS, NDLP_NOTICE,
            "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED FOR INVALID NODE",
            node_id);
}