src/libnetdata/gorilla/gorilla.cc
// SPDX-License-Identifier: GPL-3.0-or-later
#include "gorilla.h"
#include <cassert>
#include <climits>
#include <cstdio>
#include <cstring>
using std::size_t;
template <typename T>
static constexpr size_t bit_size() noexcept
{
static_assert((sizeof(T) * CHAR_BIT) == 32 || (sizeof(T) * CHAR_BIT) == 64,
"Word size should be 32 or 64 bits.");
return (sizeof(T) * CHAR_BIT);
}
static void bit_buffer_write(uint32_t *buf, size_t pos, uint32_t v, size_t nbits)
{
assert(nbits > 0 && nbits <= bit_size<uint32_t>());
const size_t index = pos / bit_size<uint32_t>();
const size_t offset = pos % bit_size<uint32_t>();
pos += nbits;
if (offset == 0) {
buf[index] = v;
} else {
const size_t remaining_bits = bit_size<uint32_t>() - offset;
// write the lower part of the value
const uint32_t low_bits_mask = ((uint32_t) 1 << remaining_bits) - 1;
const uint32_t lowest_bits_in_value = v & low_bits_mask;
buf[index] |= (lowest_bits_in_value << offset);
if (nbits > remaining_bits) {
// write the upper part of the value
const uint32_t high_bits_mask = ~low_bits_mask;
const uint32_t highest_bits_in_value = (v & high_bits_mask) >> (remaining_bits);
buf[index + 1] = highest_bits_in_value;
}
}
}
static void bit_buffer_read(const uint32_t *buf, size_t pos, uint32_t *v, size_t nbits)
{
assert(nbits > 0 && nbits <= bit_size<uint32_t>());
const size_t index = pos / bit_size<uint32_t>();
const size_t offset = pos % bit_size<uint32_t>();
pos += nbits;
if (offset == 0) {
*v = (nbits == bit_size<uint32_t>()) ?
buf[index] :
buf[index] & (((uint32_t) 1 << nbits) - 1);
} else {
const size_t remaining_bits = bit_size<uint32_t>() - offset;
// extract the lower part of the value
if (nbits < remaining_bits) {
*v = (buf[index] >> offset) & (((uint32_t) 1 << nbits) - 1);
} else {
*v = (buf[index] >> offset) & (((uint32_t) 1 << remaining_bits) - 1);
nbits -= remaining_bits;
*v |= (buf[index + 1] & (((uint32_t) 1 << nbits) - 1)) << remaining_bits;
}
}
}
gorilla_writer_t gorilla_writer_init(gorilla_buffer_t *gbuf, size_t n)
{
gorilla_writer_t gw = gorilla_writer_t {
.head_buffer = gbuf,
.last_buffer = NULL,
.prev_number = 0,
.prev_xor_lzc = 0,
.capacity = 0
};
gorilla_writer_add_buffer(&gw, gbuf, n);
return gw;
}
void gorilla_writer_add_buffer(gorilla_writer_t *gw, gorilla_buffer_t *gbuf, size_t n)
{
gbuf->header.next = NULL;
gbuf->header.entries = 0;
gbuf->header.nbits = 0;
uint32_t capacity = (n * bit_size<uint32_t>()) - (sizeof(gorilla_header_t) * CHAR_BIT);
gw->prev_number = 0;
gw->prev_xor_lzc = 0;
gw->capacity = capacity;
if (gw->last_buffer)
gw->last_buffer->header.next = gbuf;
__atomic_store_n(&gw->last_buffer, gbuf, __ATOMIC_RELAXED);
}
uint32_t gorilla_writer_entries(const gorilla_writer_t *gw) {
uint32_t entries = 0;
const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST);
do {
const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST);
entries += __atomic_load_n(&curr_gbuf->header.entries, __ATOMIC_SEQ_CST);
curr_gbuf = next_gbuf;
} while (curr_gbuf);
return entries;
}
bool gorilla_writer_write(gorilla_writer_t *gw, uint32_t number)
{
gorilla_header_t *hdr = &gw->last_buffer->header;
uint32_t *data = gw->last_buffer->data;
// this is the first number we are writing
if (hdr->entries == 0) {
if (hdr->nbits + bit_size<uint32_t>() >= gw->capacity)
return false;
bit_buffer_write(data, hdr->nbits, number, bit_size<uint32_t>());
__atomic_fetch_add(&hdr->nbits, bit_size<uint32_t>(), __ATOMIC_RELAXED);
__atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED);
gw->prev_number = number;
return true;
}
// write true/false based on whether we got the same number or not.
if (number == gw->prev_number) {
if (hdr->nbits + 1 >= gw->capacity)
return false;
bit_buffer_write(data, hdr->nbits, static_cast<uint32_t>(1), 1);
__atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED);
return true;
}
if (hdr->nbits + 1 >= gw->capacity)
return false;
bit_buffer_write(data, hdr->nbits, static_cast<uint32_t>(0), 1);
__atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED);
uint32_t xor_value = gw->prev_number ^ number;
uint32_t xor_lzc = (bit_size<uint32_t>() == 32) ? __builtin_clz(xor_value) : __builtin_clzll(xor_value);
uint32_t is_xor_lzc_same = (xor_lzc == gw->prev_xor_lzc) ? 1 : 0;
if (hdr->nbits + 1 >= gw->capacity)
return false;
bit_buffer_write(data, hdr->nbits, is_xor_lzc_same, 1);
__atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED);
if (!is_xor_lzc_same) {
size_t bits_needed = (bit_size<uint32_t>() == 32) ? 5 : 6;
if ((hdr->nbits + bits_needed) >= gw->capacity)
return false;
bit_buffer_write(data, hdr->nbits, xor_lzc, bits_needed);
__atomic_fetch_add(&hdr->nbits, bits_needed, __ATOMIC_RELAXED);
}
// write the bits of the XOR'd value without the LZC prefix
if (hdr->nbits + (bit_size<uint32_t>() - xor_lzc) >= gw->capacity)
return false;
bit_buffer_write(data, hdr->nbits, xor_value, bit_size<uint32_t>() - xor_lzc);
__atomic_fetch_add(&hdr->nbits, bit_size<uint32_t>() - xor_lzc, __ATOMIC_RELAXED);
__atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED);
gw->prev_number = number;
gw->prev_xor_lzc = xor_lzc;
return true;
}
gorilla_buffer_t *gorilla_writer_drop_head_buffer(gorilla_writer_t *gw) {
if (!gw->head_buffer)
return NULL;
gorilla_buffer_t *curr_head = gw->head_buffer;
gorilla_buffer_t *next_head = gw->head_buffer->header.next;
__atomic_store_n(&gw->head_buffer, next_head, __ATOMIC_RELAXED);
return curr_head;
}
uint32_t gorilla_writer_nbytes(const gorilla_writer_t *gw)
{
uint32_t nbits = 0;
const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST);
do {
const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST);
nbits += __atomic_load_n(&curr_gbuf->header.nbits, __ATOMIC_SEQ_CST);
curr_gbuf = next_gbuf;
} while (curr_gbuf);
return (nbits + (CHAR_BIT - 1)) / CHAR_BIT;
}
bool gorilla_writer_serialize(const gorilla_writer_t *gw, uint8_t *dst, uint32_t dst_size) {
const gorilla_buffer_t *curr_gbuf = gw->head_buffer;
do {
const gorilla_buffer_t *next_gbuf = curr_gbuf->header.next;
size_t bytes = RRDENG_GORILLA_32BIT_BUFFER_SIZE;
if (bytes > dst_size)
return false;
memcpy(dst, curr_gbuf, bytes);
dst += bytes;
dst_size -= bytes;
curr_gbuf = next_gbuf;
} while (curr_gbuf);
return true;
}
uint32_t gorilla_buffer_patch(gorilla_buffer_t *gbuf) {
gorilla_buffer_t *curr_gbuf = gbuf;
uint32_t n = curr_gbuf->header.entries;
while (curr_gbuf->header.next) {
uint32_t *buf = reinterpret_cast<uint32_t *>(gbuf);
gbuf = reinterpret_cast<gorilla_buffer_t *>(&buf[RRDENG_GORILLA_32BIT_BUFFER_SLOTS]);
assert(((uintptr_t) (gbuf) % sizeof(uintptr_t)) == 0 &&
"Gorilla buffer not aligned to uintptr_t");
curr_gbuf->header.next = gbuf;
curr_gbuf = curr_gbuf->header.next;
n += curr_gbuf->header.entries;
}
return n;
}
gorilla_reader_t gorilla_writer_get_reader(const gorilla_writer_t *gw)
{
const gorilla_buffer_t *buffer = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST);
uint32_t entries = __atomic_load_n(&buffer->header.entries, __ATOMIC_SEQ_CST);
uint32_t capacity = __atomic_load_n(&buffer->header.nbits, __ATOMIC_SEQ_CST);
return gorilla_reader_t {
.buffer = buffer,
.entries = entries,
.index = 0,
.capacity = capacity,
.position = 0,
.prev_number = 0,
.prev_xor_lzc = 0,
.prev_xor = 0,
};
}
gorilla_reader_t gorilla_reader_init(gorilla_buffer_t *gbuf)
{
uint32_t entries = __atomic_load_n(&gbuf->header.entries, __ATOMIC_SEQ_CST);
uint32_t capacity = __atomic_load_n(&gbuf->header.nbits, __ATOMIC_SEQ_CST);
return gorilla_reader_t {
.buffer = gbuf,
.entries = entries,
.index = 0,
.capacity = capacity,
.position = 0,
.prev_number = 0,
.prev_xor_lzc = 0,
.prev_xor = 0,
};
}
bool gorilla_reader_read(gorilla_reader_t *gr, uint32_t *number)
{
const uint32_t *data = gr->buffer->data;
if (gr->index + 1 > gr->entries) {
// We don't have any more entries to return. However, the writer
// might have updated the buffer's entries. We need to check once
// more in case more elements were added.
gr->entries = __atomic_load_n(&gr->buffer->header.entries, __ATOMIC_SEQ_CST);
gr->capacity = __atomic_load_n(&gr->buffer->header.nbits, __ATOMIC_SEQ_CST);
// if the reader's current buffer has not been updated, we need to
// check if it has a pointer to a next buffer.
if (gr->index + 1 > gr->entries) {
gorilla_buffer_t *next_buffer = __atomic_load_n(&gr->buffer->header.next, __ATOMIC_SEQ_CST);
if (!next_buffer) {
// fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n (No more buffers to read from)", gr->length, gr->buffer);
return false;
}
// fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n", gr->length, gr->buffer);
*gr = gorilla_reader_init(next_buffer);
return gorilla_reader_read(gr, number);
}
}
// read the first number
if (gr->index == 0) {
bit_buffer_read(data, gr->position, number, bit_size<uint32_t>());
gr->index++;
gr->position += bit_size<uint32_t>();
gr->prev_number = *number;
return true;
}
// process same-number bit
uint32_t is_same_number;
bit_buffer_read(data, gr->position, &is_same_number, 1);
gr->position++;
if (is_same_number) {
*number = gr->prev_number;
gr->index++;
return true;
}
// proceess same-xor-lzc bit
uint32_t xor_lzc = gr->prev_xor_lzc;
uint32_t same_xor_lzc;
bit_buffer_read(data, gr->position, &same_xor_lzc, 1);
gr->position++;
if (!same_xor_lzc) {
bit_buffer_read(data, gr->position, &xor_lzc, (bit_size<uint32_t>() == 32) ? 5 : 6);
gr->position += (bit_size<uint32_t>() == 32) ? 5 : 6;
}
// process the non-lzc suffix
uint32_t xor_value = 0;
bit_buffer_read(data, gr->position, &xor_value, bit_size<uint32_t>() - xor_lzc);
gr->position += bit_size<uint32_t>() - xor_lzc;
*number = (gr->prev_number ^ xor_value);
gr->index++;
gr->prev_number = *number;
gr->prev_xor_lzc = xor_lzc;
gr->prev_xor = xor_value;
return true;
}
/*
* Internal code used for fuzzing the library
*/
#ifdef ENABLE_FUZZER
#include <vector>
template<typename Word>
static std::vector<Word> random_vector(const uint8_t *data, size_t size) {
std::vector<Word> V;
V.reserve(1024);
while (size >= sizeof(Word)) {
size -= sizeof(Word);
Word w;
memcpy(&w, &data[size], sizeof(Word));
V.push_back(w);
}
return V;
}
class Storage {
public:
gorilla_buffer_t *alloc_buffer(size_t words) {
uint32_t *new_buffer = new uint32_t[words]();
assert(((((uintptr_t) new_buffer) % 8u) == 0) && "Unaligned buffer...");
Buffers.push_back(new_buffer);
return reinterpret_cast<gorilla_buffer_t *>(new_buffer);
}
void free_buffers() {
for (uint32_t *buffer : Buffers) {
delete[] buffer;
}
}
private:
std::vector<uint32_t *> Buffers;
};
extern "C" int LLVMFuzzerTestOneInput(const uint8_t *Data, size_t Size) {
if (Size < 4)
return 0;
std::vector<uint32_t> RandomData = random_vector<uint32_t>(Data, Size);
Storage S;
size_t words_per_buffer = 8;
/*
* write data
*/
gorilla_buffer_t *first_buffer = S.alloc_buffer(words_per_buffer);
gorilla_writer_t gw = gorilla_writer_init(first_buffer, words_per_buffer);
for (size_t i = 0; i != RandomData.size(); i++) {
bool ok = gorilla_writer_write(&gw, RandomData[i]);
if (ok)
continue;
// add new buffer
gorilla_buffer_t *buffer = S.alloc_buffer(words_per_buffer);
gorilla_writer_add_buffer(&gw, buffer, words_per_buffer);
ok = gorilla_writer_write(&gw, RandomData[i]);
assert(ok && "Could not write data to new buffer!!!");
}
/*
* read data
*/
gorilla_reader_t gr = gorilla_writer_get_reader(&gw);
for (size_t i = 0; i != RandomData.size(); i++) {
uint32_t number = 0;
bool ok = gorilla_reader_read(&gr, &number);
assert(ok && "Failed to read number from gorilla buffer");
assert((number == RandomData[i])
&& "Read wrong number from gorilla buffer");
}
S.free_buffers();
return 0;
}
#endif /* ENABLE_FUZZER */
#ifdef ENABLE_BENCHMARK
#include <benchmark/benchmark.h>
#include <random>
static size_t NumItems = 1024;
static void BM_EncodeU32Numbers(benchmark::State& state) {
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint32_t> dist(0x0, 0x0000FFFF);
std::vector<uint32_t> RandomData;
for (size_t idx = 0; idx != NumItems; idx++) {
RandomData.push_back(dist(mt));
}
std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0);
for (auto _ : state) {
gorilla_writer_t gw = gorilla_writer_init(
reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()),
EncodedData.size());
for (size_t i = 0; i != RandomData.size(); i++)
benchmark::DoNotOptimize(gorilla_writer_write(&gw, RandomData[i]));
benchmark::ClobberMemory();
}
state.SetItemsProcessed(NumItems * state.iterations());
state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t));
}
BENCHMARK(BM_EncodeU32Numbers)->ThreadRange(1, 16)->UseRealTime();
static void BM_DecodeU32Numbers(benchmark::State& state) {
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint32_t> dist(0x0, 0xFFFFFFFF);
std::vector<uint32_t> RandomData;
for (size_t idx = 0; idx != NumItems; idx++) {
RandomData.push_back(dist(mt));
}
std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0);
std::vector<uint32_t> DecodedData(10 * RandomData.capacity(), 0);
gorilla_writer_t gw = gorilla_writer_init(
reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()),
EncodedData.size());
for (size_t i = 0; i != RandomData.size(); i++)
gorilla_writer_write(&gw, RandomData[i]);
for (auto _ : state) {
gorilla_reader_t gr = gorilla_reader_init(reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()));
for (size_t i = 0; i != RandomData.size(); i++) {
uint32_t number = 0;
benchmark::DoNotOptimize(gorilla_reader_read(&gr, &number));
}
benchmark::ClobberMemory();
}
state.SetItemsProcessed(NumItems * state.iterations());
state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t));
}
BENCHMARK(BM_DecodeU32Numbers)->ThreadRange(1, 16)->UseRealTime();
#endif /* ENABLE_BENCHMARK */