BallAerospace/COSMOS

View on GitHub
cosmos/lib/cosmos/microservices/interface_microservice.rb

Summary

Maintainability
F
3 days
Test Coverage
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder

require 'cosmos/microservices/microservice'
require 'cosmos/models/interface_model'
require 'cosmos/models/router_model'
require 'cosmos/models/interface_status_model'
require 'cosmos/models/router_status_model'
require 'cosmos/topics/telemetry_topic'
require 'cosmos/topics/command_topic'
require 'cosmos/topics/command_decom_topic'
require 'cosmos/topics/interface_topic'
require 'cosmos/topics/router_topic'

module Cosmos
  # Services the command stream of a single interface on a background thread.
  #
  # Messages arrive through InterfaceTopic.receive_commands. A message on the
  # interface's own topic (matching /CMD}INTERFACE/) is a directive for the
  # interface itself (shutdown, connect, disconnect, raw write, raw logging).
  # Anything else is treated as a command to build or identify and then write
  # out through the interface. The value the block returns for each message
  # ('SUCCESS' or an error string) is handed back to InterfaceTopic.
  class InterfaceCmdHandlerThread
    # @param interface [Object] interface that commands are written to; must
    #   respond to name, target_names, write, write_raw, as_json and the raw
    #   logging methods used below
    # @param tlm [Object] owner that is asked to connect / disconnect
    #   (responds to attempting and disconnect)
    # @param scope [String] scope forwarded to every topic / model call
    def initialize(interface, tlm, scope:)
      @interface = interface
      @tlm = tlm
      @scope = scope
    end

    # Spawn the handler thread. Any exception that escapes run is logged and
    # then re-raised inside the thread.
    def start
      @thread = Thread.new do
        run()
      rescue Exception => err
        Logger.error "#{@interface.name}: Command handler thread died: #{err.formatted}"
        raise err
      end
    end

    # Stop the handler thread via Cosmos.kill_thread
    def stop
      Cosmos.kill_thread(self, @thread)
    end

    # Ask the topic to deliver a shutdown request for this interface
    def graceful_kill
      InterfaceTopic.shutdown(@interface, scope: @scope)
    end

    # Process messages until a shutdown directive is received.
    #
    # Note that `return` inside the block exits this method (ending the
    # receive loop), while `next value` hands value back to InterfaceTopic as
    # the result of the current message.
    def run
      InterfaceTopic.receive_commands(@interface, scope: @scope) do |topic, msg_hash|
        # Check for a directive aimed at the interface itself
        if /CMD}INTERFACE/.match?(topic)
          if msg_hash['shutdown']
            Logger.info "#{@interface.name}: Shutdown requested"
            return
          end
          if msg_hash['connect']
            Logger.info "#{@interface.name}: Connect requested"
            @tlm.attempting()
            next 'SUCCESS'
          end
          if msg_hash['disconnect']
            Logger.info "#{@interface.name}: Disconnect requested"
            @tlm.disconnect(false)
            next 'SUCCESS'
          end
          if msg_hash['raw']
            Logger.info "#{@interface.name}: Write raw"
            # A raw interface write results in an UNKNOWN packet
            command = unknown_command(msg_hash['raw'])
            command.received_time = Time.now
            CommandTopic.write_packet(command, scope: @scope)
            @interface.write_raw(msg_hash['raw'])
            next 'SUCCESS'
          end
          if msg_hash.key?('log_raw')
            if msg_hash['log_raw'] == 'true'
              Logger.info "#{@interface.name}: Enable raw logging"
              @interface.start_raw_logging
            else
              Logger.info "#{@interface.name}: Disable raw logging"
              @interface.stop_raw_logging
            end
            next 'SUCCESS'
          end
          # An interface-topic message with none of the keys above falls
          # through and is handled (and rejected) like any other command.
        end

        target_name = msg_hash['target_name']
        cmd_name = msg_hash['cmd_name']
        cmd_params = nil
        cmd_buffer = nil
        hazardous_check = nil
        if msg_hash['cmd_params']
          cmd_params = JSON.parse(msg_hash['cmd_params'])
          range_check = ConfigParser.handle_true_false(msg_hash['range_check'])
          raw = ConfigParser.handle_true_false(msg_hash['raw'])
          hazardous_check = ConfigParser.handle_true_false(msg_hash['hazardous_check'])
        elsif msg_hash['cmd_buffer']
          cmd_buffer = msg_hash['cmd_buffer']
        end

        # Every failure while building, checking or writing the command is
        # reported the same way: log it and return the message to the sender.
        begin
          if cmd_params
            command = System.commands.build_cmd(target_name, cmd_name, cmd_params, range_check, raw)
          elsif cmd_buffer
            command = identify_command(cmd_buffer, target_name)
          else
            raise "Invalid command received:\n #{msg_hash}"
          end
          command.received_time = Time.now

          if hazardous_check
            hazardous, hazardous_description = System.commands.cmd_pkt_hazardous?(command)
            # Return back the error, description, and the formatted command
            # This allows the error handler to simply re-send the command
            next "HazardousError\n#{hazardous_description}\n#{System.commands.format(command)}" if hazardous
          end

          @interface.write(command)
          CommandTopic.write_packet(command, scope: @scope)
          CommandDecomTopic.write_packet(command, scope: @scope)
          InterfaceStatusModel.set(@interface.as_json, scope: @scope)
          next 'SUCCESS'
        rescue => e
          Logger.error "#{@interface.name}: #{e.formatted}"
          next e.message
        end
      end
    end

    private

    # Identify a raw command buffer. When the message names a target only that
    # target is searched; otherwise the search is limited to the targets mapped
    # to this interface. Falls back to an UNKNOWN command when nothing matches.
    def identify_command(cmd_buffer, target_name)
      candidate_targets = target_name ? [target_name] : @interface.target_names
      System.commands.identify(cmd_buffer, candidate_targets) || unknown_command(cmd_buffer)
    end

    # Bump the received count on the shared UNKNOWN/UNKNOWN command and return
    # a clone of it carrying the given buffer.
    def unknown_command(buffer)
      command = System.commands.packet('UNKNOWN', 'UNKNOWN')
      command.received_count += 1
      command = command.clone
      command.buffer = buffer
      command
    end
  end

  # Services the telemetry stream of a single router on a background thread.
  #
  # Messages arrive through RouterTopic.receive_telemetry. A message on the
  # router's own topic (matching /CMD}ROUTER/) is a directive for the router
  # itself; anything else describes a telemetry packet that is rebuilt and
  # written out through the router, provided the router is connected.
  class RouterTlmHandlerThread
    # @param router [Object] router that telemetry is written to
    # @param tlm [Object] owner that is asked to connect / disconnect
    #   (responds to attempting and disconnect)
    # @param scope [String] scope forwarded to every topic / model call
    def initialize(router, tlm, scope:)
      @router = router
      @tlm = tlm
      @scope = scope
    end

    # Spawn the handler thread. Any exception that escapes run is logged and
    # then re-raised inside the thread.
    def start
      @thread = Thread.new do
        begin
          run
        rescue Exception => error
          Logger.error "#{@router.name}: Telemetry handler thread died: #{error.formatted}"
          raise error
        end
      end
    end

    # Stop the handler thread via Cosmos.kill_thread
    def stop
      Cosmos.kill_thread(self, @thread)
    end

    # Ask the topic to deliver a shutdown request for this router
    def graceful_kill
      RouterTopic.shutdown(@router, scope: @scope)
    end

    # Process messages until a shutdown directive is received. `return` inside
    # the block leaves this method; `next value` hands value back to
    # RouterTopic as the result of the current message.
    def run
      RouterTopic.receive_telemetry(@router, scope: @scope) do |topic, msg_hash|
        # Directives addressed to the router itself. Apart from shutdown they
        # are not mutually exclusive: each present key is acted on in turn.
        if /CMD}ROUTER/.match?(topic)
          if msg_hash['shutdown']
            Logger.info "#{@router.name}: Shutdown requested"
            return
          end

          if msg_hash['connect']
            Logger.info "#{@router.name}: Connect requested"
            @tlm.attempting
          end

          if msg_hash['disconnect']
            Logger.info "#{@router.name}: Disconnect requested"
            @tlm.disconnect(false)
          end

          if msg_hash.key?('log_raw')
            enable_raw_logging = (msg_hash['log_raw'] == 'true')
            if enable_raw_logging
              Logger.info "#{@router.name}: Enable raw logging"
              @router.start_raw_logging
            else
              Logger.info "#{@router.name}: Disable raw logging"
              @router.stop_raw_logging
            end
          end

          next 'SUCCESS'
        end

        # Telemetry is dropped (nil result) while the router is not connected
        next unless @router.connected?

        # Rebuild the packet from the fields carried in the message
        tlm_packet = System.telemetry.packet(msg_hash['target_name'], msg_hash['packet_name'])
        tlm_packet.stored = ConfigParser.handle_true_false(msg_hash['stored'])
        tlm_packet.received_time = Time.from_nsec_from_epoch(msg_hash['time'].to_i)
        tlm_packet.received_count = msg_hash['received_count'].to_i
        tlm_packet.buffer = msg_hash['buffer']

        begin
          @router.write(tlm_packet)
          RouterStatusModel.set(@router.as_json, scope: @scope)
          next 'SUCCESS'
        rescue => write_error
          # Report the failure back to the sender instead of killing the thread
          Logger.error "#{@router.name}: #{write_error.formatted}"
          next write_error.message
        end
      end
    end
  end

  # Microservice that owns one interface (or one router, when the concrete
  # class name does not reduce to 'INTERFACE'): it keeps the connection alive,
  # reads packets from it, publishes them to the telemetry topic and mirrors
  # the interface state into the matching status model.
  class InterfaceMicroservice < Microservice
    # Maximum number of leading bytes logged for a packet that could not be identified
    UNKNOWN_BYTES_TO_PRINT = 16

    # Build the interface / router, publish its initial state and start the
    # handler thread.
    #
    # @param name [String] microservice name; the first "__"-separated field is
    #   used as the scope and the third as the interface / router name
    def initialize(name)
      super(name)
      # e.g. "Cosmos::InterfaceMicroservice" => "INTERFACE"
      @interface_or_router = self.class.name.to_s.split("Microservice")[0].upcase.split("::")[-1]
      @scope = name.split("__")[0]
      interface_name = name.split("__")[2]
      if @interface_or_router == 'INTERFACE'
        @interface = InterfaceModel.get_model(name: interface_name, scope: @scope).build
      else
        @interface = RouterModel.get_model(name: interface_name, scope: @scope).build
      end
      @interface.name = interface_name
      # Map the interface to the interface's targets. The list must be
      # iterated explicitly: a block handed straight to the target_names
      # reader is silently ignored.
      @interface.target_names.each do |target_name|
        target = System.targets[target_name]
        # Skip names System does not know about rather than failing startup
        target.interface = @interface if target
      end
      @interface.state = @interface.connect_on_startup ? 'ATTEMPTING' : 'DISCONNECTED'
      write_status_model

      @interface_thread_sleeper = Sleeper.new
      @cancel_thread = false
      # Messages already written to an exception file, so each is written once
      @connection_failed_messages = []
      @connection_lost_messages = []
      @mutex = Mutex.new
      if @interface_or_router == 'INTERFACE'
        @handler_thread = InterfaceCmdHandlerThread.new(@interface, self, scope: @scope)
      else
        @handler_thread = RouterTlmHandlerThread.new(@interface, self, scope: @scope)
      end
      @handler_thread.start
    end

    # External method to be called by the InterfaceCmdHandlerThread to connect
    # Thus we just set the state and allow the run method to handle the action
    def attempting
      @interface.state = 'ATTEMPTING'
      write_status_model
    end

    # Main loop. Dispatches on @interface.state until @cancel_thread is set:
    # DISCONNECTED idles, ATTEMPTING tries to connect, CONNECTED reads packets
    # (or, when reading is not allowed, just watches the connection).
    def run
      begin
        if @interface.read_allowed?
          Logger.info "#{@interface.name}: Starting packet reading"
        else
          Logger.info "#{@interface.name}: Starting connection maintenance"
        end
        until @cancel_thread
          case @interface.state
          when 'DISCONNECTED'
            begin
              # Just wait to see if we should connect later
              @interface_thread_sleeper.sleep(1)
            rescue Exception
              # Deliberately broad: anything interrupting the sleep only ends
              # the loop when a stop has been requested
              break if @cancel_thread
            end
          when 'ATTEMPTING'
            begin
              @mutex.synchronize do
                # We need to make sure connect is not called after stop() has been called
                connect() unless @cancel_thread
              end
            rescue Exception => connect_error
              # Exception (not StandardError) so that Interrupt reaches the handler
              handle_connection_failed(connect_error)
              break if @cancel_thread
            end
          when 'CONNECTED'
            if @interface.read_allowed?
              begin
                packet = @interface.read
                if packet
                  handle_packet(packet)
                  @count += 1
                else
                  Logger.info "#{@interface.name}: Internal disconnect requested (returned nil)"
                  handle_connection_lost()
                  break if @cancel_thread
                end
              rescue Exception => err
                handle_connection_lost(err)
                break if @cancel_thread
              end
            else
              @interface_thread_sleeper.sleep(1)
              handle_connection_lost() unless @interface.connected?
            end
          end
        end
      rescue Exception => error
        Logger.error "#{@interface.name}: Packet reading thread died: #{error.formatted}"
        Cosmos.handle_fatal_exception(error)
        # Try to do clean disconnect because we're going down
        disconnect(false)
      end
      write_status_model
      Logger.info "#{@interface.name}: Stopped packet reading"
    end

    # Identify a packet read from the interface, update the current value
    # table where applicable and publish the packet to the telemetry topic.
    # Packets that cannot be identified are published as UNKNOWN/UNKNOWN.
    #
    # @param packet [Object] packet returned by @interface.read
    def handle_packet(packet)
      InterfaceStatusModel.set(@interface.as_json, scope: @scope)
      packet.received_time = Time.now.sys unless packet.received_time

      if packet.stored
        # Stored telemetry does not update the current value table
        identified_packet = System.telemetry.identify_and_define_packet(packet, @target_names)
      else
        # Identify and update packet
        if packet.identified?
          begin
            # Preidentifed packet - place it into the current value table
            identified_packet = System.telemetry.update!(packet.target_name,
                                                         packet.packet_name,
                                                         packet.buffer)
          rescue RuntimeError
            # Packet identified but we don't know about it
            # Clear packet_name and target_name and try to identify
            Logger.warn "#{@interface.name}: Received unknown identified telemetry: #{packet.target_name} #{packet.packet_name}"
            packet.target_name = nil
            packet.packet_name = nil
            identified_packet = System.telemetry.identify!(packet.buffer,
                                                           @target_names)
          end
        else
          # Packet needs to be identified
          identified_packet = System.telemetry.identify!(packet.buffer,
                                                         @target_names)
        end
      end

      if identified_packet
        identified_packet.received_time = packet.received_time
        identified_packet.stored = packet.stored
        identified_packet.extra = packet.extra
        packet = identified_packet
      else
        unknown_packet = System.telemetry.update!('UNKNOWN', 'UNKNOWN', packet.buffer)
        unknown_packet.received_time = packet.received_time
        unknown_packet.stored = packet.stored
        unknown_packet.extra = packet.extra
        packet = unknown_packet
        json_hash = CvtModel.build_json_from_packet(packet)
        CvtModel.set(json_hash, target_name: packet.target_name, packet_name: packet.packet_name, scope: @scope)
        # Log the first few bytes in hex to help diagnose the unknown data
        num_bytes_to_print = [UNKNOWN_BYTES_TO_PRINT, packet.length].min
        data = packet.buffer(false)[0..(num_bytes_to_print - 1)]
        prefix = data.each_byte.map { |byte| sprintf("%02X", byte) }.join
        Logger.warn "#{@interface.name} #{packet.target_name} packet length: #{packet.length} starting with: #{prefix}"
      end

      # Write to stream
      packet.received_count += 1
      TelemetryTopic.write_packet(packet, scope: @scope)
    end

    # Log a failed connection attempt and disconnect (allowing a reconnect).
    # An Interrupt cancels the run loop. Errors other than the common network
    # ones get an exception file, written once per distinct message.
    #
    # @param connect_error [Exception] error raised while connecting
    def handle_connection_failed(connect_error)
      @error = connect_error
      Logger.error "#{@interface.name}: Connection Failed: #{connect_error.formatted(false, false)}"
      case connect_error
      when Interrupt
        Logger.info "#{@interface.name}: Closing from signal"
        @cancel_thread = true
      when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ENOTSOCK, Errno::EHOSTUNREACH, IOError
        # Do not write an exception file for these extremely common cases
      else
        if connect_error.is_a?(RuntimeError) && connect_error.message.match?(/canceled|timeout/)
          # Do not write an exception file for these extremely common cases
        else
          Logger.error "#{@interface.name}: #{connect_error.formatted}"
          unless @connection_failed_messages.include?(connect_error.message)
            Cosmos.write_exception_file(connect_error)
            @connection_failed_messages << connect_error.message
          end
        end
      end
      disconnect() # Ensure we do a clean disconnect
    end

    # Log a lost connection and disconnect. An Interrupt cancels the run loop.
    # Errors other than the common network ones get an exception file, written
    # once per distinct message.
    #
    # @param err [Exception, nil] error that caused the loss, if any
    # @param reconnect [Boolean] passed to disconnect as allow_reconnect
    def handle_connection_lost(err = nil, reconnect: true)
      if err
        @error = err
        Logger.info "#{@interface.name}: Connection Lost: #{err.formatted(false, false)}"
        case err
        when Interrupt
          Logger.info "#{@interface.name}: Closing from signal"
          @cancel_thread = true
        when Errno::ECONNABORTED, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::EBADF, Errno::ENOTSOCK, IOError
          # Do not write an exception file for these extremely common cases
        else
          Logger.error "#{@interface.name}: #{err.formatted}"
          unless @connection_lost_messages.include?(err.message)
            Cosmos.write_exception_file(err)
            @connection_lost_messages << err.message
          end
        end
      else
        Logger.info "#{@interface.name}: Connection Lost"
      end
      disconnect(reconnect) # Ensure we do a clean disconnect
    end

    # Connect the interface and publish the CONNECTED state
    def connect
      Logger.info "#{@interface.name}: Connecting ..."
      @interface.connect
      @interface.state = 'CONNECTED'
      write_status_model
      Logger.info "#{@interface.name}: Connection Success"
    end

    # Disconnect the interface and move it to ATTEMPTING (when a reconnect is
    # allowed and the interface auto-reconnects) or to DISCONNECTED.
    #
    # @param allow_reconnect [Boolean] false forces the DISCONNECTED state
    def disconnect(allow_reconnect = true)
      return if @interface.state == 'DISCONNECTED' && !@interface.connected?

      # Synchronize the calls to @interface.disconnect since it takes an unknown
      # amount of time. If two calls to disconnect stack up, the if statement
      # should avoid multiple calls to disconnect.
      @mutex.synchronize do
        begin
          @interface.disconnect if @interface.connected?
        rescue => e
          Logger.error "Disconnect: #{@interface.name}: #{e.formatted}"
        end
      end

      # If the interface is set to auto_reconnect then delay so the thread
      # can come back around and allow the interface a chance to reconnect.
      if allow_reconnect && @interface.auto_reconnect && @interface.state != 'DISCONNECTED'
        attempting()
        unless @cancel_thread
          # Logger.debug "reconnect delay: #{@interface.reconnect_delay}"
          @interface_thread_sleeper.sleep(@interface.reconnect_delay)
        end
      else
        @interface.state = 'DISCONNECTED'
        write_status_model
      end
    end

    # Disconnect from the interface and stop the thread
    def stop
      Logger.info "#{@interface.name}: stop requested"
      @mutex.synchronize do
        # Need to make sure that @cancel_thread is set and the interface disconnected within
        # mutex to ensure that connect() is not called when we want to stop()
        @cancel_thread = true
        @handler_thread.stop
        @interface_thread_sleeper.cancel
        @interface.disconnect
        # Remove the published status entry, if one exists
        valid_interface = status_model_class.get_model(name: @interface.name, scope: @scope)
        valid_interface.destroy if valid_interface
      end
    end

    # Stop this microservice and then run the superclass shutdown
    #
    # @param sig [Object, nil] accepted but unused here
    def shutdown(sig = nil)
      Logger.info "#{@interface.name}: shutdown requested"
      stop()
      super()
    end

    def graceful_kill
      # Just to avoid warning
    end

    private

    # Status model class matching what this microservice wraps
    def status_model_class
      @interface_or_router == 'INTERFACE' ? InterfaceStatusModel : RouterStatusModel
    end

    # Publish the current interface / router state to its status model
    def write_status_model
      status_model_class.set(@interface.as_json, scope: @scope)
    end
  end
end

# Start the microservice only when this file is the program being executed,
# not when it is loaded by another file.
Cosmos::InterfaceMicroservice.run if $PROGRAM_NAME == __FILE__