OpenC3/cosmos

View on GitHub
openc3/python/openc3/packets/structure.py

Summary

Maintainability
A
1 hr
Test Coverage
# Copyright 2024 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

import copy
import threading
from contextlib import contextmanager
from openc3.accessors.binary_accessor import BinaryAccessor
from openc3.packets.structure_item import StructureItem
from openc3.utilities.string import formatted


class Structure:
    """Maintains knowledge of a raw binary structure. Uses structure_item to
    create individual structure items which are read and written by
    binary_accessor."""

    # String providing a single 0 byte
    ZERO_STRING = b"\00"

    def __init__(
        self,
        default_endianness=BinaryAccessor.HOST_ENDIANNESS,
        buffer=None,
        item_class=StructureItem,
    ):
        if (default_endianness == "BIG_ENDIAN") or (default_endianness == "LITTLE_ENDIAN"):
            self.default_endianness = default_endianness
            if buffer is not None and not isinstance(buffer, (bytes, bytearray)):
                raise TypeError(f"wrong argument type {buffer.__class__.__name__} (expected bytes)")
            if buffer is None:
                self._buffer = None
            else:
                self._buffer = bytearray(buffer)  # TODO: Do we need to force encoding?
            self.item_class = item_class
            self.items = {}
            self.sorted_items = []
            self.defined_length = 0
            self.defined_length_bits = 0
            self.pos_bit_size = 0
            self.neg_bit_size = 0
            self.fixed_size = True
            self.short_buffer_allowed = False
            self.mutex = None
            self.accessor = BinaryAccessor(self)
        else:
            raise AttributeError(f"Unknown endianness '{default_endianness}', must be 'BIG_ENDIAN' or 'LITTLE_ENDIAN'")

    # Read an item in the structure
    #
    # self.param item [StructureItem] Instance of StructureItem or one of its subclasses
    # self.param value_type [Symbol] Not used. Subclasses should overload this
    #   parameter to check whether to perform conversions on the item.
    # self.param buffer [String] The binary buffer to read the item from
    # self.return Value based on the item definition. This could be a string, integer,
    #   float, or array of values.
    def read_item(self, item, value_type="RAW", buffer=None):
        if not buffer:
            buffer = self._buffer
        if not buffer:
            buffer = self.allocate_buffer_if_needed()
        return self.accessor.read_item(item, buffer)

    # Get the length of the buffer used by the structure
    #
    # self.return [Integer] Size of the buffer in bytes
    def length(self):
        self.allocate_buffer_if_needed()
        return len(self._buffer)

    # Resize the buffer at least the defined length of the structure
    def resize_buffer(self):
        if self._buffer:
            # Extend data size
            if len(self._buffer) < self.defined_length:
                self._buffer += Structure.ZERO_STRING * (self.defined_length - len(self._buffer))
        else:
            self.allocate_buffer_if_needed()

    @property
    def accessor(self):
        return self.__accessor

    # Configure the accessor for this packet
    #
    # self.param accessor [Accessor] The class to use as an accessor
    @accessor.setter
    def accessor(self, accessor):
        self.__accessor = accessor
        # isinstance can fail if the class is reloaded because the class becomes a new class
        # so directly check the class name which is basically equivalent
        if self.__accessor.enforce_short_buffer_allowed():
            self.short_buffer_allowed = True

    # Read a list of items in the structure
    #
    # self.param items [StructureItem] Array of StructureItem or one of its subclasses
    # self.param value_type [Symbol] Not used. Subclasses should overload this
    #   parameter to check whether to perform conversions on the item.
    # self.param buffer [String] The binary buffer to read the item from
    # self.return Hash of read names and values
    def read_items(self, items, value_type="RAW", buffer=None):
        if not buffer:
            buffer = self._buffer
        if not buffer:
            buffer = self.allocate_buffer_if_needed()
        return self.accessor.read_items(items, buffer)

    # Allocate a buffer if not available
    def allocate_buffer_if_needed(self):
        if not self._buffer:
            self._buffer = bytearray(Structure.ZERO_STRING * self.defined_length)
        return self._buffer

    # Indicates if any items have been defined for this structure
    # self.return [TrueClass or FalseClass]
    def defined(self):
        return len(self.sorted_items) > 0

    # Rename an existing item
    #
    # self.param item_name [String] Name of the currently defined item
    # self.param new_item_name [String] New name for the item
    def rename_item(self, item_name, new_item_name):
        item = self.get_item(item_name)
        item.name = new_item_name
        self.items.pop(item_name)
        self.items[new_item_name] = item
        # Since self.sorted_items contains the actual item reference it is
        # updated when we set the item.name
        return item

    # Define an item in the structure. This creates a new instance of the
    # item_class as given in the constructor and adds it to the items hash. It
    # also resizes the buffer to accommodate the new item.
    #
    # self.param name [String] Name of the item. Used by the items hash to retrieve
    #   the item.
    # self.param bit_offset [Integer] Bit offset of the item in the raw buffer
    # self.param bit_size [Integer] Bit size of the item in the raw buffer
    # self.param data_type [Symbol] Type of data contained by the item. This is
    #   dependent on the item_class but by default see StructureItem.
    # self.param array_size [Integer] Set to a non None value if the item is to
    #   represented as an array.
    # self.param endianness [Symbol] Endianness of this item. By default the
    #   endianness as set in the constructor is used.
    # self.param overflow [Symbol] How to handle value overflows. This is
    #   dependent on the item_class but by default see StructureItem.
    # self.return [StrutureItem] The structure item defined
    def define_item(
        self,
        name,
        bit_offset,
        bit_size,
        data_type,
        array_size=None,
        endianness=None,
        overflow="ERROR",
    ):
        if not endianness:
            endianness = self.default_endianness
        # Create the item
        item = self.item_class(name, bit_offset, bit_size, data_type, endianness, array_size, overflow)
        return self.define(item)

    # Adds the given item to the items hash. It also resizes the buffer to
    # accommodate the new item.
    #
    # self.param item [StructureItem] The structure item to add
    # self.return [StrutureItem] The structure item defined
    def define(self, item):
        # Handle Overwriting Existing Item
        if self.items.get(item.name):
            item_index = None
            for index, sorted_item in enumerate(self.sorted_items):
                if sorted_item.name == item.name:
                    item_index = index
                    break
            if item_index < len(self.sorted_items):
                self.sorted_items.pop(item_index)

        # Add to Sorted Items
        if len(self.sorted_items) != 0:
            last_item = self.sorted_items[-1]
            self.sorted_items.append(item)
            # If the current item or last item have a negative offset then we have
            # to re-sort. We also re-sort if the current item is less than the last
            # item because we are inserting.
            if last_item.bit_offset <= 0 or item.bit_offset <= 0 or item.bit_offset < last_item.bit_offset:
                self.sorted_items.sort()
        else:
            self.sorted_items.append(item)

        # Add to the overall hash of defined items
        self.items[item.name] = item
        # Update fixed size knowledge
        if (item.data_type != "DERIVED" and item.bit_size <= 0) or (item.array_size and item.array_size <= 0):
            self.fixed_size = False

        # Recalculate the overall defined length of the structure
        update_needed = False
        if item.bit_offset >= 0:
            if item.bit_size > 0:
                if item.array_size is not None:
                    if item.array_size >= 0:
                        item_defined_length_bits = item.bit_offset + item.array_size
                    else:
                        item_defined_length_bits = item.bit_offset
                else:
                    item_defined_length_bits = item.bit_offset + item.bit_size

                if item_defined_length_bits > self.pos_bit_size:
                    self.pos_bit_size = item_defined_length_bits
                    update_needed = True

            elif item.bit_offset > self.pos_bit_size:
                self.pos_bit_size = item.bit_offset
                update_needed = True

        else:
            if abs(item.bit_offset) > self.neg_bit_size:
                self.neg_bit_size = abs(item.bit_offset)
                update_needed = True

        if update_needed:
            self.defined_length_bits = self.pos_bit_size + self.neg_bit_size
            self.defined_length = int(self.defined_length_bits / 8)
            if self.defined_length_bits % 8 != 0:
                self.defined_length += 1

        # Resize the buffer if necessary
        if self.buffer:
            self.resize_buffer()
        return item

    # Define an item at the end of the structure. This creates a new instance of the
    # item_class as given in the constructor and adds it to the items hash. It
    # also resizes the buffer to accommodate the new item.
    #
    # self.param name (see #define_item)
    # self.param bit_size (see #define_item)
    # self.param data_type (see #define_item)
    # self.param array_size (see #define_item)
    # self.param endianness (see #define_item)
    # self.param overflow (see #define_item)
    # self.return (see #define_item)
    def append_item(
        self,
        name,
        bit_size,
        data_type,
        array_size=None,
        endianness=None,
        overflow="ERROR",
    ):
        if not endianness:
            endianness = self.default_endianness
        if data_type == "DERIVED":
            return self.define_item(name, 0, bit_size, data_type, array_size, endianness, overflow)
        else:
            return self.define_item(
                name,
                self.defined_length_bits,
                bit_size,
                data_type,
                array_size,
                endianness,
                overflow,
            )

    # Adds an item at the  of the structure. It adds the item to the items
    # hash and resizes the buffer to accommodate the new item.
    #
    # self.param item (see #define)
    # self.return (see #define)
    def append(self, item):
        if item.data_type == "DERIVED":
            item.bit_offset = 0
        else:
            # We're appending a new item so set the bit_offset
            item.bit_offset = self.defined_length_bits
            # Also set original_bit_offset because it's currently 0
            # due to PacketItemParser::create_packet_item
            # get_bit_offset() returning 0 if append
            item.original_bit_offset = self.defined_length_bits

        return self.define(item)

    # self.param name [String] Name of the item to look up in the items Hash
    # self.return [StructureItem] StructureItem or one of its subclasses
    def get_item(self, name):
        item = self.items.get(name.upper())
        if not item:
            raise AttributeError(f"Unknown item: {name}")
        return item

    # self.param item [#name] Instance of StructureItem or one of its subclasses.
    #   The name method will be used to look up the item and set it to the new instance.
    def set_item(self, item):
        if self.items.get(item.name):
            self.items[item.name] = item

            # Need to allocate space for the variable length item if its minimum size is greater than zero
            if item.variable_bit_size:
                minimum_data_bits = 0
                if (item.data_type == "INT" or item.data_type == "UINT") and not item.original_array_size:
                    # Minimum QUIC encoded integer, see https://datatracker.ietf.org/doc/html/rfc9000#name-variable-length-integer-enc
                    minimum_data_bits = 6
                # STRING, BLOCK, or array item
                elif item.variable_bit_size["length_value_bit_offset"] > 0:
                    minimum_data_bits = (
                        item.variable_bit_size["length_value_bit_offset"]
                        * item.variable_bit_size["length_bits_per_count"]
                    )
                if minimum_data_bits > 0 and item.bit_offset >= 0 and self.defined_length_bits == item.bit_offset:
                    self.defined_length_bits += minimum_data_bits
        else:
            raise AttributeError(f"Unknown item: {item.name} - Ensure item name is uppercase")

    # self.param name [String] Name of the item to delete in the items Hash
    def delete_item(self, name):
        item = self.items[name.upper()]
        if not item:
            raise AttributeError(f"Unknown item: {name}")

        # Find the item to delete in the sorted_items array
        item_index = None
        for index, sorted_item in enumerate(self.sorted_items):
            if sorted_item.name == item.name:
                item_index = index
                break

        self.sorted_items.pop(item_index)
        self.items.pop(name.upper())

    # Write a value to the buffer based on the item definition
    #
    # self.param item [StructureItem] Instance of StructureItem or one of its subclasses
    # self.param value [Object] Value based on the item definition. This could be
    #   a string, integer, float, or array of values.
    # self.param value_type [Symbol] Not used. Subclasses should overload this
    #   parameter to check whether to perform conversions on the item.
    # self.param buffer [String] The binary buffer to write the value to
    def write_item(self, item, value, value_type="RAW", buffer=None):
        if not buffer:
            buffer = self._buffer
        if not buffer:
            buffer = self.allocate_buffer_if_needed()
        self.accessor.write_item(item, value, buffer)

    # Write values to the buffer based on the item definitions
    #
    # self.param items [StructureItem] Array of StructureItem or one of its subclasses
    # self.param value [Object] Array of values based on the item definitions.
    # self.param value_type [Symbol] Not used. Subclasses should overload this
    #   parameter to check whether to perform conversions on the item.
    # self.param buffer [String] The binary buffer to write the values to
    def write_items(self, items, values, value_type="RAW", buffer=None):
        if not buffer:
            buffer = self._buffer
        if not buffer:
            buffer = self.allocate_buffer_if_needed()
        self.accessor.write_items(items, values, buffer)

    # Read an item in the structure by name
    #
    # self.param name [String] Name of an item to read
    # self.param value_type [Symbol] Not used. Subclasses should overload this
    #   parameter to check whether to perform conversions on the item.
    # self.param buffer [String] The binary buffer to read the item from
    # self.return Value based on the item definition. This could be an integer,
    #   float, or array of values.
    def read(self, name, value_type="RAW", buffer=None):
        if not buffer:
            buffer = self._buffer
        return self.read_item(self.get_item(name), value_type, buffer)

    # Write an item in the structure by name
    #
    # self.param name [Object] Name of the item to write
    # self.param value [Object] Value based on the item definition. This could be
    #   a string, integer, float, or array of values.
    # self.param value_type [Symbol] Not used. Subclasses should overload this
    #   parameter to check whether to perform conversions on the item.
    # self.param buffer [String] The binary buffer to write the value to
    def write(self, name, value, value_type="RAW", buffer=None):
        if not buffer:
            buffer = self._buffer
        self.write_item(self.get_item(name), value, value_type, buffer)

    # Read all items in the structure into an array of arrays
    #   [[item name, item value], ...]
    #
    # self.param value_type [Symbol] Not used. Subclasses should overload this
    #   parameter to check whether to perform conversions on the item.
    # self.param buffer [String] The binary buffer to write the value to
    # self.param top [Boolean] Indicates if this is a top level call for the mutex
    # self.return [Array<Array>] Array of two element arrays containing the item
    #   name as element 0 and item value as element 1.
    def read_all(self, value_type="RAW", buffer=None, top=True):
        if not buffer:
            buffer = self._buffer
        item_array = []
        with self.synchronize_allow_reads(top):
            for item in self.sorted_items:
                item_array.append([item.name, self.read_item(item, value_type, buffer)])
        return item_array

    # Create a string that shows the name and value of each item in the structure
    #
    # self.param value_type [Symbol] Not used. Subclasses should overload this
    #   parameter to check whether to perform conversions on the item.
    # self.param indent [Integer] Amount to indent before printing the item name
    # self.param buffer [String] The binary buffer to write the value to
    # self.param ignored [Array<String>] List of items to ignore when building the string
    # self.return [String] String formatted with all the item names and values
    def formatted(self, value_type="RAW", indent=0, buffer=None, ignored=None):
        if not buffer:
            buffer = self._buffer
        indent_string = " " * indent
        string = ""
        with self.synchronize_allow_reads(True):
            for item in self.sorted_items:
                if ignored and item.name in ignored:
                    continue

                if (item.data_type != "BLOCK") or (
                    item.data_type == "BLOCK" and value_type != "RAW" and hasattr(item, "read_conversion")
                ):
                    string += f"{indent_string}{item.name}: {self.read_item(item, value_type, buffer)}\n"
                else:
                    value = self.read_item(item, value_type, buffer)
                    if isinstance(value, (str, bytes, bytearray)):
                        string += f"{indent_string}{item.name}:\n"
                        string += formatted(value, 1, 16, " ", indent + 2)
                    else:
                        string += f"{indent_string}{item.name}: {value}\n"

        return string

    # Get the buffer used by the structure. The current buffer is copied and
    # thus modifications to the returned buffer will have no effect on the
    # structure items.
    #
    # self.param copy [TrueClass/FalseClass] Whether to copy the buffer
    # self.return [String] Data buffer backing the structure
    @property
    def buffer(self):
        return self.allocate_buffer_if_needed()[:]

    def buffer_no_copy(self):
        return self.allocate_buffer_if_needed()

    # Set the buffer to be used by the structure. The buffer is copied and thus
    # further modifications to the buffer have no effect on the structure
    # items.
    #
    # self.param buffer [String] Buffer of data to back the structure items
    @buffer.setter
    def buffer(self, buffer):
        with self.synchronize():
            self.internal_buffer_equals(buffer)

    # Make a light weight clone of this structure. This only creates a new buffer
    # of data. The defined structure items are the same.
    #
    # self.return [Structure] A copy of the current structure with a new underlying
    #   buffer of data
    def clone(self):
        struct = copy.copy(self)
        struct._buffer = self.buffer  # Makes a copy
        struct.accessor.packet = struct
        return struct

    MUTEX = threading.Lock()

    def setup_mutex(self):
        if self.mutex:
            return
        with Structure.MUTEX:
            self.mutex_allow_reads = False
            self.mutex = threading.Lock()

    # Take the structure mutex to ensure the buffer does not change while you perform activities
    def synchronize(self):
        self.setup_mutex()
        return self.mutex

    # Take the structure mutex to ensure the buffer does not change while you perform activities
    # This versions allows reads to happen if a top level function has already taken the mutex
    # self.param top [Boolean] If True this will take the mutex and set an allow reads flag to allow
    #      lower level calls to go forward without getting the mutex
    @contextmanager
    def synchronize_allow_reads(self, top=False):
        self.setup_mutex()
        if top:
            with self.mutex:
                self.mutex_allow_reads = threading.get_ident()
                try:
                    yield
                finally:
                    self.mutex_allow_reads = False
        else:
            got_mutex = self.mutex.acquire(False)
            if got_mutex:
                try:
                    yield
                finally:
                    self.mutex.release()
            else:  # if self.mutex_allow_reads == threading.get_ident()
                yield

    def calculate_total_bit_size(self, item):
        if item.variable_bit_size:
            # Bit size is determined by length field
            length_value = self.read(item.variable_bit_size["length_item_name"], "CONVERTED")
            if item.data_type == "INT" or item.data_type == "UINT" and not item.original_array_size:
                match length_value:
                    case 0:
                        return 6
                    case 1:
                        return 14
                    case 2:
                        return 30
                    case _:
                        return 62
            else:
                return (length_value * item.variable_bit_size["length_bits_per_count"]) + item.variable_bit_size[
                    "length_value_bit_offset"
                ]
        elif item.original_bit_size <= 0:
            # Bit size is full packet length - bits before item + negative bits saved at end
            return (len(self._buffer) * 8) - item.bit_offset + item.original_bit_size
        elif item.original_array_size and item.original_array_size <= 0:
            # Bit size is full packet length - bits before item + negative bits saved at end
            return (len(self._buffer) * 8) - item.bit_offset + item.original_array_size
        else:
            raise AttributeError("Unexpected use of calculate_total_bit_size for non-variable-sized item")

    def recalculate_bit_offsets(self):
        adjustment = 0
        for item in self.sorted_items:
            # Anything with a negative bit offset should be left alone
            if item.original_bit_offset >= 0:
                item.bit_offset = item.original_bit_offset + adjustment
                if item.data_type != "DERIVED" and (
                    item.variable_bit_size
                    or item.original_bit_size <= 0
                    or (item.original_array_size and item.original_array_size <= 0)
                ):
                    new_bit_size = self.calculate_total_bit_size(item)
                    if item.original_bit_size != new_bit_size:
                        adjustment += new_bit_size - item.original_bit_size

    def internal_buffer_equals(self, buffer):
        if not isinstance(buffer, (bytes, bytearray)):
            raise AttributeError(f"Buffer class is {buffer.__class__.__name__} but must be bytearray")

        self._buffer = bytearray(buffer[:])
        if not self.fixed_size:
            self.recalculate_bit_offsets()

        # self.buffer.force_encoding('ASCII-8BIT'.freeze)
        if self.accessor.enforce_length():
            if len(self._buffer) != self.defined_length:
                if len(self._buffer) < self.defined_length:
                    self.resize_buffer()
                    if not self.short_buffer_allowed:
                        raise AttributeError("Buffer length less than defined length")
                elif self.fixed_size and self.defined_length != 0:
                    raise AttributeError("Buffer length greater than defined length")