BallAerospace/COSMOS

View on GitHub
cosmos/lib/cosmos/interfaces/linc_interface.rb

Summary

Maintainability
D
1 day
Test Coverage
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder

require 'cosmos/interfaces/tcpip_client_interface'
require 'uuidtools'

module Cosmos
  # Interface for connecting to Ball Aerospace LINC Labview targets
  class LincInterface < TcpipClientInterface
    # The maximum number of asynchronous commands we can wait for at a time.
    # We don't ever expect to get close to this but we need to limit it
    # to ensure the Array doesn't grow out of control.
    MAX_CONCURRENT_HANDSHAKES = 1000

    def initialize(
      hostname,
      port,
      handshake_enabled = true,
      response_timeout = 5.0,
      read_timeout = nil,
      write_timeout = 5.0,
      length_bitoffset = 0,
      length_bitsize = 16,
      length_value_offset = 4,
      fieldname_guid = 'HDR_GUID',
      endianness = 'BIG_ENDIAN',
      fieldname_cmd_length = 'HDR_LENGTH'
    )
      # Initialize Super Class
      super(hostname, port, port, write_timeout, read_timeout, 'LENGTH',
        length_bitoffset, length_bitsize, length_value_offset, 1, endianness, 0, nil, nil)

      # Configuration Settings
      @handshake_enabled = ConfigParser.handle_true_false(handshake_enabled)
      @handshake_enableds = nil
      @response_timeout = response_timeout.to_f
      @length_value_offset = Integer(length_value_offset)
      @fieldname_guid = ConfigParser.handle_nil(fieldname_guid)
      @fieldname_cmd_length = ConfigParser.handle_nil(fieldname_cmd_length)

      # Other instance variables
      @ignored_error_codes = {}
      @handshake_cmds = []
      @handshakes_mutex = Mutex.new

      # Call this once now because the first time is slow
      UUIDTools::UUID.random_create.raw
    end # def initialize

    def connect
      # Packet definitions need to be retrieved here because @target_names is not filled in until after initialize
      unless @handshake_enableds
        @handshake_enableds = {}
        @target_names.each do |target_name|
          @handshake_enableds[target_name] = @handshake_enabled
          @ignored_error_codes[target_name] = []
        end
      end
      @handshake_packets = []
      @error_packets = []
      @error_ignore_commands = nil
      @error_handle_commands = nil
      @handshake_enable_commands = nil
      @handshake_disable_commands = nil

      @target_names.each do |target_name|
        @handshake_packets << System.telemetry.packet(target_name, 'HANDSHAKE')
        @error_packets << System.telemetry.packet(target_name, 'ERROR')

        # Handle not defining the interface configuration commands (Targets may not want to support this functionality)
        begin
          command = System.commands.packet(target_name, 'COSMOS_ERROR_IGNORE')
          @error_ignore_commands ||= []
          @error_ignore_commands << command
        rescue
        end
        begin
          command = System.commands.packet(target_name, 'COSMOS_ERROR_HANDLE')
          @error_handle_commands ||= []
          @error_handle_commands << command
        rescue
        end
        begin
          command = System.commands.packet(target_name, 'COSMOS_HANDSHAKE_EN')
          @handshake_enable_commands ||= []
          @handshake_enable_commands << command
        rescue
        end
        begin
          command = System.commands.packet(target_name, 'COSMOS_HANDSHAKE_DS')
          @handshake_disable_commands ||= []
          @handshake_disable_commands << command
        rescue
        end
      end

      @handshakes_mutex.synchronize do
        @handshake_cmds = []
      end

      # Actually connect
      super()
    end

    def write(packet)
      return if linc_interface_command(packet)
      raise "Interface not connected" unless connected?()

      # Add a GUID to the GUID field if its defined
      # A GUID means it's an asychronous packet type.
      if @fieldname_guid
        guid = get_guid(packet)
      else
        # If @fieldname_guid is not defined (syncronous) we don't care what the
        # GUID is because we're not trying to match it up with anything.
        # As soon as we get a response we free the command.
        guid = 0
      end

      # Fix the length field to handle the cases where a variable length packet
      # is defined. COSMOS does not do this automatically.
      update_length_field(packet) if @fieldname_cmd_length

      # Always take the mutex (even if we aren't handshaking)
      # We do not want any incoming telemetry to be missed because
      # it could be the handshake to this command.
      @handshakes_mutex.synchronize do
        super(packet) # Send the command
        wait_for_response(packet, guid) if @handshake_enableds[packet.target_name]
      end
    end

    def linc_interface_command(packet)
      if @error_ignore_commands
        @error_ignore_commands.each do |error_ignore_command|
          if error_ignore_command.identify?(packet.buffer(false))
            linc_cmd = error_ignore_command.clone
            linc_cmd.buffer = packet.buffer
            code = linc_cmd.read('CODE')
            @ignored_error_codes[error_ignore_command.target_name] << code unless @ignored_error_codes[error_ignore_command.target_name].include? code
            return true
          end
        end
      end

      if @error_handle_commands
        @error_handle_commands.each do |error_handle_command|
          if error_handle_command.identify?(packet.buffer(false))
            linc_cmd = error_handle_command.clone
            linc_cmd.buffer = packet.buffer
            code = linc_cmd.read('CODE')
            @ignored_error_codes[error_handle_command.target_name].delete(code) if @ignored_error_codes[error_handle_command.target_name].include? code
            return true
          end
        end
      end

      if @handshake_enable_commands
        @handshake_enable_commands.each do |handshake_enable_command|
          if handshake_enable_command.identify?(packet.buffer(false))
            @handshake_enabled = true
            @handshake_enableds[handshake_enable_command.target_name] = true
            return true
          end
        end
      end

      if @handshake_disable_commands
        @handshake_disable_commands.each do |handshake_disable_command|
          if handshake_disable_command.identify?(packet.buffer(false))
            @handshake_enabled = false
            @handshake_enableds[handshake_disable_command.target_name] = false
            return true
          end
        end
      end

      return false
    end

    def get_guid(packet)
      if not packet.read(@fieldname_guid) =~ /[\x01-\xFF]/
        # The GUID has not been set already (it has all \x00 values), so make a new one.
        # This enables a router GUI to make the GUIDs so it can process handshakes too.
        guid = UUIDTools::UUID.random_create.raw
        packet.write(@fieldname_guid, guid, :RAW)
      else
        guid = packet.read(@fieldname_guid)
      end
      return guid
    end

    def update_length_field(packet)
      length = packet.length - @length_value_offset
      packet.write(@fieldname_cmd_length, length, :RAW)
    end

    def wait_for_response(packet, guid)
      # Check the number of commands waiting for handshakes.  This is just for sanity
      # If the number of commands waiting for handshakes is very large then it can't be real
      # So raise an error.  Something has gone horribly wrong.
      if @handshake_cmds.length > MAX_CONCURRENT_HANDSHAKES
        len = @handshake_cmds.length
        @handshake_cmds = []
        raise "The number of commands waiting for handshakes to #{len}. Clearing all commands!"
      end

      # Create a handshake command object and add it to the list of commands waiting
      handshake_cmd = LincHandshakeCommand.new(@handshakes_mutex, guid)
      @handshake_cmds.push(handshake_cmd)

      # wait for that handshake. This releases the mutex so that the telemetry and other commands can start running again.
      timed_out = handshake_cmd.wait_for_handshake(@response_timeout)
      # We now have the mutex again.  This interface is blocked for the rest of the command handling,
      # which should be quick because it's just checking variables and logging.
      # We want to hold the mutex during that so that the commands get logged in order of handshake from here.

      if timed_out
        # Clean this command out of the array of items that require handshakes.
        @handshake_cmds.delete_if { |hsc| hsc == handshake_cmd }
        raise "Timeout waiting for handshake from #{System.commands.format(packet, System.targets[packet.target_name].ignored_parameters)}"
      end

      process_handshake_results(handshake_cmd)
    rescue Exception => err
      # If anything goes wrong after successfully writing the packet to the LINC target
      # ensure that the packet gets updated in the CVT and logged to the packet log writer.
      # COSMOS normally only does this if write returns successfully
      if packet.identified?
        command = System.commands.packet(packet.target_name, packet.packet_name)
      else
        command = System.commands.packet('UNKNOWN', 'UNKNOWN')
      end
      command.buffer = packet.buffer

      @packet_log_writer_pairs.each do |packet_log_writer_pair|
        packet_log_writer_pair.cmd_log_writer.write(packet)
      end

      raise err
    end

    def process_handshake_results(handshake_cmd)
      status = handshake_cmd.handshake.handshake.read('STATUS')
      code = handshake_cmd.handshake.handshake.read('CODE')
      source = handshake_cmd.handshake.error_source

      # Handle handshake warnings and errors
      if status == "OK" and code != 0
        unless @ignored_error_codes[handshake_cmd.handshake.handshake.target_name].include? code
          Logger.warn "#{@name}: Warning sending command (#{code}): #{source}"
        end
      elsif status == "ERROR"
        unless @ignored_error_codes[handshake_cmd.handshake.handshake.target_name].include? code
          raise "Error sending command (#{code}): #{source}"
        end
      end
    end

    def read
      packet = super()
      if packet
        @handshake_packets.each do |handshake_packet|
          if handshake_packet.identify?(packet.buffer(false))
            handshake_packet = handshake_packet.clone
            handshake_packet.buffer = packet.buffer
            linc_handshake = LincHandshake.new(handshake_packet, handshake_packet.target_name)

            # Check for a local handshake
            if handshake_packet.read('origin') == "LCL"
              handle_local_handshake(linc_handshake)
            else
              handle_remote_handshake(linc_handshake) if @handshake_enableds[handshake_packet.target_name]
            end # if handshake_packet.read('origin') == "LCL"
          end # @handshake_packet.identify?(packet.buffer(false))
        end
      end # if packet

      return packet
    end

    def handle_local_handshake(linc_handshake)
      # Update the current value table for this command
      command = System.commands.packet(linc_handshake.identified_command.target_name, linc_handshake.identified_command.packet_name)
      command.received_time = linc_handshake.identified_command.received_time
      command.buffer = linc_handshake.identified_command.buffer
      command.received_count += 1

      # Put a log of the command onto the server for the user to see
      Logger.info("#{@name}: External Command: " + System.commands.format(linc_handshake.identified_command, System.targets[linc_handshake.identified_command.target_name].ignored_parameters))

      # Log the command to the command log(s)
      @packet_log_writer_pairs.each do |packet_log_writer_pair|
        packet_log_writer_pair.cmd_log_writer.write(linc_handshake.identified_command)
      end
    end

    def handle_remote_handshake(linc_handshake)
      # This is a remote packet (sent from here).
      # Add to the array of handshake packet responses
      # The mutex is required by the command task due to the way it
      # first looks up the handshake before removing it.
      @handshakes_mutex.synchronize do
        if @fieldname_guid
          # A GUID means it's an asychronous packet type.
          # So look at the list of incoming handshakes and pick off (deleting)
          # the handshake from the list if it's for this command.
          #
          # The mutex is required because the telemetry task
          # could enqueue a response between the index lookup and the slice
          # function which would remove the wrong response. FAIL!

          # Loop through all waiting commands to see if this handshake belongs to them
          this_handshake_guid = linc_handshake.get_cmd_guid(@fieldname_guid)
          handshake_cmd_index = @handshake_cmds.index { |hsc| hsc.get_cmd_guid == this_handshake_guid }

          # If command was waiting (ie the loop above found one), then remove it from waiters and signal it
          if handshake_cmd_index
            handshake_cmd = @handshake_cmds.slice!(handshake_cmd_index)
            handshake_cmd.got_your_handshake(linc_handshake)
          else
            # No command match found!  Either it gave up and timed out or this wasn't originated from here.
            # Ignore this typically.  This case here for clarity.
          end

        else
          # Synchronous version: just pop the array (pull the command off) and send it the handshake
          handshake_cmd = @handshakes_cmds.pop
          handshake_cmd.got_your_handshake(linc_handshake)
        end # of handshaking type check
      end
    end
  end # class LincInterface

  # The LincHandshakeCommand class is used only by the LincInterface.
  # It is the command with other required items that is passed to the telemetry
  # thread so it can match it with the handshake.
  class LincHandshakeCommand
    attr_accessor :handshake

    def initialize(handshakes_mutex, cmd_guid)
      @cmd_guid = cmd_guid
      @handshakes_mutex = handshakes_mutex
      @resource = ConditionVariable.new
      @handshake = nil
    end

    def wait_for_handshake(response_timeout)
      timed_out = false

      @resource.wait(@handshakes_mutex, response_timeout)
      if @handshake
        timed_out = false
      else
        Logger.warn "#{@name}: No handshake - must be timeout."
        timed_out = true
      end

      return timed_out
    end

    def got_your_handshake(handshake)
      @handshake = handshake
      @resource.signal
    end

    def get_cmd_guid
      return @cmd_guid
    end
  end # class LincHandshakeCommand

  # The LincHandshake class is used only by the LincInterface. It processes the handshake and
  # helps with finding the information regarding the internal command.
  class LincHandshake
    attr_accessor :handshake
    attr_accessor :identified_command
    attr_accessor :error_source

    def initialize(handshake, interface_target_name)
      @handshake = handshake

      # Interpret the command field of the handshake packet
      # Where DATA is defined as:
      # 1 byte target name length
      # X byte target name
      # 1 byte packet name length
      # X byte packet name
      # 4 byte packet data length
      # X byte packet data
      # 4 byte error source length
      # X byte error source
      data = handshake.read('DATA')
      raise "Data field too short for target name length" if data.length == 0

      # Get target name length
      target_name_length = data[0..0].unpack('C')[0]
      raise "Invalid target name length" if target_name_length == 0

      data = data[1..-1]

      # get target name
      raise "Data field too short for target name" if data.length < target_name_length

      # target_name = data[0..(target_name_length - 1)] # Unused
      data = data[target_name_length..-1]

      # get packet name length
      raise "Data field too short for packet name length" if data.length == 0

      packet_name_length = data[0..0].unpack('C')[0]
      raise "Invalid packet name length" if packet_name_length == 0

      data = data[1..-1]

      # get packet name
      raise "Data field too short for packet name" if data.length < packet_name_length

      packet_name = data[0..(packet_name_length - 1)]
      data = data[packet_name_length..-1]

      # get packet data length
      raise "Data field too short for packet data length" if data.length < 4

      packet_data_length = data[0..3].unpack('N')[0]
      raise "Invalid data length" if packet_data_length == 0

      data = data[4..-1]

      # get packet data
      raise "Data field too short for packet data" if data.length < packet_data_length

      packet_data = data[0..(packet_data_length - 1)]
      data = data[packet_data_length..-1]

      # get error source length
      raise "Data field too short for error source length" if data.length < 4

      error_source_length = data[0..3].unpack('N')[0]
      # note it is OK to have a 0 source length
      data = data[4..-1]

      # get error source - store on object
      if error_source_length > 0
        @error_source = data[0..(error_source_length - 1)]
      else
        @error_source = ''
      end

      # make packet - store on object as a defined packet of type command that this handshakes
      @identified_command = System.commands.packet(interface_target_name, packet_name).clone
      @identified_command.buffer = packet_data
      @identified_command.received_time = Time.at(handshake.read('TIME_SECONDS'), handshake.read('TIME_MICROSECONDS')).sys
    end

    def get_cmd_guid(fieldname_guid)
      return @identified_command.read(fieldname_guid)
    end
  end
end