smok-serwis/tempsdb
tempsdb/chunks/direct.pyx

import os
import typing as tp
import struct
import warnings

from ..series cimport TimeSeries
from .gzip cimport ReadWriteGzipFile
from .base cimport Chunk


STRUCT_Q = struct.Struct('<Q')
DEF HEADER_SIZE = 4
DEF TIMESTAMP_SIZE = 8
DEF FOOTER_SIZE = 4
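# Byte sizes used by the chunk's on-disk layout: the file header, each record's
# timestamp (an unsigned 64-bit little-endian integer, see STRUCT_Q) and the footer
# (the latter is not referenced directly in this file).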


cdef class DirectChunk(Chunk):
    """
    Alternative implementation that extends the file as it goes, without allocating an entire page
    in advance.

    This is also the only chunk type capable of supporting gzip.

    Note that if your system doesn't handle frequent mmap resizing well, try using this
    chunk with `use_descriptor_access=True`.

    Note that gzip can only be used when `use_descriptor_access` is set to True.

    :param gzip_compression_level: gzip compression level to use. 0 is the default and
        means that gzip is disabled. If a non-zero level is given, a warning will be
        emitted, as gzip support is still experimental.
    :raises ValueError: gzip was enabled without descriptor-based access
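
    Usage sketch (illustrative only; ``series`` stands for an already opened
    ``TimeSeries``, the path is assumed to point at an existing chunk file, and the
    sizes below are placeholders, since the payload must be exactly one block long)::

        chunk = DirectChunk(series, '/path/to/chunk', 4096,
                            use_descriptor_access=True,
                            gzip_compression_level=6)
        chunk.append(1_600_000_000, bytes(4096))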
    """
    def __init__(self, TimeSeries parent, str path, int page_size,
                 use_descriptor_access: tp.Optional[bool] = None,
                 int gzip_compression_level = 0):
        if path.endswith('.gz'):
            warnings.warn('Please pass the path without the .gz suffix')
            path = path[:-len('.gz')]
        if path.endswith('.direct'):
            warnings.warn('Please pass the path without the .direct suffix')
            path = path[:-len('.direct')]
        if use_descriptor_access is None:
            use_descriptor_access = False
            if gzip_compression_level:
                warnings.warn('Gzip support is experimental')
                use_descriptor_access = True

        self.gzip = gzip_compression_level

        if gzip_compression_level:
            if not use_descriptor_access:
                raise ValueError('use_descriptor_access must be enabled when using gzip')
            path = path + '.gz'
        else:
            path = path + '.direct'

        # Descriptor-based access is forced whenever gzip is in use; gzipped chunks
        # cannot be accessed via mmap (see switch_to_mmap_based_access).
        super().__init__(parent, path, page_size,
                         use_descriptor_access=use_descriptor_access or bool(gzip_compression_level))

    cpdef object open_file(self, str path):
        if self.gzip:
            return ReadWriteGzipFile(path, compresslevel=self.gzip)
        else:
            return super().open_file(path)

    cpdef int after_init(self) except -1:
        cdef ReadWriteGzipFile rw_gz
        if isinstance(self.file, ReadWriteGzipFile):
            rw_gz = self.file
            self.file_size = rw_gz.size
        else:
            self.file.seek(0, os.SEEK_END)
            self.file_size = self.file.tell()
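        # Everything past the header is a sequence of (timestamp, data) records,
        # each TIMESTAMP_SIZE + block_size bytes long.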
        self.entries = (self.file_size - HEADER_SIZE) // (self.block_size + TIMESTAMP_SIZE)
        self.pointer = self.file_size
        # Read the timestamp of the last record, which sits just before its data block
        # at the end of the file.
        cdef bytes b = self.mmap[self.file_size-(self.block_size + TIMESTAMP_SIZE):self.file_size-self.block_size]

        self.max_ts, = STRUCT_Q.unpack(b)
        return 0

    cpdef int switch_to_mmap_based_access(self) except -1:
        if self.gzip:
            raise RuntimeError('Cannot switch to mmap-based access because the chunk is gzipped')
        super().switch_to_mmap_based_access()
        return 0

    cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
        cdef bytes b
        if self.file_lock_object:
            self.file_lock_object.acquire()
        try:
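            # Append the 8-byte timestamp followed by the data block; plain files are
            # positioned explicitly first, gzip files are assumed to write sequentially.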
            self.file_size += self.block_size + TIMESTAMP_SIZE
            if not isinstance(self.file, ReadWriteGzipFile):
                self.file.seek(self.pointer, 0)
            b = STRUCT_Q.pack(timestamp) + data
            self.file.write(b)
            self.mmap.resize(self.file_size)
            self.pointer += self.block_size + TIMESTAMP_SIZE
            self.entries += 1
        finally:
            if self.file_lock_object:
                self.file_lock_object.release()
        return 0