# smok-serwis/tempsdb -- tempsdb/series.pyx
# (Web-viewer page header captured with the source: Maintainability B, Test Coverage 81%.)
import resource
import os
import typing as tp
import shutil
import threading
import warnings
import logging

from .chunks.base cimport Chunk
from .chunks.normal cimport NormalChunk
from .chunks.direct cimport DirectChunk
from .chunks.maker cimport create_chunk
from .exceptions import DoesNotExist, Corruption, InvalidState, AlreadyExists
from .metadata cimport read_meta_at, write_meta_at


# Names of the files that carry series metadata; they are skipped when the series
# directory is scanned for chunk files.
cdef set metadata_file_names = {'metadata.txt', 'metadata.minijson'}

# Logger shared by everything in this module.
cdef object logger = logging.getLogger(__name__)


cdef class TimeSeries:
    """
    A single time series. This maps each timestamp (unsigned long long) to a block of data
    of length block_size.

    The series is stored in a directory holding a metadata file and the chunk files.
    Instantiating this class opens an existing series.

    When you're done with this, please call
    :meth:`~tempsdb.series.TimeSeries.close`.

    If you forget to, the destructor will do that instead, and a warning will be emitted.

    :ivar last_entry_ts: timestamp of the last entry added or 0 if no entries yet (int)
    :ivar last_entry_synced: timestamp of the last synchronized entry (int)
    :ivar block_size: size of the writable block of data (int)
    :ivar path: path to the directory containing the series (str)
    :ivar descriptor_based_access: are all chunks using descriptor-based access? (bool)
    :ivar name: name of the series (str)
    :ivar metadata: extra data (tp.Optional[dict])
    """
    cpdef tuple get_current_value(self):
        """
        Return the latest value of this series.

        :return: tuple of (timestamp, value)
        :rtype: tp.Tuple[int, bytes]
        :raises ValueError: series has no data
        """
        if self.last_chunk is None:
            raise ValueError('No data in series')
        cdef:
            # A one-element range: from the newest timestamp to itself.
            Iterator it = self.iterate_range(self.last_chunk.max_ts, self.last_chunk.max_ts)
            tuple tpl
        try:
            tpl = it.next_item()
        finally:
            # Close the iterator even if reading the item fails; presumably this is what
            # gives back the chunk references taken by iterate_range -- TODO confirm.
            it.close()

        if tpl is None:
            raise ValueError('Series is empty!')

        return tpl

    cpdef int disable_mmap(self) except -1:
        """
        Make the whole series use the descriptor-based file access method.

        The setting is remembered on the series and every chunk that is currently
        open is switched over as well.
        """
        cdef Chunk opened
        self.descriptor_based_access = True
        with self.lock:
            with self.open_lock:
                for opened in self.open_chunks.values():
                    opened.switch_to_descriptor_based_access()
        return 0

    cpdef int set_metadata(self, dict new_meta) except -1:
        """
        Replace the value of the :attr:`~tempsdb.series.TimeSeries.metadata` property.

        The series metadata is written to disk right away.

        :param new_meta: new value of metadata property
        """
        self.metadata = new_meta
        # Persist immediately so that the on-disk metadata follows the attribute.
        self.sync_metadata()
        return 0

    cpdef int enable_mmap(self) except -1:
        """
        Make the whole series use the mmap-based file access method.

        The setting is remembered on the series and every chunk that is currently open
        is asked to switch. A chunk on which mmap fails due to a recoverable error
        will remain in descriptor-based mode.

        :raises Corruption: mmap failed due to an irrecoverable error
        """
        cdef Chunk opened
        self.descriptor_based_access = False
        with self.lock:
            with self.open_lock:
                for opened in self.open_chunks.values():
                    opened.switch_to_mmap_based_access()
        return 0

    def __init__(self, str path, str name, bint use_descriptor_based_access = False):
        """
        Open an existing series stored in given directory.

        :param path: path to the directory containing the series
        :param name: name of the series
        :param use_descriptor_based_access: initial value of
            :attr:`~tempsdb.series.TimeSeries.descriptor_based_access`
        :raises DoesNotExist: path is not a directory
        :raises Corruption: metadata lacks a required key, the directory is empty
            or it contains a file whose name is not a valid chunk name
        """
        logger.info('Opening new time series at %s called %s', path, name)
        self.descriptor_based_access = use_descriptor_based_access
        self.mpm = None
        self.name = name
        self.lock = threading.RLock()
        self.open_lock = threading.RLock()
        self.refs_chunks = {}
        self.closed = False

        self.path = path

        if not os.path.isdir(self.path):
            raise DoesNotExist('Chosen time series does not exist')

        cdef:
            dict metadata = read_meta_at(self.path)
            str filename
            str chunk_name
            list files = os.listdir(self.path)
            unsigned long long last_chunk_name
            bint is_direct
            bint is_gzip

        try:
            self.block_size = metadata['block_size']
            self.max_entries_per_chunk = metadata['max_entries_per_chunk']
            self.last_entry_synced = metadata['last_entry_synced']
            self.page_size = metadata['page_size']
            # Optional keys
            self.metadata = metadata.get('metadata')
            self.gzip_level = metadata.get('gzip_level', 0)
        except (OSError, ValueError, KeyError) as e:
            raise Corruption('Corrupted series: %s' % (e, )) from e

        self.open_chunks = {}       # tp.Dict[int, Chunk]
        self.chunks = []            # type: tp.List[tp.Tuple[int, bool, bool]] # sorted by ASC
                                    #: timestamp, is_direct, is_gzip

        if not len(files):
            raise Corruption('Empty directory!')
        elif len(files) == 1:
            # The only file is taken to be the metadata file, so this is an empty series
            self.last_chunk = None
            self.last_entry_ts = 0
        else:
            for filename in files:
                if filename in metadata_file_names:
                    continue
                # Strip the suffixes only from the end of the name. Removing every
                # occurrence would turn a malformed name such as "1.gz2" into a
                # valid-looking chunk name.
                chunk_name = filename
                is_gzip = chunk_name.endswith('.gz')
                if is_gzip:
                    chunk_name = chunk_name[:-len('.gz')]
                is_direct = chunk_name.endswith('.direct')
                if is_direct:
                    chunk_name = chunk_name[:-len('.direct')]
                # A gzipped chunk is always handled as a direct one
                is_direct |= is_gzip
                try:
                    self.chunks.append((int(chunk_name), is_direct, is_gzip))
                except ValueError as e:
                    # Report the name as it appears on disk
                    raise Corruption('Detected invalid file "%s"' % (filename, )) from e
            self.chunks.sort()

            try:
                last_chunk_name, is_direct, is_gzip = self.chunks[-1]
            except IndexError as e:
                # More than one file, yet none of them was a chunk
                raise Corruption('Corrupted series: %s' % (e, )) from e
            self.last_chunk = self.open_chunk(last_chunk_name, is_direct, is_gzip)
            self.last_entry_ts = self.last_chunk.max_ts

    cdef void decref_chunk(self, unsigned long long name):
        """
        Give back a single reference to a chunk.

        :param name: name of the chunk
        """
        self.refs_chunks[name] = self.refs_chunks[name] - 1

    cdef void incref_chunk(self, unsigned long long name):
        """
        Take a single reference to a chunk. A chunk that has no counter yet starts at 1.

        :param name: name of the chunk
        """
        self.refs_chunks[name] = self.refs_chunks.get(name, 0) + 1

    cdef Chunk open_chunk(self, unsigned long long name, bint is_direct, bint is_gzip):
        """
        Return the chunk of given name, opening it first if it is not open yet.

        A reference to the chunk is acquired on behalf of the caller.

        :param name: name of the chunk
        :param is_direct: is this a direct chunk?
        :param is_gzip: is this a gzipped chunk?
        :return: chunk
        :raises DoesNotExist: chunk not found
        :raises InvalidState: resource closed
        :raises ValueError: chunk was gzipped but not direct
        """
        cdef:
            Chunk chunk
            str chunk_path
            bint known = False

        if self.closed:
            raise InvalidState('Series is closed')

        # The chunk has to be one of those the series knows about
        for entry in self.chunks:
            if entry[0] == name:
                known = True
                break
        if not known:
            raise DoesNotExist('Invalid chunk')

        if is_gzip and not is_direct:
            raise ValueError('Chunk that is gzipped must be direct')

        with self.open_lock:
            if name in self.open_chunks:
                chunk = self.open_chunks[name]
            else:
                chunk_path = os.path.join(self.path, str(name))
                if is_direct:
                    chunk = DirectChunk(self,
                                        chunk_path,
                                        self.page_size,
                                        use_descriptor_access=True,
                                        gzip_compression_level=self.gzip_level if is_gzip else 0)
                else:
                    chunk = NormalChunk(self,
                                        chunk_path,
                                        self.page_size,
                                        use_descriptor_access=self.descriptor_based_access)
                self.open_chunks[name] = chunk
            self.incref_chunk(name)
        return chunk

    cpdef int trim(self, unsigned long long timestamp) except -1:
        """
        Delete all entries earlier than timestamp.

        Note that this will drop entire chunks, so it may be possible that some entries will
        linger on. The first chunk is dropped for as long as the chunk following it starts
        before timestamp, so the last chunk is never removed.

        This will not delete chunks that are still referenced: trimming stops at the first
        such chunk.

        :param timestamp: timestamp to delete entries earlier than
        """
        cdef unsigned long long chunk_to_delete
        with self.open_lock:
            # The loop condition guarantees that indices 0 and 1 exist, so no IndexError
            # handling is needed here.
            while len(self.chunks) >= 2 and timestamp > self.chunks[1][0]:
                chunk_to_delete = self.chunks[0][0]
                if chunk_to_delete in self.open_chunks:
                    if self.refs_chunks.get(chunk_to_delete, 0):
                        # I would delete it, but it's open...
                        break
                    self.open_chunks[chunk_to_delete].delete()
                else:
                    # NOTE(review): the path is built from the bare chunk name; the loader
                    # also recognizes files carrying '.direct'/'.gz' suffixes, so confirm
                    # that this finds such chunks on disk.
                    os.unlink(os.path.join(self.path, str(chunk_to_delete)))
                del self.chunks[0]
        return 0

    cpdef int close(self) except -1:
        """
        Close the series, along with every chunk it holds open.

        No further operations can be executed on it afterwards. Closing a series that
        is already closed is a no-op.
        """
        cdef Chunk opened
        if self.closed:
            return 0
        # Walk a snapshot of the values rather than the live dictionary
        # (presumably closing a chunk alters open_chunks -- TODO confirm).
        for opened in list(self.open_chunks.values()):
            opened.close(True)
        if self.mpm is not None:
            self.mpm.cancel()
            self.mpm = None
        self.closed = True
        logger.info('Closed time series at %s called %s', self.path, self.name)
        return 0

    cdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp):
        """
        Return the index of the chunk that should have given timestamp.

        :param timestamp: timestamp to check
        :return: index, within the list of chunks, of the last chunk whose first timestamp
            is not larger than given timestamp. 0 is returned if the timestamp precedes
            every chunk or if there are no chunks.
        """
        cdef:
            unsigned int count = len(self.chunks)
            unsigned int lo = 0
            unsigned int hi = count
            unsigned int mid
        # Binary search for the first chunk starting at or after the timestamp
        while lo < hi:
            mid = lo + (hi-lo)//2
            if self.chunks[mid][0] < timestamp:
                lo = mid+1
            else:
                hi = mid

        if lo < count and self.chunks[lo][0] == timestamp:
            # A chunk starts exactly at this timestamp
            return lo
        if lo == 0:
            # Nothing starts earlier. Returning lo-1 here would wrap around, since the
            # result is unsigned.
            return 0
        # Otherwise the timestamp belongs to the chunk just before the insertion point
        return lo-1

    cpdef Iterator iterate_range(self, unsigned long long start, unsigned long long stop,
                                 bint direct_bytes=True):
        """
        Return an iterator through collected data with given timestamps.

        The range is clamped to the data actually held: start is raised to the first
        timestamp of the first chunk and stop is lowered to the last entry's timestamp.
        An empty series yields an iterator over no chunks.

        :param start: timestamp to start at
        :param stop: timestamp to stop at
        :param direct_bytes: for compatibility with VarlenSeries. Ignored.
        :return: an iterator with the data
        :raises ValueError: start larger than stop
        """
        if self.last_chunk is None:
            return Iterator(self, 0, 0, [])

        if start > stop:
            raise ValueError('start larger than stop')
        if start < self.chunks[0][0]:
            start = self.chunks[0][0]
        if stop > self.last_entry_ts:
            stop = self.last_entry_ts

        cdef:
            unsigned int ch_start = self.get_index_of_chunk_for(start)
            unsigned int ch_stop = self.get_index_of_chunk_for(stop)
            list chunks = []
            unsigned int chunk_index
            Chunk chunk

        try:
            for chunk_index in range(ch_start, ch_stop+1):
                ts, is_direct, is_gzip = self.chunks[chunk_index]
                chunks.append(self.open_chunk(ts, is_direct, is_gzip))
        except Exception:
            # open_chunk took a reference for each chunk gathered so far. No iterator
            # will be created to own them, so give them back before propagating.
            for chunk in chunks:
                self.decref_chunk(chunk.name())
            raise
        return Iterator(self, start, stop, chunks)

    cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
        """
        Record that the series has been synchronized up to given timestamp.

        The metadata is written to disk as a part of this call.

        :param timestamp: timestamp of the last synced entry
        """
        self.last_entry_synced = timestamp
        # Persist the new marker right away
        self.sync_metadata()
        return 0

    cdef int sync_metadata(self) except -1:
        """
        Store the current metadata of the series in its directory.

        :return: whatever write_meta_at returns
        """
        cdef dict snapshot = self.get_metadata()
        return write_meta_at(self.path, snapshot)

    cpdef int sync(self) except -1:
        """
        Bring the on-disk state up to date with what is kept in memory.

        The metadata is written first, then the last chunk (if there is one) is synced.

        :raises InvalidState: the resource is closed
        """
        if self.closed:
            raise InvalidState('series is closed')

        self.sync_metadata()

        if self.last_chunk is None:
            # Empty series, there is no chunk to sync
            return 0
        self.last_chunk.sync()
        return 0

    cdef dict get_metadata(self):
        """
        Build the dictionary that is stored as the metadata of this series.

        Optional keys are included only when they carry a value: ``metadata`` when the
        extra data is not None and ``gzip_level`` when it is non-zero.

        :return: metadata of the series
        """
        cdef dict meta = {
                'block_size': self.block_size,
                'max_entries_per_chunk': self.max_entries_per_chunk,
                'last_entry_synced': self.last_entry_synced,
                'page_size': self.page_size
            }
        if self.metadata is not None:
            meta['metadata'] = self.metadata
        if self.gzip_level:
            # The constructor reads this key back. Leaving it out would make a reopened
            # series fall back to a gzip level of 0 after any metadata sync (assuming
            # the stored metadata is replaced wholesale -- TODO confirm).
            meta['gzip_level'] = self.gzip_level
        return meta

    cdef void register_memory_pressure_manager(self, object mpm):
        """
        Hook this series up to a memory pressure manager.

        :meth:`~tempsdb.series.TimeSeries.close_chunks` gets registered as remaining in
        severity, to be called each 30 seconds. The registration is kept so that it
        can be cancelled when the series is closed.

        Does nothing if the series is already closed.

        :param mpm: memory pressure manager to register with
        """
        if not self.closed:
            self.mpm = mpm.register_on_remaining_in_severity(1, 30)(self.close_chunks)

    cpdef int close_chunks(self) except -1:
        """
        Close all chunks opened by read requests that are not referred to anymore.

        The last chunk is never closed by this call.

        No-op if closed, if the series is empty or if it has just a single chunk.
        """
        cdef:
            unsigned long long chunk_name
            unsigned long long last_chunk_name
        if self.closed:
            return 0
        if self.last_chunk is None:
            return 0
        if len(self.chunks) == 1:
            return 0

        with self.open_lock:
            # Read the state to act upon only once the lock is held, so that it cannot
            # be altered by another lock holder between the snapshot and the loop.
            last_chunk_name = self.last_chunk.name()
            # A copy of the keys is iterated, as entries may leave open_chunks while
            # chunks are being closed (presumably done by Chunk.close -- TODO confirm).
            for chunk_name in list(self.open_chunks.keys()):
                if chunk_name == last_chunk_name:
                    continue
                if not self.get_references_for(chunk_name):
                    self.open_chunks[chunk_name].close()
                    # Forget the counter, if there is any
                    self.refs_chunks.pop(chunk_name, None)
        return 0

    cpdef int append_padded(self, unsigned long long timestamp, bytes data) except -1:
        """
        A variant of :meth:`~tempsdb.series.TimeSeries.append` that takes data shorter
        than block_size as well.

        Zero bytes are added at the end of the data until it is block_size long.

        :param timestamp: timestamp, must be larger than current last_entry_ts
        :param data: data to write
        :raises ValueError: Timestamp not larger than previous timestamp or invalid block size
        :raises InvalidState: the resource is closed
        """
        if len(data) > self.block_size:
            raise ValueError('Data too long')
        self.append(timestamp, data.ljust(self.block_size, b'\x00'))
        return 0

    cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
        """
        Append an entry.

        A new chunk, named after the timestamp, is started when the series is empty or
        when the last chunk already holds max_entries_per_chunk entries.

        :param timestamp: timestamp, must be larger than current last_entry_ts.
            This is not checked while last_entry_ts is 0.
        :param data: data to write, exactly block_size bytes long
        :raises ValueError: Timestamp not larger than previous timestamp or invalid block size
        :raises InvalidState: the resource is closed
        """
        cdef Chunk new_chunk
        if self.closed:
            raise InvalidState('series is closed')
        if len(data) != self.block_size:
            raise ValueError('Invalid block size, was %s should be %s' % (len(data), self.block_size))
        if timestamp <= self.last_entry_ts and self.last_entry_ts:
            raise ValueError('Timestamp not larger than previous timestamp')

        with self.lock, self.open_lock:
            # If this is indeed our first chunk, or we've exceeded the limit of entries per chunk
            if self.last_chunk is None or self.last_chunk.length() >= self.max_entries_per_chunk:
                # Create a next chunk
                new_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
                                         timestamp, data, self.page_size,
                                         descriptor_based_access=self.descriptor_based_access,
                                         use_direct_mode=bool(self.gzip_level),
                                         gzip_compression_level=self.gzip_level)
                # Let go of the previous last chunk only once its successor exists.
                # Should the creation fail, the series keeps its last chunk along with
                # the reference held on it.
                if self.last_chunk is not None:
                    self.decref_chunk(self.last_chunk.name())
                self.last_chunk = new_chunk
                self.open_chunks[timestamp] = new_chunk
                self.incref_chunk(timestamp)
                # Chunks are created as direct and gzipped exactly when gzip is enabled
                self.chunks.append((timestamp, bool(self.gzip_level), bool(self.gzip_level)))
            else:
                self.last_chunk.append(timestamp, data)
            self.last_entry_ts = timestamp

        return 0

    cpdef int delete(self) except -1:
        """
        Remove this series from the disk.

        The series has to be open. It gets closed, then its whole directory is removed.

        :raises InvalidState: series is not opened
        """
        if self.closed:
            raise InvalidState('series is closed')
        cdef str directory = self.path
        self.close()
        shutil.rmtree(directory)
        return 0

    cpdef unsigned long open_chunks_mmap_size(self):
        """
        Sum up the mmap sizes reported by all of the currently open chunks.

        :return: how much RAM, in bytes, do the opened chunks consume?
        """
        cdef:
            Chunk opened
            unsigned long total = 0
        for opened in self.open_chunks.values():
            total = total + opened.get_mmap_size()
        return total

    def __del__(self):
        """
        Close the series if the user has not done so, emitting a warning about it.
        """
        if self.closed:
            return
        warnings.warn('You forgot to close TimeSeries. Please explicitly close it when you '
                      'are done.')
        self.close()


cpdef TimeSeries create_series(str path, str name, unsigned int block_size,
                               int max_entries_per_chunk, int page_size=0,
                               bint use_descriptor_based_access=False,
                               int gzip_level=0):
    """
    Create a new, empty series and open it.

    A directory is created at given path and the metadata is written to it. If anything
    fails after the directory has been created, the directory is removed again, so that
    a later attempt does not fail with AlreadyExists.

    :param path: path of the directory to create, must not exist yet
    :param name: name of the series
    :param block_size: size of a single block of data
    :param max_entries_per_chunk: number of entries after which a new chunk is started
    :param page_size: page size to store in the metadata. The default of 0 means the
        value of resource.getpagesize()
    :param use_descriptor_based_access: passed on to the constructor of the series
    :param gzip_level: gzip level to store in the metadata. The default of 0 means that
        the key is not stored.
    :return: the freshly created series
    :raises AlreadyExists: something already exists at given path
    """
    if not page_size:
        page_size = resource.getpagesize()
    if os.path.exists(path):
        raise AlreadyExists('This series already exists!')
    cdef dict meta = {
            'block_size': block_size,
            'max_entries_per_chunk': max_entries_per_chunk,
            'last_entry_synced': 0,
            'page_size': page_size
    }
    if gzip_level:
        meta['gzip_level'] = gzip_level
    os.mkdir(path)
    try:
        write_meta_at(path, meta)
        return TimeSeries(path, name,
                          use_descriptor_based_access=use_descriptor_based_access)
    except Exception:
        # The directory was created just above, so it is ours to clean up
        shutil.rmtree(path, ignore_errors=True)
        raise