netdata/netdata

View on GitHub
src/database/engine/page.c

Summary

Maintainability
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

#include "page.h"

#include "libnetdata/libnetdata.h"

typedef enum __attribute__((packed)) {
    PAGE_OPTION_ALL_VALUES_EMPTY    = (1 << 0),
} PAGE_OPTIONS;

typedef enum __attribute__((packed)) {
    PGD_STATE_CREATED_FROM_COLLECTOR        = (1 << 0),
    PGD_STATE_CREATED_FROM_DISK             = (1 << 1),
    PGD_STATE_SCHEDULED_FOR_FLUSHING        = (1 << 2),
    PGD_STATE_FLUSHED_TO_DISK               = (1 << 3),
} PGD_STATES;

typedef struct {
    uint8_t *data;
    uint32_t size;
} page_raw_t;


typedef struct {
    size_t num_buffers;
    gorilla_writer_t *writer;
    int aral_index;
} page_gorilla_t;

struct pgd {
    // the page type
    uint8_t type;

   // options related to the page
    PAGE_OPTIONS options;

    PGD_STATES states;

    // the uses number of slots in the page
    uint32_t used;

    // the total number of slots available in the page
    uint32_t slots;

    union {
        page_raw_t raw;
        page_gorilla_t gorilla;
    };
};

// ----------------------------------------------------------------------------
// memory management

struct {
    ARAL *aral_pgd;
    ARAL *aral_data[RRD_STORAGE_TIERS];
    ARAL *aral_gorilla_buffer[4];
    ARAL *aral_gorilla_writer[4];
} pgd_alloc_globals = {};

static ARAL *pgd_aral_data_lookup(size_t size)
{
    for (size_t tier = 0; tier < storage_tiers; tier++)
        if (size == tier_page_size[tier])
            return pgd_alloc_globals.aral_data[tier];

    return NULL;
}

void pgd_init_arals(void)
{
    // pgd aral
    {
        char buf[20 + 1];
        snprintfz(buf, sizeof(buf) - 1, "pgd");

        // FIXME: add stats
        pgd_alloc_globals.aral_pgd = aral_create(
                buf,
                sizeof(struct pgd),
                64,
                512 * (sizeof(struct pgd)),
                pgc_aral_statistics(),
                NULL, NULL, false, false);
    }

    // tier page aral
    {
        for (size_t i = storage_tiers; i > 0 ;i--)
        {
            size_t tier = storage_tiers - i;

            char buf[20 + 1];
            snprintfz(buf, sizeof(buf) - 1, "tier%zu-pages", tier);

            pgd_alloc_globals.aral_data[tier] = aral_create(
                    buf,
                    tier_page_size[tier],
                    64,
                    512 * (tier_page_size[tier]),
                    pgc_aral_statistics(),
                    NULL, NULL, false, false);
        }
    }

    // gorilla buffers aral
    for (size_t i = 0; i != 4; i++) {
        char buf[20 + 1];
        snprintfz(buf, sizeof(buf) - 1, "gbuffer-%zu", i);

        // FIXME: add stats
        pgd_alloc_globals.aral_gorilla_buffer[i] = aral_create(
                buf,
            RRDENG_GORILLA_32BIT_BUFFER_SIZE,
                64,
                512 * RRDENG_GORILLA_32BIT_BUFFER_SIZE,
                pgc_aral_statistics(),
                NULL, NULL, false, false);
    }

    // gorilla writers aral
    for (size_t i = 0; i != 4; i++) {
        char buf[20 + 1];
        snprintfz(buf, sizeof(buf) - 1, "gwriter-%zu", i);

        // FIXME: add stats
        pgd_alloc_globals.aral_gorilla_writer[i] = aral_create(
                buf,
                sizeof(gorilla_writer_t),
                64,
                512 * sizeof(gorilla_writer_t),
                pgc_aral_statistics(),
                NULL, NULL, false, false);
    }
}

static void *pgd_data_aral_alloc(size_t size)
{
    ARAL *ar = pgd_aral_data_lookup(size);
    if (!ar)
        return mallocz(size);
    else
        return aral_mallocz(ar);
}

static void pgd_data_aral_free(void *page, size_t size)
{
    ARAL *ar = pgd_aral_data_lookup(size);
    if (!ar)
        freez(page);
    else
        aral_freez(ar, page);
}

// ----------------------------------------------------------------------------
// management api

PGD *pgd_create(uint8_t type, uint32_t slots)
{
    PGD *pg = aral_mallocz(pgd_alloc_globals.aral_pgd);
    pg->type = type;
    pg->used = 0;
    pg->slots = slots;
    pg->options = PAGE_OPTION_ALL_VALUES_EMPTY;
    pg->states = PGD_STATE_CREATED_FROM_COLLECTOR;

    switch (type) {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT:
        case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
            uint32_t size = slots * page_type_size[type];

            internal_fatal(!size || slots == 1,
                      "DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type);

            pg->raw.size = size;
            pg->raw.data = pgd_data_aral_alloc(size);
            break;
        }
        case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
            internal_fatal(slots == 1,
                      "DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type);

            pg->slots = 8 * RRDENG_GORILLA_32BIT_BUFFER_SLOTS;

            // allocate new gorilla writer
            pg->gorilla.aral_index = gettid_cached() % 4;
            pg->gorilla.writer = aral_mallocz(pgd_alloc_globals.aral_gorilla_writer[pg->gorilla.aral_index]);

            // allocate new gorilla buffer
            gorilla_buffer_t *gbuf = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]);
            memset(gbuf, 0, RRDENG_GORILLA_32BIT_BUFFER_SIZE);
            global_statistics_gorilla_buffer_add_hot();

            *pg->gorilla.writer = gorilla_writer_init(gbuf, RRDENG_GORILLA_32BIT_BUFFER_SLOTS);
            pg->gorilla.num_buffers = 1;

            break;
        }
        default:
            netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, type);
            aral_freez(pgd_alloc_globals.aral_pgd, pg);
            pg = PGD_EMPTY;
            break;
    }

    return pg;
}

PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size)
{
    if (!size)
        return PGD_EMPTY;

    if (size < page_type_size[type])
        return PGD_EMPTY;

    PGD *pg = aral_mallocz(pgd_alloc_globals.aral_pgd);

    pg->type = type;
    pg->states = PGD_STATE_CREATED_FROM_DISK;
    pg->options = ~PAGE_OPTION_ALL_VALUES_EMPTY;

    switch (type)
    {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT:
        case RRDENG_PAGE_TYPE_ARRAY_TIER1:
            pg->raw.size = size;
            pg->used = size / page_type_size[type];
            pg->slots = pg->used;

            pg->raw.data = pgd_data_aral_alloc(size);
            memcpy(pg->raw.data, base, size);
            break;
        case RRDENG_PAGE_TYPE_GORILLA_32BIT:
            internal_fatal(size == 0, "Asked to create page with 0 data!!!");
            internal_fatal(size % sizeof(uint32_t), "Unaligned gorilla buffer size");
            internal_fatal(size % RRDENG_GORILLA_32BIT_BUFFER_SIZE, "Expected size to be a multiple of %zu-bytes",
                RRDENG_GORILLA_32BIT_BUFFER_SIZE);

            pg->raw.data = mallocz(size);
            pg->raw.size = size;

            // TODO: rm this
            memset(pg->raw.data, 0, size);
            memcpy(pg->raw.data, base, size);

            uint32_t total_entries = gorilla_buffer_patch((void *) pg->raw.data);

            pg->used = total_entries;
            pg->slots = pg->used;
            break;
        default:
            netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, type);
            aral_freez(pgd_alloc_globals.aral_pgd, pg);
            pg = PGD_EMPTY;
            break;
    }

    return pg;
}

void pgd_free(PGD *pg)
{
    if (!pg)
        return;

    if (pg == PGD_EMPTY)
        return;

    switch (pg->type)
    {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT:
        case RRDENG_PAGE_TYPE_ARRAY_TIER1:
            pgd_data_aral_free(pg->raw.data, pg->raw.size);
            break;
        case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
            if (pg->states & PGD_STATE_CREATED_FROM_DISK)
            {
                internal_fatal(pg->raw.data == NULL, "Tried to free gorilla PGD loaded from disk with NULL data");
                freez(pg->raw.data);
                pg->raw.data = NULL;
            }
            else if ((pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) ||
                     (pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) ||
                     (pg->states & PGD_STATE_FLUSHED_TO_DISK))
            {
                internal_fatal(pg->gorilla.writer == NULL,
                               "PGD does not have an active gorilla writer");

                internal_fatal(pg->gorilla.num_buffers == 0,
                               "PGD does not have any gorilla buffers allocated");

                while (true) {
                    gorilla_buffer_t *gbuf = gorilla_writer_drop_head_buffer(pg->gorilla.writer);
                    if (!gbuf)
                        break;
                    aral_freez(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index], gbuf);
                    pg->gorilla.num_buffers -= 1;
                }

                internal_fatal(pg->gorilla.num_buffers != 0,
                               "Could not free all gorilla writer buffers");

                aral_freez(pgd_alloc_globals.aral_gorilla_writer[pg->gorilla.aral_index], pg->gorilla.writer);
                pg->gorilla.writer = NULL;
            } else {
                fatal("pgd_free() called on gorilla page with unsupported state");
                // TODO: should we support any other states?
                // if (!(pg->states & PGD_STATE_FLUSHED_TO_DISK))
                //     fatal("pgd_free() is not supported yet for pages flushed to disk");
            }

            break;
        }
        default:
            netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
            break;
    }

    aral_freez(pgd_alloc_globals.aral_pgd, pg);
}

// ----------------------------------------------------------------------------
// utility functions

uint32_t pgd_type(PGD *pg)
{
    return pg->type;
}

bool pgd_is_empty(PGD *pg)
{
    if (!pg)
        return true;

    if (pg == PGD_EMPTY)
        return true;

    if (pg->used == 0)
        return true;

    if (pg->options & PAGE_OPTION_ALL_VALUES_EMPTY)
        return true;

    return false;
}

uint32_t pgd_slots_used(PGD *pg)
{
    if (!pg)
        return 0;

    if (pg == PGD_EMPTY)
        return 0;

    return pg->used;
}

uint32_t pgd_memory_footprint(PGD *pg)
{
    if (!pg)
        return 0;

    if (pg == PGD_EMPTY)
        return 0;

    size_t footprint = 0;
    switch (pg->type) {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT:
        case RRDENG_PAGE_TYPE_ARRAY_TIER1:
            footprint = sizeof(PGD) + pg->raw.size;
            break;
        case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
            if (pg->states & PGD_STATE_CREATED_FROM_DISK)
                footprint = sizeof(PGD) + pg->raw.size;
            else
                footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * RRDENG_GORILLA_32BIT_BUFFER_SIZE);

            break;
        }
        default:
            netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
            break;
    }

    return footprint;
}

uint32_t pgd_disk_footprint(PGD *pg)
{
    if (!pgd_slots_used(pg))
        return 0;

    size_t size = 0;

    switch (pg->type) {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT:
        case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
            uint32_t used_size = pg->used * page_type_size[pg->type];
            internal_fatal(used_size > pg->raw.size, "Wrong disk footprint page size");
            size = used_size;

            break;
        }
        case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
            if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR ||
                pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING ||
                pg->states & PGD_STATE_FLUSHED_TO_DISK)
            {
                internal_fatal(!pg->gorilla.writer,
                               "pgd_disk_footprint() not implemented for NULL gorilla writers");

                internal_fatal(pg->gorilla.num_buffers == 0,
                               "Gorilla writer does not have any buffers");

                size = pg->gorilla.num_buffers * RRDENG_GORILLA_32BIT_BUFFER_SIZE;

                if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) {
                    global_statistics_tier0_disk_compressed_bytes(gorilla_writer_nbytes(pg->gorilla.writer));
                    global_statistics_tier0_disk_uncompressed_bytes(gorilla_writer_entries(pg->gorilla.writer) * sizeof(storage_number));
                }
            } else if (pg->states & PGD_STATE_CREATED_FROM_DISK) {
                size = pg->raw.size;
            } else {
                fatal("Asked disk footprint on unknown page state");
            }

            break;
        }
        default:
            netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
            break;
    }

    internal_fatal(pg->states & PGD_STATE_CREATED_FROM_DISK,
                   "Disk footprint asked for page created from disk.");
    pg->states = PGD_STATE_SCHEDULED_FOR_FLUSHING;
    return size;
}

void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size)
{
    internal_fatal(pgd_disk_footprint(pg) != dst_size, "Wrong disk footprint size requested (need %u, available %u)",
                   pgd_disk_footprint(pg), dst_size);

    switch (pg->type) {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT:
        case RRDENG_PAGE_TYPE_ARRAY_TIER1:
            memcpy(dst, pg->raw.data, dst_size);
            break;
        case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
            if ((pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) == 0)
                fatal("Copying to extent is supported only for PGDs that are scheduled for flushing.");

            internal_fatal(!pg->gorilla.writer,
                           "pgd_copy_to_extent() not implemented for NULL gorilla writers");

            internal_fatal(pg->gorilla.num_buffers == 0,
                           "pgd_copy_to_extent() gorilla writer does not have any buffers");

            bool ok = gorilla_writer_serialize(pg->gorilla.writer, dst, dst_size);
            UNUSED(ok);
            internal_fatal(!ok,
                           "pgd_copy_to_extent() tried to serialize pg=%p, gw=%p (with dst_size=%u bytes, num_buffers=%zu)",
                           pg, pg->gorilla.writer, dst_size, pg->gorilla.num_buffers);
            break;
        }
        default:
            netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
            break;
    }

    pg->states = PGD_STATE_FLUSHED_TO_DISK;
}

// ----------------------------------------------------------------------------
// data collection

void pgd_append_point(PGD *pg,
                      usec_t point_in_time_ut __maybe_unused,
                      NETDATA_DOUBLE n,
                      NETDATA_DOUBLE min_value,
                      NETDATA_DOUBLE max_value,
                      uint16_t count,
                      uint16_t anomaly_count,
                      SN_FLAGS flags,
                      uint32_t expected_slot)
{
    if (unlikely(pg->used >= pg->slots))
        fatal("DBENGINE: attempted to write beyond page size (page type %u, slots %u, used %u)",
              pg->type, pg->slots, pg->used /* FIXME:, pg->size */);

    if (unlikely(pg->used != expected_slot))
        fatal("DBENGINE: page is not aligned to expected slot (used %u, expected %u)",
              pg->used, expected_slot);

    if (!(pg->states & PGD_STATE_CREATED_FROM_COLLECTOR))
        fatal("DBENGINE: collection on page not created from a collector");

    if (pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING)
        fatal("Data collection on page already scheduled for flushing");

    switch (pg->type) {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT: {
            storage_number *tier0_metric_data = (storage_number *)pg->raw.data;
            storage_number t = pack_storage_number(n, flags);
            tier0_metric_data[pg->used++] = t;

            if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && does_storage_number_exist(t))
                pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY;

            break;
        }
        case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
            storage_number_tier1_t *tier12_metric_data = (storage_number_tier1_t *)pg->raw.data;
            storage_number_tier1_t t;
            t.sum_value = (float) n;
            t.min_value = (float) min_value;
            t.max_value = (float) max_value;
            t.anomaly_count = anomaly_count;
            t.count = count;
            tier12_metric_data[pg->used++] = t;

            if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && fpclassify(n) != FP_NAN)
                pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY;

            break;
        }
        case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
            pg->used++;
            storage_number t = pack_storage_number(n, flags);

            if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && does_storage_number_exist(t))
                pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY;

            bool ok = gorilla_writer_write(pg->gorilla.writer, t);
            if (!ok) {
                gorilla_buffer_t *new_buffer = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]);
                memset(new_buffer, 0, RRDENG_GORILLA_32BIT_BUFFER_SIZE);

                gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, RRDENG_GORILLA_32BIT_BUFFER_SLOTS);
                pg->gorilla.num_buffers += 1;
                global_statistics_gorilla_buffer_add_hot();

                ok = gorilla_writer_write(pg->gorilla.writer, t);
                internal_fatal(ok == false, "Failed to writer value in newly allocated gorilla buffer.");
            }
            break;
        }
        default:
            netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
            break;
    }
}

// ----------------------------------------------------------------------------
// querying with cursor

static void pgdc_seek(PGDC *pgdc, uint32_t position)
{
    PGD *pg = pgdc->pgd;

    switch (pg->type) {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT:
        case RRDENG_PAGE_TYPE_ARRAY_TIER1:
            pgdc->slots = pgdc->pgd->used;
            break;
        case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
            if (pg->states & PGD_STATE_CREATED_FROM_DISK) {
                pgdc->slots = pgdc->pgd->slots;
                pgdc->gr = gorilla_reader_init((void *) pg->raw.data);
            } else {
                if (!(pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) &&
                    !(pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) &&
                    !(pg->states & PGD_STATE_FLUSHED_TO_DISK))
                    fatal("pgdc_seek() currently is not supported for pages created from disk.");

                if (!pg->gorilla.writer)
                    fatal("Seeking from a page without an active gorilla writer is not supported (yet).");

                pgdc->slots = gorilla_writer_entries(pg->gorilla.writer);
                pgdc->gr = gorilla_writer_get_reader(pg->gorilla.writer);
            }

            if (position > pgdc->slots)
                position = pgdc->slots;

            for (uint32_t i = 0; i != position; i++) {
                uint32_t value;

                bool ok = gorilla_reader_read(&pgdc->gr, &value);

                if (!ok) {
                    // this is fine, the reader will return empty points
                    break;
                }
            }

            break;
        }
        default:
            netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
            break;
    }
}

void pgdc_reset(PGDC *pgdc, PGD *pgd, uint32_t position)
{
    // pgd might be null and position equal to UINT32_MAX

    pgdc->pgd = pgd;
    pgdc->position = position;

    if (!pgd)
        return;

    if (pgd == PGD_EMPTY)
        return;

    if (position == UINT32_MAX)
        return;

    pgdc_seek(pgdc, position);
}

bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position __maybe_unused, STORAGE_POINT *sp)
{
    if (!pgdc->pgd || pgdc->pgd == PGD_EMPTY || pgdc->position >= pgdc->slots)
    {
        storage_point_empty(*sp, sp->start_time_s, sp->end_time_s);
        return false;
    }

    internal_fatal(pgdc->position != expected_position, "Wrong expected cursor position");

    switch (pgdc->pgd->type)
    {
        case RRDENG_PAGE_TYPE_ARRAY_32BIT: {
            storage_number *array = (storage_number *) pgdc->pgd->raw.data;
            storage_number n = array[pgdc->position++];

            sp->min = sp->max = sp->sum = unpack_storage_number(n);
            sp->flags = (SN_FLAGS)(n & SN_USER_FLAGS);
            sp->count = 1;
            sp->anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;

            return true;
        }
        case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
            storage_number_tier1_t *array = (storage_number_tier1_t *) pgdc->pgd->raw.data;
            storage_number_tier1_t n = array[pgdc->position++];

            sp->flags = n.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
            sp->count = n.count;
            sp->anomaly_count = n.anomaly_count;
            sp->min = n.min_value;
            sp->max = n.max_value;
            sp->sum = n.sum_value;

            return true;
        }
        case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
            pgdc->position++;

            uint32_t n = 666666666;
            bool ok = gorilla_reader_read(&pgdc->gr, &n);
            if (ok) {
                sp->min = sp->max = sp->sum = unpack_storage_number(n);
                sp->flags = (SN_FLAGS)(n & SN_USER_FLAGS);
                sp->count = 1;
                sp->anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
            } else {
                storage_point_empty(*sp, sp->start_time_s, sp->end_time_s);
            }

            return ok;
        }
        default: {
            static bool logged = false;
            if (!logged)
            {
                netdata_log_error("DBENGINE: unknown page type %"PRIu32" found. Cannot decode it. Ignoring its metrics.",
                                  pgd_type(pgdc->pgd));
                logged = true;
            }

            storage_point_empty(*sp, sp->start_time_s, sp->end_time_s);
            return false;
        }
    }
}