/*
 * christoph2/pyxcp
 *
 * View on GitHub
 * pyxcp/recorder/rekorder.hpp
 *
 * Summary
 *
 * Maintainability
 * Test Coverage
 */

#if !defined(__REKORDER_HPP)
#define __REKORDER_HPP

// Build switch. 0 (the default) pulls in pybind11 and uses NumPy arrays as
// payloads; 1 builds without any pybind11 dependency and uses
// std::shared_ptr<blob_t[]> payloads (see the payload_t alias further below).
#if !defined(STANDALONE_REKORDER)
    #define STANDALONE_REKORDER     0
#endif /* STANDALONE_REKORDER */

#include <array>
#include <atomic>
#include <bitset>
#include <condition_variable>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>


#if defined(_WIN32)
    #include <io.h>
    #include <fcntl.h>

    #include <Windows.h>
#else
    #include <fcntl.h>      // open()
    #include <unistd.h>     // close(), ftruncate()
#endif /* _WIN32 */


#include "lz4.h"
#include "mio.hpp"

#if STANDALONE_REKORDER == 0
    #include <pybind11/pybind11.h>
    #include <pybind11/numpy.h>
    #include <pybind11/stl.h>

    namespace py = pybind11;
    using namespace pybind11::literals;
#endif /* STANDALONE_REKORDER */

// Fallback for compilers that do not predefine __BIGGEST_ALIGNMENT__: assume 8 bytes.
// NOTE(review): names starting with a double underscore are reserved for the
// implementation; consider project-prefixed macro names.
#if !defined(__BIGGEST_ALIGNMENT__)
    #define __BIGGEST_ALIGNMENT__   (8)
#endif


// Alignment used by round_to_alignment() and by the __ALIGN specifier.
#define __ALIGNMENT_REQUIREMENT     __BIGGEST_ALIGNMENT__
#define __ALIGN                     alignas(__ALIGNMENT_REQUIREMENT)


/// Convert a number of KiB to bytes (32-bit unsigned arithmetic; wraps modulo 2^32).
constexpr std::uint32_t kilobytes(std::uint32_t value)
{
    return value << 10U;
}

/// Convert a number of MiB to bytes (32-bit unsigned arithmetic; wraps modulo 2^32).
constexpr std::uint32_t megabytes(std::uint32_t value)
{
    const std::uint32_t kib = kilobytes(value);
    return kib << 10U;
}

/// Largest payload length representable in the 16-bit frame length field.
constexpr uint16_t XCP_PAYLOAD_MAX = 0xFFFF;

/*
 * On-disk structures of the .xmraw format:
 *     MAGIC | FileHeaderType | { ContainerHeaderType | LZ4 data }*
 * Byte order is, where applicable, little endian (LSB first).
 * NOTE(review): the structures are copied to/from the file verbatim in host byte
 * order, so the little-endian claim only holds on little-endian hosts.
 * All structures are byte-packed and their sizes are pinned by static_asserts,
 * because reader and writer rely on sizeof() of these types for file offsets.
 */
#pragma pack(push)
#pragma pack(1)
/// File header, stored directly after the MAGIC string.
struct FileHeaderType
{
    uint16_t hdr_size;          // MAGIC_SIZE + sizeof(FileHeaderType)
    uint16_t version;
    uint16_t options;
    uint32_t num_containers;
    uint32_t record_count;      // total number of frames in all containers
    uint32_t size_compressed;   // sum of the containers' compressed sizes
    uint32_t size_uncompressed; // sum of the containers' uncompressed sizes
};

/// Header summary as returned by XcpLogFileReader::get_header_as_tuple():
/// (num_containers, record_count, size_uncompressed, size_compressed, compression ratio).
using HeaderTuple = std::tuple<std::uint32_t, std::uint32_t, std::uint32_t, std::uint32_t, double>;

static_assert(sizeof(FileHeaderType) == 22);

/// Header preceding each LZ4-compressed block of frames.
struct ContainerHeaderType
{
    uint32_t record_count;      // number of frames in this container
    uint32_t size_compressed;   // bytes of LZ4 data following this header
    uint32_t size_uncompressed; // bytes after decompression
};

static_assert(sizeof(ContainerHeaderType) == 12);

using blob_t = unsigned char;

#if STANDALONE_REKORDER == 1
    using payload_t = std::shared_ptr<blob_t[]>;
#else
    using payload_t = py::array_t<blob_t>;
#endif /* STANDALONE_REKORDER */


/// Per-frame header; inside a decompressed container each header is directly
/// followed by `length` payload bytes.
struct frame_header_t
{
    uint8_t category {0};
    uint16_t counter {0};
    double timestamp {0.0};
    uint16_t length {0};
};

static_assert(sizeof(frame_header_t) == 13);
#pragma pack(pop)

using FrameTuple = std::tuple<std::uint8_t, std::uint16_t, double, std::uint16_t, payload_t>;
using FrameVector = std::vector<FrameTuple>;
using FrameTupleWriter = std::tuple<std::uint8_t, std::uint16_t, double, std::uint16_t, char *>;

/// Kind of a recorded XCP frame.
/// NOTE(review): presumably the value carried in frame_header_t::category — confirm with callers.
enum class FrameCategory : std::uint8_t {
    META = 0,
    CMD  = 1,
    RES  = 2,
    ERR  = 3,
    EV   = 4,
    SERV = 5,
    DAQ  = 6,
    STIM = 7,
};

namespace detail
{
    // Extension appended to file names that do not already end with it.
    const std::string FILE_EXTENSION(".xmraw");
    // File signature at offset 0. Exactly MAGIC_SIZE bytes are written/compared,
    // so no terminating NUL is stored; keep MAGIC_SIZE in sync with MAGIC's length.
    const std::string MAGIC{"ASAMINT::XCP_RAW"};
    constexpr auto MAGIC_SIZE = 16;
    // Value written to and expected in FileHeaderType::version.
    constexpr auto VERSION = 0x0100;
    constexpr auto FILE_HEADER_SIZE = sizeof(FileHeaderType);
    constexpr auto CONTAINER_SIZE = sizeof(ContainerHeaderType);
}


/// Size in bytes of MAGIC plus FileHeaderType, i.e. the file offset of the first container.
constexpr std::uint32_t file_header_size()
{
    constexpr std::uint32_t total = detail::MAGIC_SIZE + detail::FILE_HEADER_SIZE;
    return total;
}

using rounding_func_t = std::function<std::uint32_t(std::uint32_t)>;

/**
 * Create a function that rounds its argument up to the next multiple of `multiple`.
 *
 * @param multiple  Granularity. Powers of two use a cheap bit mask; any other
 *                  positive value uses division. 0 yields the identity function.
 * @return          Callable mapping `value` to the smallest multiple of `multiple`
 *                  that is >= `value`, computed in 32-bit unsigned arithmetic
 *                  (so results wrap for values close to UINT32_MAX).
 */
inline rounding_func_t create_rounding_func(std::uint32_t multiple)  {
    if (multiple == 0) {
        // "Multiple of zero" is meaningless; leave values untouched.
        return [](std::uint32_t value) -> std::uint32_t { return value; };
    }
    if ((multiple & (multiple - 1)) == 0) {
        // Power of two: add (multiple - 1), then clear the low bits.
        return [multiple](std::uint32_t value) -> std::uint32_t {
            return (value + (multiple - 1)) & ~(multiple - 1);
        };
    }
    // General case: the mask trick is only valid for powers of two.
    return [multiple](std::uint32_t value) -> std::uint32_t {
        return ((value + (multiple - 1)) / multiple) * multiple;
    };
}

// Rounds a size up to the next multiple of __ALIGNMENT_REQUIREMENT.
const auto round_to_alignment = create_rounding_func(__ALIGNMENT_REQUIREMENT);


/**
 * Copy `n` bytes from `src` to `dest`.
 *
 * Delegates to std::memmove, which the standard library implements with wide
 * copies and which stays well-defined even if the two ranges overlap.
 * A zero-length copy is a no-op, so null pointers are accepted when `n == 0`.
 */
inline void _fcopy(char * dest, char const * src, std::uint32_t n) noexcept
{
    if (n == 0) {
        return;  // memmove requires valid pointers even for zero bytes.
    }
    std::memmove(dest, src, n);
}

#if STANDALONE_REKORDER == 1
    /// Raw, non-owning pointer to the payload bytes; valid while `payload` is alive.
    inline blob_t * get_payload_ptr(const payload_t& payload) noexcept {
        return payload.get();
    }

    /// Allocate a `size`-byte payload and copy `data` into it.
    /// Deliberately not noexcept (matching the pybind11 variant): std::make_shared may
    /// throw std::bad_alloc, which must propagate instead of calling std::terminate().
    inline payload_t create_payload(std::uint32_t size, blob_t const * data) {
        auto pl = std::make_shared<blob_t[]>(size);
        _fcopy(reinterpret_cast<char*>(pl.get()), reinterpret_cast<char const*>(data), size);
        return pl;
    }
#else
    /// Create a 1-D NumPy array of `size` bytes initialised from `data`
    /// (pybind11 copies `data` because no base object is passed).
    /// NOTE(review): presumably must be called with the GIL held — confirm at call sites.
    inline payload_t create_payload(std::uint32_t size, blob_t const * data) {
        return py::array_t<blob_t>(size, data);
    }

    /// Raw, non-owning pointer to the array's data buffer.
    /// NOTE(review): py::array::request() may throw; inside this noexcept function
    /// that would call std::terminate().
    inline blob_t * get_payload_ptr(const payload_t& payload) noexcept {
        py::buffer_info buf = payload.request();

        return  static_cast<blob_t *>(buf.ptr);
    }
#endif /* STANDALONE_REKORDER */

/// Debug helper: print `sz` bytes of `buf` to stdout as upper-case hex pairs, each followed by a space.
/// NOTE(review): the trailing sequence is LF followed by CR; "\r\n" or "\n" was probably intended.
inline void hexdump(blob_t const * buf, std::uint16_t sz) {
    blob_t const * const end = buf + sz;
    for (blob_t const * cursor = buf; cursor != end; ++cursor) {
        printf("%02X ", *cursor);
    }
    printf("\n\r");
}


/**
 * Unbounded FIFO queue guarded by a mutex and a condition variable.
 *
 * put() never blocks; get() blocks until an element is available. Elements are
 * moved in and out, so move-only element types are supported too.
 * NOTE: put()/get() are noexcept, so an allocation failure terminates the process.
 */
template <typename T>
class TsQueue {
public:
    TsQueue() = default;

    /// Copy the pending elements of `other` (under its lock); the copy has its own mutex/cv.
    TsQueue(const TsQueue& other) noexcept {
        std::scoped_lock lock(other.m_mtx);
        m_queue = other.m_queue;
    }

    /// Append `value` and wake one waiting consumer.
    void put(T value) noexcept {
        std::scoped_lock lock(m_mtx);
        m_queue.push(std::move(value));
        m_cond.notify_one();
    }

    /// Block until the queue is non-empty, then remove and return its front element.
    std::shared_ptr<T> get() noexcept {
        std::unique_lock lock(m_mtx);
        m_cond.wait(lock, [this]{ return !m_queue.empty(); });
        auto result = std::make_shared<T>(std::move(m_queue.front()));
        m_queue.pop();
        return result;
    }

    /// Snapshot check; the answer may be stale as soon as the lock is released.
    bool empty() const noexcept {
        std::scoped_lock lock(m_mtx);
        return m_queue.empty();
    }

private:
    mutable std::mutex m_mtx;
    std::queue<T> m_queue;
    std::condition_variable m_cond;
};


/**
 * Auto-reset event: signal() sets the flag and wakes one waiter; wait() blocks
 * until the flag is set and then clears it again. state() peeks at the flag.
 */
class Event {
public:

    /// Copies only the flag; the copy owns a fresh mutex and condition variable.
    Event(const Event& other) noexcept {
        const std::lock_guard<std::mutex> guard(other.m_mtx);
        m_flag = other.m_flag;
    }

    ~Event() = default;
    Event() = default;

    void signal() noexcept {
        const std::lock_guard<std::mutex> guard(m_mtx);
        m_flag = true;
        m_cond.notify_one();
    }

    void wait() noexcept {
        std::unique_lock<std::mutex> guard(m_mtx);
        while (!m_flag) {
            m_cond.wait(guard);
        }
        m_flag = false;
    }

    bool state() const noexcept {
        const std::lock_guard<std::mutex> guard(m_mtx);
        return m_flag;
    }

private:
    mutable std::mutex m_mtx {};
    bool m_flag {false};
    std::condition_variable m_cond {};
};


/*
 *
 * Super simplistic block memory manager.
 *
 * Hands out `NumBlocks` fixed-size blocks of `ItemsPerBlock` elements each from
 * one contiguous, value-initialised allocation. Blocks are handed out in address
 * order and release() simply gives back the most recently acquired block, so
 * acquire()/release() must be paired in LIFO (stack) order.
 */
template <typename T, int ItemsPerBlock, int NumBlocks>
class BlockMemory {

public:
    static_assert(ItemsPerBlock > 0 && NumBlocks > 0, "BlockMemory needs a positive block size and count.");

    using mem_block_t = std::array<T, ItemsPerBlock>;

    /// Allocates all blocks up front. noexcept: an allocation failure terminates.
    explicit BlockMemory() noexcept
        : m_memory{std::make_unique<T[]>(static_cast<std::size_t>(ItemsPerBlock) * static_cast<std::size_t>(NumBlocks))} {}

    ~BlockMemory() noexcept = default;

    BlockMemory(const BlockMemory&) = delete;
    BlockMemory& operator=(const BlockMemory&) = delete;

    /// Next free block, or nullptr once all blocks are in use.
    T * acquire() noexcept {
        const std::scoped_lock lock(m_mtx);

        if (m_allocation_count >= static_cast<std::uint32_t>(NumBlocks)) {
            return nullptr;
        }
        T * block = m_memory.get() + static_cast<std::size_t>(m_allocation_count) * static_cast<std::size_t>(ItemsPerBlock);
        ++m_allocation_count;
        return block;
    }

    /// Give back the most recently acquired block; a no-op when nothing is acquired.
    void release() noexcept {
        const std::scoped_lock lock(m_mtx);
        if (m_allocation_count == 0) {
            return;
        }
        --m_allocation_count;
    }

private:

    std::unique_ptr<T[]> m_memory;
    std::uint32_t m_allocation_count{0};
    std::mutex m_mtx;

};


/**
 */
class XcpLogFileWriter
{
public:
    explicit XcpLogFileWriter(const std::string& file_name, uint32_t prealloc = 10UL, uint32_t chunk_size = 1) noexcept
    {
        if (!file_name.ends_with(detail::FILE_EXTENSION)) {
            m_file_name = file_name + detail::FILE_EXTENSION;
        } else {
            m_file_name = file_name;
        }

#if defined(_WIN32)
        m_fd = CreateFileA(
            m_file_name.c_str(),
            GENERIC_READ | GENERIC_WRITE,
            0,
            (LPSECURITY_ATTRIBUTES)nullptr,
            CREATE_ALWAYS,
            FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS,
            nullptr
        );
#else
        m_fd = open(m_file_name.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0666);
#endif
        truncate(megabytes(prealloc));
        m_mmap = new mio::mmap_sink(m_fd);
        m_chunk_size = megabytes(chunk_size);
        m_intermediate_storage = new blob_t[m_chunk_size + megabytes(1)];
        m_offset = detail::FILE_HEADER_SIZE + detail::MAGIC_SIZE;

        start_thread();
    }

    ~XcpLogFileWriter() noexcept {
      finalize();
    #ifdef __APPLE__
        if (collector_thread.joinable()) {
            collector_thread.join();
        }
    #endif
    }

    void finalize() {
        if (!m_finalized) {
            m_finalized = true;
            stop_thread();
            if (m_container_record_count) {
                compress_frames();
            }
            write_header(detail::VERSION, 0x0000, m_num_containers, m_record_count, m_total_size_compressed, m_total_size_uncompressed);
            m_mmap->unmap();
            truncate(m_offset);
#if defined(_WIN32)
            CloseHandle(m_fd);
#else
            close(m_fd);
#endif
            delete m_mmap;
            delete[] m_intermediate_storage;
        }
    }

    void add_frame(uint8_t category, uint16_t counter, double timestamp, uint16_t length, char const * data) noexcept {
        auto payload= new char[length];
        //auto payload = mem.acquire();

        _fcopy(payload, data, length);
        my_queue.put(
            std::make_tuple(category, counter, timestamp, length, payload)
        );
    }

protected:
    void truncate(off_t size) const noexcept
    {
#if defined(_WIN32)
        if (SetFilePointer(m_fd, size, nullptr, FILE_BEGIN) == INVALID_SET_FILE_POINTER) {
            // TODO: Errorhandling.
        }
        if (SetEndOfFile(m_fd) == 0) {
            // TODO: Errorhandling.
        }
#else
        ftruncate(m_fd, size);
#endif
    }

    blob_t * ptr(std::uint32_t pos = 0) const noexcept
    {
        return (blob_t *)(m_mmap->data() + pos);
    }

    template<typename T>
    void store_im(T const * data, std::uint32_t length) noexcept {
        _fcopy(reinterpret_cast<char*>(m_intermediate_storage + m_intermediate_storage_offset), reinterpret_cast<char const*>(data), length);
        m_intermediate_storage_offset += length;
    }

    void compress_frames() {
        auto container = ContainerHeaderType{};
        //printf("Compressing %u frames... [%d]\n", m_container_record_count, m_intermediate_storage_offset);
        const int cp_size = ::LZ4_compress_default(
            reinterpret_cast<char const*>(m_intermediate_storage), reinterpret_cast<char *>(ptr(m_offset + detail::CONTAINER_SIZE)),
            m_intermediate_storage_offset, LZ4_COMPRESSBOUND(m_intermediate_storage_offset)
        );
        if (cp_size < 0) {
            throw std::runtime_error("LZ4 compression failed.");
        }
        //printf("comp: %d %d [%f]\n", m_intermediate_storage_offset,  cp_size, double(m_intermediate_storage_offset) / double(cp_size));
        container.record_count = m_container_record_count;
        container.size_compressed = cp_size;
        container.size_uncompressed = m_container_size_uncompressed;
        _fcopy(reinterpret_cast<char *>(ptr(m_offset)), reinterpret_cast<char const*>(&container), detail::CONTAINER_SIZE);
        m_offset += (detail::CONTAINER_SIZE + cp_size);
        m_total_size_uncompressed += m_container_size_uncompressed;
        m_total_size_compressed += cp_size;
        m_record_count += m_container_record_count;
        m_container_size_uncompressed = 0;
        m_container_size_compressed = 0;
        m_container_record_count = 0;
        m_intermediate_storage_offset = 0;
        m_num_containers += 1;
    }

    void write_bytes(std::uint32_t pos, std::uint32_t count, char const * buf) const noexcept
    {
        auto addr = reinterpret_cast<char *>(ptr(pos));

        _fcopy(addr, buf, count);
    }

    void write_header(uint16_t version, uint16_t options, uint32_t num_containers,
                      uint32_t record_count, uint32_t size_compressed, uint32_t size_uncompressed) noexcept {
        auto header = FileHeaderType{};
        write_bytes(0x00000000UL, detail::MAGIC_SIZE, detail::MAGIC.c_str());
        header.hdr_size = detail::FILE_HEADER_SIZE + detail::MAGIC_SIZE;
        header.version = version;
        header.options = options;
        header.num_containers = num_containers;
        header.record_count = record_count;
        header.size_compressed = size_compressed;
        header.size_uncompressed = size_uncompressed;
        write_bytes(0x00000000UL + detail::MAGIC_SIZE, detail::FILE_HEADER_SIZE, reinterpret_cast<char const*>(&header));
    }

    bool start_thread() noexcept {
        if (collector_thread.joinable()) {
            return false;
        }
        stop_collector_thread_flag = false;
            #ifdef __APPLE__
                collector_thread = std::thread([this]() {
            #else
                collector_thread = std::jthread([this]() {
            #endif
            while (!stop_collector_thread_flag) {
                auto item = my_queue.get();
                const auto content = item.get();
                if (stop_collector_thread_flag == true)
                {
                    break;
                }
                const auto [category, counter, timestamp, length, payload] = *content;
                const frame_header_t frame{ category, counter, timestamp, length };
                store_im(&frame, sizeof(frame));
                store_im(payload, length);
                delete[] payload;
                m_container_record_count += 1;
                m_container_size_uncompressed += (sizeof(frame) + length);
                if (m_container_size_uncompressed > m_chunk_size) {
                    compress_frames();
                }
            }
        });
        return true;
    }

    bool stop_thread() noexcept {
        if (!collector_thread.joinable()) {
            return false;
        }
        stop_collector_thread_flag = true;
        my_queue.put(FrameTupleWriter{}); // Put something into the queue, otherwise the thread will hang forever.
        collector_thread.join();
        return true;
    }

private:
    std::string m_file_name;
    std::uint32_t m_offset{0};
    std::uint32_t m_chunk_size{0};
    std::uint32_t m_num_containers{0};
    std::uint32_t m_record_count{0UL};
    std::uint32_t m_container_record_count{0UL};
    std::uint32_t m_total_size_uncompressed{0UL};
    std::uint32_t m_total_size_compressed{0UL};
    std::uint32_t m_container_size_uncompressed{0UL};
    std::uint32_t m_container_size_compressed{0UL};
    __ALIGN blob_t * m_intermediate_storage{nullptr};
    std::uint32_t m_intermediate_storage_offset{0};
    mio::file_handle_type m_fd{INVALID_HANDLE_VALUE};
    mio::mmap_sink * m_mmap{nullptr};
    bool m_finalized{false};
    #ifdef __APPLE__
    std::thread collector_thread{};
    #else
    std::jthread collector_thread{};
    #endif
    std::mutex mtx;
    TsQueue<FrameTupleWriter> my_queue;
    BlockMemory<char, XCP_PAYLOAD_MAX, 16> mem{};
    std::atomic_bool stop_collector_thread_flag{false};
};


/**
 */
class XcpLogFileReader
{
public:
    explicit XcpLogFileReader(const std::string& file_name)
    {
        if (!file_name.ends_with(detail::FILE_EXTENSION)) {
            m_file_name = file_name + detail::FILE_EXTENSION;
        } else {
            m_file_name = file_name;
        }

        m_mmap = new mio::mmap_source(m_file_name);
        blob_t magic[detail::MAGIC_SIZE + 1];

        read_bytes(0UL, detail::MAGIC_SIZE, magic);
        if (memcmp(detail::MAGIC.c_str(), magic, detail::MAGIC_SIZE) != 0) {
            throw std::runtime_error("Invalid file magic.");
        }
        m_offset = detail::MAGIC_SIZE;

        read_bytes(m_offset, detail::FILE_HEADER_SIZE, reinterpret_cast<blob_t*>(&m_header));
        //printf("Sizes: %u %u %.3f\n", m_header.size_uncompressed,
        //       m_header.size_compressed,
        //       float(m_header.size_uncompressed) / float(m_header.size_compressed));
        if (m_header.hdr_size != detail::FILE_HEADER_SIZE + detail::MAGIC_SIZE)
        {
            throw std::runtime_error("File header size does not match.");
        }
        if (detail::VERSION != m_header.version)
        {
            throw std::runtime_error("File version mismatch.");
        }

        if (m_header.num_containers < 1) {
            throw std::runtime_error("At least one container required.");
        }

        m_offset += detail::FILE_HEADER_SIZE;
    }

    [[nodiscard]]
    FileHeaderType get_header()  const noexcept  {
        return m_header;
    }

    [[nodiscard]]
    auto get_header_as_tuple() const noexcept -> HeaderTuple {
        auto hdr = get_header();

        return std::make_tuple(
            hdr.num_containers,
            hdr.record_count,
            hdr.size_uncompressed,
            hdr.size_compressed,
            (double)((std::uint64_t)(((double)hdr.size_uncompressed / (double)hdr.size_compressed * 100.0) + 0.5)) / 100.0
        );
    }

    void reset() noexcept {
        m_current_container = 0;
        m_offset = file_header_size();
    }

    std::optional<FrameVector> next_block() {
        auto container = ContainerHeaderType{};
        auto frame = frame_header_t{};
        std::uint32_t boffs = 0;
        auto result = FrameVector{};
        payload_t payload;

        if (m_current_container >= m_header.num_containers) {
            return std::nullopt;
        }
        read_bytes(m_offset, detail::CONTAINER_SIZE, reinterpret_cast<blob_t*>(&container));
        __ALIGN auto buffer = new blob_t[container.size_uncompressed];
        m_offset += detail::CONTAINER_SIZE;
        result.reserve(container.record_count);
        const int uc_size = ::LZ4_decompress_safe(reinterpret_cast<char const*>(ptr(m_offset)), reinterpret_cast<char *>(buffer), container.size_compressed, container.size_uncompressed);
        if (uc_size < 0) {
            throw std::runtime_error("LZ4 decompression failed.");
        }
        boffs = 0;
        for (std::uint32_t idx = 0; idx < container.record_count; ++idx) {
            _fcopy(reinterpret_cast<char *>(&frame), reinterpret_cast<char const*>(&(buffer[boffs])), sizeof(frame_header_t));
            boffs += sizeof(frame_header_t);
            result.emplace_back(frame.category, frame.counter, frame.timestamp, frame.length, create_payload(frame.length, &buffer[boffs]));
            boffs += frame.length;
        }
        m_offset += container.size_compressed;
        m_current_container += 1;
        delete[] buffer;

        return std::optional<FrameVector>{result};
    }

    ~XcpLogFileReader() noexcept
    {
        delete m_mmap;
    }

protected:
    [[nodiscard]]
    blob_t const *ptr(std::uint32_t pos = 0) const
    {
        return reinterpret_cast<blob_t const*>(m_mmap->data() + pos);
    }

    void read_bytes(std::uint32_t pos, std::uint32_t count, blob_t * buf) const
    {
        auto addr = reinterpret_cast<char const*>(ptr(pos));
        _fcopy(reinterpret_cast<char *>(buf), addr, count);
    }

private:
    std::string m_file_name;
    std::uint32_t m_offset{0};
    std::uint32_t m_current_container{0};
    mio::mmap_source * m_mmap{nullptr};
    FileHeaderType m_header;
};

#endif // __REKORDER_HPP