netdata/netdata

View on GitHub
src/collectors/profile.plugin/plugin_profile.cc

Summary

Maintainability
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

#ifdef __cplusplus
extern "C" {
#endif

#include "daemon/common.h"

#ifdef __cplusplus
}
#endif

#include <random>
#include <thread>
#include <vector>

// Configuration section from which profile_main() reads this plugin's settings.
#define CONFIG_SECTION_PROFILE "plugin:profile"

/**
 * Cyclic source of pre-computed pseudo-random values.
 *
 * All values are drawn once, at construction time, as integers uniformly
 * distributed in [-16, 16]. getRandValue() then replays them in a loop, so
 * producing a sample costs a single vector read.
 */
class Generator {
public:
    /**
     * @param N number of values to pre-compute. Zero is treated as one, so
     *          the table is never empty and getRandValue() is always valid.
     */
    Generator(size_t N) : Offset(0) {
        // An empty table would leave getRandValue() with nothing to index.
        const size_t Count = N ? N : 1;

        std::random_device RandDev;
        std::mt19937 Gen(RandDev());
        std::uniform_int_distribution<int> D(-16, 16);

        V.reserve(Count);
        for (size_t Idx = 0; Idx != Count; Idx++)
            V.push_back(D(Gen));
    }

    /**
     * @return the next pre-computed value, restarting from the first one
     *         after the last has been returned.
     */
    double getRandValue() {
        const double Value = V[Offset];

        // Wrap the cursor explicitly instead of taking a modulo of an ever
        // growing counter: the replay order then stays intact no matter how
        // many samples have been requested.
        if (++Offset == V.size())
            Offset = 0;

        return Value;
    }

private:
    size_t Offset;          // index of the next value to hand out; always < V.size()
    std::vector<double> V;  // the pre-computed values; never empty
};

class Profiler {
public:
    Profiler(size_t ID, size_t NumCharts, size_t NumDimsPerChart, time_t SecondsToBackfill, int UpdateEvery) :
        ID(ID),
        NumCharts(NumCharts),
        NumDimsPerChart(NumDimsPerChart),
        SecondsToBackfill(SecondsToBackfill),
        UpdateEvery(UpdateEvery),
        Gen(1024 * 1024)
    {}

    void create() {
        char ChartId[1024];
        char DimId[1024];

        Charts.reserve(NumCharts);
        for (size_t I = 0; I != NumCharts; I++) {
            size_t CID = ID + Charts.size() + 1;

            snprintfz(ChartId, 1024 - 1, "chart_%zu", CID);

            RRDSET *RS = rrdset_create_localhost(
                "profile", // type
                ChartId, // id
                nullptr, // name,
                "profile_family", // family
                "profile_context", // context
                "profile_title", // title
                "profile_units", // units
                "profile_plugin", // plugin
                "profile_module", // module
                12345678 + CID, // priority
                UpdateEvery, // update_every
                RRDSET_TYPE_LINE // chart_type
            );
            if (I != 0)
                rrdset_flag_set(RS, RRDSET_FLAG_HIDDEN);
            Charts.push_back(RS);

            Dimensions.reserve(NumDimsPerChart);
            for (size_t J = 0; J != NumDimsPerChart; J++) {
                snprintfz(DimId, 1024 - 1, "dim_%zu", J);

                RRDDIM *RD = rrddim_add(
                    RS, // st
                    DimId, // id
                    nullptr, // name
                    1, // multiplier
                    1, // divisor
                    RRD_ALGORITHM_ABSOLUTE // algorithm
                );

                Dimensions.push_back(RD);
            }
        }
    }

    void update(const struct timeval &Now) {
        for (RRDSET *RS: Charts) {
            for (RRDDIM *RD : Dimensions) {
                rrddim_timed_set_by_pointer(RS, RD, Now, Gen.getRandValue());
            }

            rrdset_timed_done(RS, Now, RS->counter_done != 0);
        }
    }

    void run() {
        #define WORKER_JOB_CREATE_CHARTS 0
        #define WORKER_JOB_UPDATE_CHARTS 1
        #define WORKER_JOB_METRIC_DURATION_TO_BACKFILL 2
        #define WORKER_JOB_METRIC_POINTS_BACKFILLED 3

        worker_register("PROFILER");
        worker_register_job_name(WORKER_JOB_CREATE_CHARTS, "create charts");
        worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "update charts");
        worker_register_job_custom_metric(WORKER_JOB_METRIC_DURATION_TO_BACKFILL, "duration to backfill", "seconds", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_METRIC_POINTS_BACKFILLED, "points backfilled", "points", WORKER_METRIC_ABSOLUTE);

        heartbeat_t HB;
        heartbeat_init(&HB);

        worker_is_busy(WORKER_JOB_CREATE_CHARTS);
        create();

        struct timeval CollectionTV;
        now_realtime_timeval(&CollectionTV);

        if (SecondsToBackfill) {
            CollectionTV.tv_sec -= SecondsToBackfill;
            CollectionTV.tv_sec -= (CollectionTV.tv_sec % UpdateEvery);

            CollectionTV.tv_usec = 0;
        }

        size_t BackfilledPoints = 0;
        struct timeval NowTV, PrevTV;
        now_realtime_timeval(&NowTV);
        PrevTV = NowTV;

        while (service_running(SERVICE_COLLECTORS)) {
            worker_is_busy(WORKER_JOB_UPDATE_CHARTS);

            update(CollectionTV);
            CollectionTV.tv_sec += UpdateEvery;

            now_realtime_timeval(&NowTV);

            ++BackfilledPoints;
            if (NowTV.tv_sec > PrevTV.tv_sec) {
                PrevTV = NowTV;
                worker_set_metric(WORKER_JOB_METRIC_POINTS_BACKFILLED, BackfilledPoints * NumCharts * NumDimsPerChart);
                BackfilledPoints = 0;
            }

            size_t RemainingSeconds = (CollectionTV.tv_sec >= NowTV.tv_sec) ? 0 : (NowTV.tv_sec - CollectionTV.tv_sec);
            worker_set_metric(WORKER_JOB_METRIC_DURATION_TO_BACKFILL, RemainingSeconds);

            if (CollectionTV.tv_sec >= NowTV.tv_sec) {
                worker_is_idle();
                heartbeat_next(&HB, UpdateEvery * USEC_PER_SEC);
            }
        }
    }

private:
    size_t ID;
    size_t NumCharts;
    size_t NumDimsPerChart;
    size_t SecondsToBackfill;
    int UpdateEvery;

    Generator Gen;
    std::vector<RRDSET *> Charts;
    std::vector<RRDDIM *> Dimensions;
};

static void *subprofile_main(void* Arg) {
    Profiler *P = reinterpret_cast<Profiler *>(Arg);
    P->run();
    return nullptr;
}

// Cleanup handler registered by profile_main(): marks the plugin's static
// thread entry as exiting, logs, then marks it as exited.
static void profile_main_cleanup(void *pptr) {
    struct netdata_static_thread *Thread =
        (struct netdata_static_thread *)CLEANUP_FUNCTION_GET_PTR(pptr);

    // Nothing to update when no thread entry was handed over.
    if (Thread == nullptr)
        return;

    Thread->enabled = NETDATA_MAIN_THREAD_EXITING;
    netdata_log_info("cleaning up...");
    Thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

extern "C" void *profile_main(void *ptr) {
    CLEANUP_FUNCTION_REGISTER(profile_main_cleanup) cleanup_ptr = ptr;

    int UpdateEvery = (int) config_get_number(CONFIG_SECTION_PROFILE, "update every", 1);
    if (UpdateEvery < localhost->rrd_update_every)
        UpdateEvery = localhost->rrd_update_every;

    // pick low-default values, in case this plugin is ever enabled accidentaly.
    size_t NumThreads = config_get_number(CONFIG_SECTION_PROFILE, "number of threads", 2);
    size_t NumCharts = config_get_number(CONFIG_SECTION_PROFILE, "number of charts", 2);
    size_t NumDimsPerChart = config_get_number(CONFIG_SECTION_PROFILE, "number of dimensions per chart", 2);
    size_t SecondsToBackfill = config_get_number(CONFIG_SECTION_PROFILE, "seconds to backfill", 10 * 60);

    std::vector<Profiler> Profilers;

    for (size_t Idx = 0; Idx != NumThreads; Idx++) {
        Profiler P(1e8 + Idx * 1e6, NumCharts, NumDimsPerChart, SecondsToBackfill, UpdateEvery);
        Profilers.push_back(P);
    }

    std::vector<ND_THREAD *> Threads(NumThreads);

    for (size_t Idx = 0; Idx != NumThreads; Idx++) {
        char Tag[NETDATA_THREAD_TAG_MAX + 1];

        snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "PROFILER[%zu]", Idx);
        Threads[Idx] = nd_thread_create(Tag, NETDATA_THREAD_OPTION_JOINABLE,
                                        subprofile_main, static_cast<void *>(&Profilers[Idx]));
    }

    for (size_t Idx = 0; Idx != NumThreads; Idx++)
        nd_thread_join(Threads[Idx]);

    return NULL;
}