BallAerospace/COSMOS

View on GitHub
cosmos/lib/cosmos/packets/commands.rb

Summary

Maintainability
D
1 day
Test Coverage
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder

require 'cosmos/packets/packet_config'

module Cosmos
  # Commands uses PacketConfig to parse the command and telemetry
  # configuration files. It contains all the knowledge of which command packets
  # exist in the system and how to access them. This class is the API layer
  # which other classes use to access commands.
  #
  # This should not be confused with the Api module which implements the JSON
  # API that is used by tools when accessing the Server. The Api module always
  # provides Ruby primatives where the PacketConfig class can return actual
  # Packet or PacketItem objects. While there are some overlapping methods between
  # the two, these are separate interfaces into the system.
  class Commands
    attr_accessor :config

    LATEST_PACKET_NAME = 'LATEST'.freeze

    # @param config [PacketConfig] Packet configuration to use to access the
    #  commands
    def initialize(config)
      @config = config
    end

    # (see PacketConfig#warnings)
    def warnings
      return @config.warnings
    end

    # @return [Array<String>] The command target names (excluding UNKNOWN)
    def target_names
      result = @config.commands.keys.sort
      result.delete('UNKNOWN'.freeze)
      return result
    end

    # @param target_name [String] The target name
    # @return [Hash<packet_name=>Packet>] Hash of the command packets for the given
    #   target name keyed by the packet name
    def packets(target_name)
      target_packets = @config.commands[target_name.to_s.upcase]
      raise "Command target '#{target_name.to_s.upcase}' does not exist" unless target_packets

      target_packets
    end

    # @param target_name [String] The target name
    # @param packet_name [String] The packet name. Must be a defined packet name
    #   and not 'LATEST'.
    # @return [Packet] The command packet for the given target and packet name
    def packet(target_name, packet_name)
      target_packets = packets(target_name)
      packet = target_packets[packet_name.to_s.upcase]
      raise "Command packet '#{target_name.to_s.upcase} #{packet_name.to_s.upcase}' does not exist" unless packet

      packet
    end

    # @param target_name (see #packet)
    # @param packet_name (see #packet)
    # @return [Array<PacketItem>] The command parameters for the given target and packet name
    def params(target_name, packet_name)
      return packet(target_name, packet_name).sorted_items
    end

    # Identifies an unknown buffer of data as a defined command and sets the
    # commands's data to the given buffer. Identifying a command uses the fields
    # marked as ID_PARAMETER to identify if the buffer passed represents the
    # command defined. Incorrectly sized buffers are still processed but an
    # error is logged.
    #
    # Note: Subsequent requests for the command (using packet) will return
    # an uninitialized copy of the command. Thus you must use the return value
    # of this method.
    #
    # @param (see #identify_tlm!)
    # @return (see #identify_tlm!)
    def identify(packet_data, target_names = nil)
      identified_packet = nil

      target_names = target_names() unless target_names

      target_names.each do |target_name|
        target_name = target_name.to_s.upcase
        target_packets = nil
        begin
          target_packets = packets(target_name)
        rescue RuntimeError
          # No commands for this target
          next
        end

        target = System.targets[target_name]
        if target and target.cmd_unique_id_mode
          # Iterate through the packets and see if any represent the buffer
          target_packets.each do |packet_name, packet|
            if packet.identify?(packet_data)
              identified_packet = packet
              break
            end
          end
        else
          # Do a hash lookup to quickly identify the packet
          if target_packets.length > 0
            packet = target_packets.first[1]
            key = packet.read_id_values(packet_data)
            hash = @config.cmd_id_value_hash[target_name]
            identified_packet = hash[key]
            identified_packet = hash['CATCHALL'.freeze] unless identified_packet
          end
        end

        if identified_packet
          identified_packet.received_count += 1
          identified_packet = identified_packet.clone
          identified_packet.received_time = nil
          identified_packet.stored = false
          identified_packet.extra = nil
          identified_packet.buffer = packet_data
          break
        end
      end

      return identified_packet
    end

    # Returns a copy of the specified command packet with the parameters
    # initialzed to the given params values.
    #
    # @param target_name (see #packet)
    # @param packet_name (see #packet)
    # @param params [Hash<param_name=>param_value>] Parameter items to override
    #   in the given command.
    # @param range_checking [Boolean] Whether to perform range checking on the
    #   passed in parameters.
    # @param raw [Boolean] Indicates whether or not to run conversions on command parameters
    # @param check_required_params [Boolean] Indicates whether or not to check
    #   that the required command parameters are present
    def build_cmd(target_name, packet_name, params = {}, range_checking = true, raw = false, check_required_params = true)
      target_upcase = target_name.to_s.upcase
      packet_upcase = packet_name.to_s.upcase

      # Lookup the command and create a light weight copy
      pkt = packet(target_upcase, packet_upcase)
      pkt.received_count += 1
      command = pkt.clone

      # Restore the command's buffer to a zeroed string of defined length
      # This will undo any side effects from earlier commands that may have altered the size
      # of the buffer
      command.buffer = "\x00" * command.defined_length

      # Set time, parameters, and restore defaults
      command.received_time = Time.now.sys
      command.stored = false
      command.extra = nil
      command.given_values = params
      command.restore_defaults(command.buffer(false), params.keys)
      command.raw = raw

      given_item_names = set_parameters(command, params, range_checking)
      check_required_params(command, given_item_names) if check_required_params

      return command
    end

    # Formatted version of a command
    def format(packet, ignored_parameters = [])
      if packet.raw
        items = packet.read_all(:RAW)
        raw = true
      else
        items = packet.read_all(:FORMATTED)
        raw = false
      end
      items.delete_if { |item_name, item_value| ignored_parameters.include?(item_name) }
      return build_cmd_output_string(packet.target_name, packet.packet_name, items, raw)
    end

    def build_cmd_output_string(target_name, cmd_name, cmd_params, raw = false)
      if raw
        output_string = 'cmd_raw("'
      else
        output_string = 'cmd("'
      end
      output_string << target_name + ' ' + cmd_name
      if cmd_params.nil? or cmd_params.empty?
        output_string << '")'
      else
        begin
          command_items = packet(target_name, cmd_name).items
        rescue
        end

        params = []
        cmd_params.each do |key, value|
          next if Packet::RESERVED_ITEM_NAMES.include?(key)

          begin
            item_type = command_items[key].data_type
          rescue
            item_type = nil
          end

          if value.is_a?(String)
            value = value.dup
            if item_type == :BLOCK or item_type == :STRING
              if !value.is_printable?
                value = "0x" + value.simple_formatted
              else
                value = value.inspect
              end
            else
              value = value.convert_to_value.to_s
            end
            if value.length > 256
              value = value[0..255] + "...'"
            end
            value.tr!('"', "'")
          elsif value.is_a?(Array)
            value = "[#{value.join(", ")}]"
          end
          params << "#{key} #{value}"
        end
        params = params.join(", ")
        output_string << ' with ' + params + '")'
      end
      return output_string
    end

    # Returns whether the given command is hazardous. Commands are hazardous
    # if they are marked hazardous overall or if any of their hardardous states
    # are set. Thus any given parameter values are first applied to the command
    # and then checked for hazardous states.
    #
    # @param command [Packet] The command to check for hazardous
    def cmd_pkt_hazardous?(command)
      return [true, command.hazardous_description] if command.hazardous

      # Check each item for hazardous states
      item_defs = command.items
      item_defs.each do |item_name, item_def|
        if item_def.hazardous
          state_name = command.read(item_name)
          # Nominally the command.read will return a valid state_name
          # If it doesn't, the if check will fail and we'll fall through to
          # the bottom where we return [false, nil] which means this
          # command is not hazardous.
          return [true, item_def.hazardous[state_name]] if item_def.hazardous[state_name]
        end
      end

      return [false, nil]
    end

    # Returns whether the given command is hazardous. Commands are hazardous
    # if they are marked hazardous overall or if any of their hardardous states
    # are set. Thus any given parameter values are first applied to the command
    # and then checked for hazardous states.
    #
    # @param target_name (see #packet)
    # @param packet_name (see #packet)
    # @param params (see #build_cmd)
    def cmd_hazardous?(target_name, packet_name, params = {})
      # Build a command without range checking, perform conversions, and don't
      # check required parameters since we're not actually using the command.
      cmd_pkt_hazardous?(build_cmd(target_name, packet_name, params, false, false, false))
    end

    def clear_counters
      @config.commands.each do |target_name, target_packets|
        target_packets.each do |packet_name, packet|
          packet.received_count = 0
        end
      end
    end

    # Returns an array with a "TARGET_NAME PACKET_NAME" string for every command in the system (PACKET_NAME == command name)
    def all_packet_strings(include_hidden = false, splash = nil)
      strings = []
      tnames = target_names()
      total = tnames.length.to_f
      tnames.each_with_index do |target_name, index|
        if splash
          splash.message = "Processing #{target_name} command"
          splash.progress = index / total
        end

        # TODO: This wasn't being used ... should it be
        # ignored_items = System.targets[target_name].ignored_items

        packets(target_name).each do |command_name, packet|
          # We don't audit against hidden or disabled packets/commands
          next if !include_hidden and (packet.hidden || packet.disabled)

          strings << "#{target_name} #{command_name}"
        end
      end
      strings
    end

    def all
      @config.commands
    end

    protected

    def set_parameters(command, params, range_checking)
      given_item_names = []
      params.each do |item_name, value|
        item_upcase = item_name.to_s.upcase
        item = command.get_item(item_upcase)
        range_check_value = value

        # Convert from state to value if possible
        if item.states and item.states[value.to_s.upcase]
          range_check_value = item.states[value.to_s.upcase]
        end

        if range_checking
          range = item.range
          if range
            # Perform Range Check on command parameter
            if not range.include?(range_check_value)
              range_check_value = "'#{range_check_value}'" if String === range_check_value
              raise "Command parameter '#{command.target_name} #{command.packet_name} #{item_upcase}' = #{range_check_value} not in valid range of #{range.first} to #{range.last}"
            end
          end
        end

        # Update parameter in command
        if command.raw
          command.write(item_upcase, value, :RAW)
        else
          command.write(item_upcase, value, :CONVERTED)
        end

        given_item_names << item_upcase
      end
      given_item_names
    end

    def check_required_params(command, given_item_names)
      # Script Runner could call this command with only some parameters
      # so make sure any required parameters were actually passed in.
      item_defs = command.items
      item_defs.each do |item_name, item_def|
        if item_def.required and not given_item_names.include? item_name
          raise "Required command parameter '#{command.target_name} #{command.packet_name} #{item_name}' not given"
        end
      end
    end
  end # class Commands
end # module Cosmos