OpenC3/cosmos

View on GitHub
openc3/python/openc3/utilities/store_implementation.py

Summary

Maintainability
A
1 hr
Test Coverage
# Copyright 2024 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

import redis
from redis.exceptions import TimeoutError
from openc3.utilities.connection_pool import ConnectionPool
from contextlib import contextmanager
import threading
from openc3.environment import *

# Normalize the environment setting into a strict boolean flag that the rest
# of this module uses to select cluster vs. non-cluster behavior.
openc3_redis_cluster = bool(OPENC3_REDIS_CLUSTER)


class StoreConnectionPool(ConnectionPool):
    """Connection pool that can route a thread's commands through a pipeline.

    While a thread is inside :meth:`pipelined`, :meth:`get` yields that
    thread's active pipeline instead of a pooled item, so every command issued
    by the thread is queued on the pipeline and sent when the block exits.

    NOTE(review): relies on ``self.pipelines``, ``self.lock``, ``self.pool``,
    ``self.count``, ``self.pool_size`` and ``self.ctor`` which are presumably
    provided by the ``ConnectionPool`` base class -- confirm.
    """

    @contextmanager
    def pipelined(self):
        """Context manager that batches the calling thread's commands.

        In cluster mode this is currently a no-op. Otherwise a
        non-transactional pipeline is created and registered for the calling
        thread; on exit the pipeline is executed and then unregistered.
        """
        if openc3_redis_cluster:
            yield  # TODO: Update keys to support pipelining in cluster
        else:
            with self.get() as redis:
                pipeline = redis.pipeline(transaction=False)
                thread_id = threading.get_native_id()
                self.pipelines[thread_id] = pipeline
                try:
                    yield
                finally:
                    try:
                        pipeline.execute()
                    finally:
                        # Always unregister the pipeline, even when execute()
                        # raises; otherwise every later get() on this thread
                        # would keep yielding the stale pipeline.
                        self.pipelines[thread_id] = None

    @contextmanager
    def get(self):
        """Context manager yielding the thread's pipeline or a pooled item.

        If the calling thread has an active pipeline it is yielded as-is.
        Otherwise an item is taken from the pool (creating a new one while
        fewer than ``pool_size`` exist, or waiting for one to be returned)
        and put back into the pool when the block exits.
        """
        thread_id = threading.get_native_id()
        if thread_id not in self.pipelines:
            self.pipelines[thread_id] = None
        pipeline = self.pipelines[thread_id]
        if pipeline:
            yield pipeline
        else:
            item = None
            with self.lock:
                if not self.pool.empty():
                    item = self.pool.get(False)
                elif self.count < self.pool_size:
                    item = self.ctor()
                    self.count += 1
                else:
                    # Pool is at capacity: wait until another user returns
                    # an item.
                    item = self.pool.get()
            try:
                yield item
            finally:
                self.pool.put(item)


class StoreMeta(type):
    """Metaclass that forwards class-level attribute access to the singleton.

    Only the singleton plumbing (``instance``, ``instance_mutex`` and
    ``my_instance``) is resolved on the class itself. Every other attribute
    name yields a callable that, when invoked, looks the name up on
    ``cls.instance()`` and calls it with the supplied arguments.
    """

    def __getattribute__(cls, func):
        if func in ("instance", "instance_mutex", "my_instance"):
            return super().__getattribute__(func)

        # The lookup on the instance is deferred until the callable is invoked.
        return lambda *args, **kw_args: getattr(cls.instance(), func)(*args, **kw_args)


class Store(metaclass=StoreMeta):
    """Singleton wrapper around a pool of redis clients.

    Attributes that are not defined on the instance are delegated to a redis
    client obtained from ``self.redis_pool`` (see ``__getattr__``).
    """

    # Variable that holds the singleton instance
    my_instance = None

    # Mutex used to ensure that only one instance is created
    instance_mutex = threading.Lock()

    # Get the singleton instance
    @classmethod
    def instance(cls, pool_size=100):
        """Return the singleton instance, creating it on first use.

        Args:
            pool_size: Pool size passed to the constructor when the instance
                is created; ignored once an instance exists.
        """
        if cls.my_instance is not None:
            return cls.my_instance

        with cls.instance_mutex:
            # Re-check under the lock: another thread may have created the
            # instance while this one was waiting for the mutex.
            if cls.my_instance is None:
                cls.my_instance = cls(pool_size)
            return cls.my_instance

    # Delegate all unknown methods to redis through the @redis_pool
    def __getattr__(self, func):
        """Return a callable that runs ``func`` on a pooled redis client.

        The client is acquired when the returned callable is invoked and
        returned to the pool when the call finishes, so it is never used
        after it has been handed back to the pool.
        """
        # __getattr__ only runs when normal lookup fails, so getting here for
        # "redis_pool" means it was never set; delegating would recurse
        # forever through self.redis_pool.
        if func == "redis_pool":
            raise AttributeError(func)

        def method(*args, **kwargs):
            with self.redis_pool.get() as redis:
                return getattr(redis, func)(*args, **kwargs)

        return method

    def __init__(self, pool_size=10):
        """Record the redis host/port and create the connection pool.

        Args:
            pool_size: Maximum size handed to ``StoreConnectionPool``.
        """
        self.redis_host = OPENC3_REDIS_HOSTNAME
        self.redis_port = OPENC3_REDIS_PORT
        self.redis_pool = StoreConnectionPool(self.build_redis, pool_size)
        # Per-thread mapping of topic -> last seen stream ID, keyed by the
        # native thread id.
        self.topic_offsets = {}
        self.pipelines = {}

    if not openc3_redis_cluster:

        def build_redis(self):
            """Create a new redis client for ``redis_host`` / ``redis_port``."""
            # NOTE: We can't use decode_response because it tries to decode the binary
            # packet buffer which does not work. Thus strings come back as bytes like
            # b"target_name" and we decode them using b"target_name".decode()
            return redis.Redis(
                host=self.redis_host,
                port=self.redis_port,
                username=OPENC3_REDIS_USERNAME,
                password=OPENC3_REDIS_PASSWORD,
            )

    ###########################################################################
    # Stream APIs
    ###########################################################################

    def get_oldest_message(self, topic):
        """Return the first entry of the ``topic`` stream, or None if empty.

        The entry is returned exactly as ``xrange`` produced it (not decoded).
        """
        with self.redis_pool.get() as redis:
            result = redis.xrange(topic, count=1)
            return result[0] if result else None

    def get_newest_message(self, topic):
        """Return ``[msg_id, msg_hash]`` for the last entry of ``topic``.

        ``msg_id`` is decoded to ``str``. When the stream has no entries
        ``(None, None)`` is returned.
        """
        with self.redis_pool.get() as redis:
            # Default in xrevrange is range end '+', start '-' which means get all
            # elements from higher ID to lower ID and since we're limiting to 1
            # we get the last element. See https://redis.io/commands/xrevrange.
            result = redis.xrevrange(topic, count=1)
            if result:
                first = list(result[0])
                first[0] = first[0].decode()
                return first
            return (None, None)

    def get_last_offset(self, topic):
        """Return the ID of the last entry of ``topic`` as ``str``.

        Returns ``"0-0"`` when the stream has no entries.
        """
        with self.redis_pool.get() as redis:
            result = redis.xrevrange(topic, count=1)
            if result and result[0] and result[0][0]:
                return result[0][0].decode()
            return "0-0"

    def update_topic_offsets(self, topics):
        """Return the current read offset for each topic, in order.

        Offsets are tracked per thread. A topic without a recorded offset is
        initialized to the stream's current last ID so that later reads only
        see entries added from now on.
        """
        thread_id = threading.get_native_id()
        topic_offsets = self.topic_offsets.setdefault(thread_id, {})
        offsets = []
        for topic in topics:
            # Normally we will just be grabbing the topic offset
            # this allows xread to get everything past this point
            last_id = topic_offsets.get(topic)
            if not last_id:
                # If there is no topic offset this is the first call.
                # Get the last offset ID so we'll start getting everything from now on
                last_id = self.get_last_offset(topic)
                topic_offsets[topic] = last_id
            offsets.append(last_id)
        return offsets

    if not openc3_redis_cluster:

        def read_topics(self, topics, offsets=None, timeout_ms=1000, count=None):
            """Generator that reads new entries from the given streams.

            Yields ``(topic, msg_id, msg_hash, redis)`` for every entry
            returned by ``xread``, with ``topic`` and ``msg_id`` decoded to
            ``str``. The calling thread's offset for the topic is advanced
            before each entry is yielded. Because this is a generator, the
            ``return`` values below are only visible as the generator's
            ``StopIteration`` value.

            Args:
                topics: Stream names to read.
                offsets: Start IDs, one per topic in the same order. When
                    falsy, the offsets tracked for this thread are used.
                timeout_ms: Passed to ``xread`` as ``block``.
                count: Passed to ``xread`` as ``count``.
            """
            if len(topics) == 0:
                return {}
            thread_id = threading.get_native_id()
            topic_offsets = self.topic_offsets.setdefault(thread_id, {})
            try:
                # Resolve default offsets before borrowing a client:
                # update_topic_offsets borrows from the pool itself, and
                # nesting the two acquisitions would hold two pool items.
                if not offsets:
                    offsets = self.update_topic_offsets(topics)
                with self.redis_pool.get() as redis:
                    streams = {topic: offsets[index] for index, topic in enumerate(topics)}
                    result = redis.xread(streams, block=timeout_ms, count=count)
                    if result:
                        for topic, messages in result:
                            if isinstance(topic, bytes):
                                topic = topic.decode()
                            for msg_id, msg_hash in messages:
                                if isinstance(msg_id, bytes):
                                    msg_id = msg_id.decode()
                                topic_offsets[topic] = msg_id
                                yield topic, msg_id, msg_hash, redis
                    return result
            except TimeoutError:
                # Should return an empty hash not array - xread returns a hash
                return {}

    def write_topic(self, topic, msg_hash, id="*", maxlen=None, approximate=True):
        """Add a new entry to the redis stream via ``xadd``.

        Examples:
            store.write_topic('MANGO__TOPIC', {'message': 'something'})
            store.write_topic('MANGO__TOPIC', {'message': 'something'},
                              id='0-0', maxlen=1000, approximate=True)

        Args:
            topic: The stream / topic name.
            msg_hash: One or multiple field-value pairs.
            id: The entry id. ``"*"`` (the default) means auto generation;
                a falsy value is replaced with ``"*"``.
            maxlen: Max length of entries; ``None`` is passed through to
                ``xadd`` unchanged.
            approximate: Passed through to ``xadd`` (the ``~`` modifier of
                maxlen).

        Returns:
            Whatever ``xadd`` returns (the entry id).
        """
        if not id:
            id = "*"
        with self.redis_pool.get() as redis:
            return redis.xadd(topic, msg_hash, id=id, maxlen=maxlen, approximate=approximate)

    def trim_topic(self, topic, minid, approximate=True, limit=0):
        """Trim older entries of the redis stream via ``xtrim``.

        Examples:
            store.trim_topic('MANGO__TOPIC', 1000)
            store.trim_topic('MANGO__TOPIC', 1000, approximate=True, limit=0)

        Args:
            topic: The stream key.
            minid: Id to throw away data up to.
            approximate: Passed through to ``xtrim`` (the ``~`` modifier).
            limit: Integer passed through to ``xtrim`` as ``limit``.

        Returns:
            Whatever ``xtrim`` returns (the number of entries deleted).
        """
        with self.redis_pool.get() as redis:
            return redis.xtrim(name=topic, minid=minid, approximate=approximate, limit=limit)


class EphemeralStore(Store):
    """Store variant configured with the ephemeral redis host and port.

    Keeps its own singleton slot so it does not share an instance with
    ``Store``.
    """

    # Variable that holds the singleton instance
    my_instance = None

    def __init__(self, pool_size=10):
        """Initialize like ``Store``, then switch to the ephemeral endpoint.

        The pool is rebuilt after the host/port are replaced so that it is
        created once those attributes hold the ephemeral values.
        """
        super().__init__(pool_size)
        self.redis_host, self.redis_port = (
            OPENC3_REDIS_EPHEMERAL_HOSTNAME,
            OPENC3_REDIS_EPHEMERAL_PORT,
        )
        self.redis_pool = StoreConnectionPool(self.build_redis, pool_size)