BallAerospace/COSMOS

View on GitHub
cosmos/lib/cosmos/interfaces/tcpip_server_interface.rb

Summary

Maintainability
D
2 days
Test Coverage
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder

require 'socket'
require 'thread' # For Mutex
require 'timeout' # For Timeout::Error
require 'cosmos/interfaces/stream_interface'
require 'cosmos/streams/tcpip_socket_stream'
require 'cosmos/config/config_parser'

module Cosmos
  # TCP/IP Server which can both read and write on a single port or two
  # independent ports. A listen thread is set up for each port which waits for
  # client connections. For each connection to the read port, a thread is
  # spawned that calls the read method of that client's interface; the packets
  # are then available by calling the TcpipServer read method. Packets and raw
  # data passed to the TcpipServer write / write_raw methods are queued and a
  # single write thread (plus one for raw data) forwards them to every client
  # connected to the write port.
  class TcpipServerInterface < StreamInterface
    # Backlog passed when putting each listen socket into the listening state
    LISTEN_BACKLOG = 5

    # Data class which stores a client's interface and where it connected from
    class InterfaceInfo
      attr_reader :interface, :hostname, :host_ip, :port

      def initialize(interface, hostname, host_ip, port)
        @interface = interface
        @hostname = hostname
        @host_ip = host_ip
        @port = port
      end
    end

    # Callback method to call when a new client connects to the write port.
    # This method will be called with the Interface as the only argument.
    attr_accessor :write_connection_callback
    # Callback method to call when a new client connects to the read port.
    # This method will be called with the Interface as the only argument.
    attr_accessor :read_connection_callback
    # @return [RawLoggerPair] RawLoggerPair instance or nil
    attr_accessor :raw_logger_pair
    # @return [String] The ip address to bind to.  Default to ANY (0.0.0.0)
    attr_accessor :listen_address
    # @return [Boolean] Automatically write the SYSTEM META telemetry packet to
    #   each newly connected write client - Default false
    attr_accessor :auto_system_meta

    # @param write_port [Integer] The server write port. Clients should connect
    #   and expect to receive data from this port.
    # @param read_port [Integer] The server read port. Clients should connect
    #   and expect to send data to this port.
    # @param write_timeout [Float|nil] The number of seconds to wait for the
    #   write to complete. Pass nil to block until the write is complete.
    # @param read_timeout [Float|nil] The number of seconds to wait for the
    #   read to complete. Pass nil to block until the read is complete.
    # @param protocol_type [String] The name of the protocol to
    #   use for both the read and write ports. This name is combined with
    #   'Protocol' to result in a COSMOS Protocol class.
    # @param protocol_args [Array] Arguments to pass to the Protocol
    def initialize(write_port,
                   read_port,
                   write_timeout,
                   read_timeout,
                   protocol_type = nil,
                   *protocol_args)
      super(protocol_type, protocol_args)
      @write_port = ConfigParser.handle_nil(write_port)
      @write_port = Integer(write_port) if @write_port
      @read_port = ConfigParser.handle_nil(read_port)
      @read_port = Integer(read_port) if @read_port
      @write_timeout = ConfigParser.handle_nil(write_timeout)
      @write_timeout = @write_timeout.to_f if @write_timeout
      @read_timeout = ConfigParser.handle_nil(read_timeout)
      @read_timeout = @read_timeout.to_f if @read_timeout
      @listen_sockets = []
      @listen_pipes = [] # Write ends of the pipes used to wake the listen threads
      @listen_threads = []
      @read_threads = []
      @write_thread = nil
      @write_raw_thread = nil
      @write_interface_infos = []
      @read_interface_infos = []

      # Queues and synchronization primitives only exist for configured directions
      @write_queue = @write_port ? Queue.new : nil
      @write_raw_queue = @write_port ? Queue.new : nil
      @read_queue = @read_port ? Queue.new : nil
      @write_condition_variable = @write_port ? ConditionVariable.new : nil
      @write_raw_mutex = @write_port ? Mutex.new : nil
      @write_raw_condition_variable = @write_port ? ConditionVariable.new : nil

      @write_connection_callback = nil
      @read_connection_callback = nil
      @raw_logger_pair = nil
      @raw_logging_enabled = false
      @connection_mutex = Mutex.new # Guards @write_interface_infos / @read_interface_infos
      @listen_address = "0.0.0.0"
      @auto_system_meta = false
      @cancel_threads = false

      # A direction without a port can never be used
      @read_allowed = false unless @read_port
      @write_allowed = false unless @write_port
      @write_raw_allowed = false unless @write_port

      @connected = false
    end

    # Create the read and write port listen threads. Incoming connections will
    # spawn separate threads to process the reads; writes are serviced by the
    # write and write raw threads started here.
    def connect
      @cancel_threads = false
      @read_queue.clear if @read_queue
      # Guard against nil == nil so a server with no ports does not bind a socket
      if @write_port && @write_port == @read_port # One socket
        start_listen_thread(@read_port, true, true)
      else
        start_listen_thread(@write_port, true, false) if @write_port
        start_listen_thread(@read_port, false, true) if @read_port
      end

      if @write_port
        @write_thread = Thread.new do
          loop do
            write_thread_body()
            break if @cancel_threads
          end
        rescue Exception => err
          shutdown_interfaces(@write_interface_infos)
          Logger.error("#{@name}: Tcpip server write thread unexpectedly died")
          Logger.error(err.formatted)
        end
        @write_raw_thread = Thread.new do
          loop do
            write_raw_thread_body()
            break if @cancel_threads
          end
        rescue Exception => err
          shutdown_interfaces(@write_interface_infos)
          Logger.error("#{@name}: Tcpip server write raw thread unexpectedly died")
          Logger.error(err.formatted)
        end
      else
        @write_thread = nil
        @write_raw_thread = nil
      end
      @connected = true
    end

    # @return [Boolean] Whether the server is listening for connections
    def connected?
      @connected
    end

    # Shuts down the listener threads for both the read and write ports as well
    # as any client connections.
    def disconnect
      @cancel_threads = true
      @read_queue << nil if @read_queue # Unblock any caller waiting in #read

      # Wake each listen thread out of IO.select, then close the write end of its
      # pipe (the listen thread closes the read end) so descriptors do not leak
      @listen_pipes.each do |pipe|
        begin
          pipe.write('.')
        ensure
          pipe.close
        end
      rescue StandardError
        # Oh well - the listen thread is killed below regardless
      end
      @listen_pipes.clear

      # Shutdown listen thread(s)
      @listen_threads.each { |listen_thread| Cosmos.kill_thread(self, listen_thread) }
      @listen_threads.clear

      # Shutdown listen socket(s)
      @listen_sockets.each do |listen_socket|
        Cosmos.close_socket(listen_socket)
      rescue IOError
        # Ok may have been closed by the thread
      end
      @listen_sockets.clear

      # This will unblock read threads
      shutdown_interfaces(@read_interface_infos)

      # Iterate over a copy: exiting read threads remove themselves from @read_threads
      @read_threads.dup.each { |thread| Cosmos.kill_thread(self, thread) }
      @read_threads.clear
      if @write_thread
        Cosmos.kill_thread(self, @write_thread)
        @write_thread = nil
      end
      if @write_raw_thread
        Cosmos.kill_thread(self, @write_raw_thread)
        @write_raw_thread = nil
      end

      shutdown_interfaces(@write_interface_infos)
      @connected = false
    end

    # Gracefully kill all the threads
    def graceful_kill
      # This method is just here to prevent warnings
    end

    # @return [Packet] Latest packet read from any of the connected clients.
    #   Note this method blocks until data is available. Returns nil once the
    #   interface has been disconnected.
    def read
      raise "Interface not connected for read: #{@name}" unless connected?
      raise "Interface not readable: #{@name}" unless read_allowed?

      packet = @read_queue.pop
      return nil unless packet

      @read_count += 1
      packet
    end

    # @param packet [Packet] Packet to write to all clients connected to the
    #   write port. The packet is cloned and queued for the write thread.
    def write(packet)
      raise "Interface not connected for write: #{@name}" unless connected?
      raise "Interface not writable: #{@name}" unless write_allowed?

      @write_count += 1
      @write_queue << packet.clone
      @write_condition_variable.broadcast
    end

    # @param data [String] Data to write to all clients connected to the
    #   write port. The data is queued for the write raw thread.
    # @return [String] The data that was queued
    def write_raw(data)
      raise "Interface not connected for write_raw: #{@name}" unless connected?
      raise "Interface not write-rawable: #{@name}" unless write_raw_allowed?

      @write_raw_queue << data
      @write_raw_condition_variable.broadcast
      data
    end

    # @return [Integer] The number of packets waiting on the read queue
    def read_queue_size
      @read_queue ? @read_queue.size : 0
    end

    # @return [Integer] The number of packets waiting on the write queue
    def write_queue_size
      @write_queue ? @write_queue.size : 0
    end

    # @return [Integer] The number of connected clients (a client on a shared
    #   read/write port is counted once)
    def num_clients
      (@write_interface_infos + @read_interface_infos).map(&:interface).uniq.length
    end

    # Start raw logging for this interface
    def start_raw_logging
      @raw_logging_enabled = true
      change_raw_logging(:start)
    end

    # Stop raw logging for this interface
    def stop_raw_logging
      @raw_logging_enabled = false
      change_raw_logging(:stop)
    end

    # Supported Options
    # LISTEN_ADDRESS - Ip address of the interface to accept connections on - Default: 0.0.0.0
    # AUTO_SYSTEM_META - Automatically send SYSTEM META on connect - Default false
    # (see Interface#set_option)
    def set_option(option_name, option_values)
      super(option_name, option_values)
      case option_name.upcase
      when 'LISTEN_ADDRESS'
        @listen_address = option_values[0]
      when 'AUTO_SYSTEM_META'
        @auto_system_meta = ConfigParser.handle_true_false(option_values[0])
      end
    end

    protected

    # Disconnects every client in interface_infos and empties the list.
    #
    # @param interface_infos [Array<InterfaceInfo>] @read_interface_infos or
    #   @write_interface_infos
    def shutdown_interfaces(interface_infos)
      @connection_mutex.synchronize do
        interface_infos.each { |interface_info| disconnect_client_interface(interface_info.interface) }
        interface_infos.clear
      end
    end

    # Forwards a raw logging command to every client's raw logger pair.
    #
    # @param method [Symbol] :start or :stop
    def change_raw_logging(method)
      return unless @raw_logger_pair

      (@write_interface_infos + @read_interface_infos).each do |interface_info|
        raw_logger_pair = interface_info.interface.raw_logger_pair
        raw_logger_pair.public_send(method) if raw_logger_pair
      end
    end

    # Opens a listen socket on port and spawns the thread that accepts clients.
    #
    # @param port [Integer] Port to listen on
    # @param listen_write [Boolean] Accepted clients receive written data
    # @param listen_read [Boolean] Accepted clients are read from
    def start_listen_thread(port, listen_write = false, listen_read = false)
      listen_socket = create_listen_socket(port)
      @listen_sockets << listen_socket

      # Self-pipe used by #disconnect to wake the listen thread out of IO.select.
      # Created before the thread starts so #disconnect can always find it.
      thread_reader, thread_writer = IO.pipe
      @listen_pipes << thread_writer
      @listen_threads << Thread.new do
        loop do
          listen_thread_body(listen_socket, listen_write, listen_read, thread_reader)
          break if @cancel_threads
        end
      rescue => err
        Logger.error("#{@name}: Tcpip server listen thread unexpectedly died")
        Logger.error(err.formatted)
      ensure
        thread_reader.close
      end
    end

    # Creates a TCP socket bound to @listen_address:port and ready to accept.
    # The socket is closed before any error is raised so it cannot leak.
    #
    # @param port [Integer] Port to listen on
    # @return [Socket] The listening socket (a ServerSocket on non-MRI rubies)
    # @raise [RuntimeError] If the port is already in use
    def create_listen_socket(port)
      addr = Socket.pack_sockaddr_in(port, @listen_address)
      mri = (RUBY_ENGINE == 'ruby')
      listen_socket = if mri
                        Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
                      else
                        ServerSocket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
                      end
      begin
        listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows?
        if mri
          listen_socket.bind(addr)
          listen_socket.listen(LISTEN_BACKLOG)
        else
          # ServerSocket#bind takes the backlog directly (no separate listen call)
          listen_socket.bind(addr, LISTEN_BACKLOG)
        end
      rescue Errno::EADDRINUSE
        Cosmos.close_socket(listen_socket)
        raise "Error binding to port #{port}.\n" +
              "Either another application is using this port\n" +
              "or the operating system is being slow cleaning up.\n" +
              "Make sure all sockets/streams are closed in all applications,\n" +
              "wait 1 minute and try again."
      rescue StandardError
        Cosmos.close_socket(listen_socket)
        raise
      end
      listen_socket
    end

    # Waits for one client connection (or a wakeup from #disconnect) and, when a
    # client connects, builds its StreamInterface and registers it.
    #
    # @param listen_socket [Socket] Listening socket to accept from
    # @param listen_write [Boolean] Register the client for writes
    # @param listen_read [Boolean] Register the client for reads and start its read thread
    # @param thread_reader [IO] Read end of the wakeup pipe written by #disconnect
    def listen_thread_body(listen_socket, listen_write, listen_read, thread_reader)
      begin
        socket, address = listen_socket.accept_nonblock
      rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK
        # Nothing to accept yet: block until a client arrives or the pipe is written
        read_ready, _ = IO.select([listen_socket, thread_reader])
        return if read_ready && read_ready.include?(thread_reader)

        retry
      end

      port, host_ip = Socket.unpack_sockaddr_in(address)
      hostname = Socket.lookup_hostname_from_ip(host_ip)
      # if System.instance.acl
      #   addr = ["AF_INET", 10, "lc630", host_ip.to_s]
      #   if not System.instance.acl.allow_addr?(addr)
      #     # Reject connection
      #     Cosmos.close_socket(socket)
      #     Logger.info "#{@name}: Tcpip server rejected connection from #{hostname}(#{host_ip}):#{port}"
      #     return
      #   end
      # end

      # Configure TCP_NODELAY option
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

      # Accept Connection - the same socket serves both directions on a shared port
      write_socket = listen_write ? socket : nil
      read_socket = listen_read ? socket : nil
      stream = TcpipSocketStream.new(write_socket, read_socket, @write_timeout, @read_timeout)

      interface = StreamInterface.new
      interface.target_names = @target_names
      if @raw_logger_pair
        interface.raw_logger_pair = @raw_logger_pair.clone
        interface.raw_logger_pair.start if @raw_logging_enabled
      end
      @protocol_info.each do |protocol_class, protocol_args, read_write|
        interface.add_protocol(protocol_class, protocol_args, read_write)
      end
      interface.stream = stream
      interface.connect

      if listen_write
        if @auto_system_meta
          meta_packet = System.telemetry.packet('SYSTEM', 'META').clone
          interface.write(meta_packet)
        end

        @write_connection_callback.call(interface) if @write_connection_callback
        @connection_mutex.synchronize do
          @write_interface_infos << InterfaceInfo.new(interface, hostname, host_ip, port)
        end
      end
      if listen_read
        @read_connection_callback.call(interface) if @read_connection_callback
        # Keep a local reference: @read_interface_infos may change once the mutex is released
        read_interface_info = InterfaceInfo.new(interface, hostname, host_ip, port)
        @connection_mutex.synchronize do
          @read_interface_infos << read_interface_info
        end
        start_read_thread(read_interface_info)
      end
      Logger.info "#{@name}: Tcpip server accepted connection from #{hostname}(#{host_ip}):#{port}"
    end

    # Spawns the thread that reads from one client until it disconnects, then
    # disconnects that client and removes it from @read_interface_infos.
    #
    # @param interface_info [InterfaceInfo] The client to read from
    def start_read_thread(interface_info)
      @read_threads << Thread.new do
        begin
          read_thread_body(interface_info.interface)
        rescue Exception => err
          Logger.error "#{@name}: Tcpip server read thread unexpectedly died"
          Logger.error err.formatted
        end
        Logger.info "#{@name}: Tcpip server lost read connection to #{interface_info.hostname}(#{interface_info.host_ip}):#{interface_info.port}"
        @read_threads.delete(Thread.current)

        @connection_mutex.synchronize do
          # The entry is already gone if shutdown_interfaces cleared the list
          index = @read_interface_infos.index { |info| info.interface == interface_info.interface }
          if index
            begin
              disconnect_client_interface(@read_interface_infos[index].interface)
            ensure
              # Drop the entry even if disconnecting the client raised
              @read_interface_infos.delete_at(index)
            end
          end
        end
      rescue Exception => err
        Logger.error "#{@name}: Tcpip server read thread unexpectedly died"
        Logger.error err.formatted
      end
    end

    # Waits for the next queued packet (checking for dead clients while idle)
    # and writes it to every write client.
    def write_thread_body
      # Retrieve the next packet to be sent out to clients
      # Handles disconnected clients even when packets aren't flowing
      packet = nil

      loop do
        break if @cancel_threads

        begin
          packet = @write_queue.pop(true) # non_block to raise ThreadError
          break
        rescue ThreadError
          check_for_dead_clients()
        end
      end

      packet = write_thread_hook(packet)
      write_to_clients(:write, packet) if packet
    end

    # Waits for the next queued raw data and writes it to every write client.
    def write_raw_thread_body
      # Retrieve the next data to be sent out to clients
      data = nil

      loop do
        break if @cancel_threads

        begin
          data = @write_raw_queue.pop(true) # non_block to raise ThreadError
          break
        rescue ThreadError
          # Sleep until we receive data or for 100ms
          @write_raw_mutex.synchronize do
            @write_raw_condition_variable.wait(@write_raw_mutex, 0.1)
          end
        end
      end

      data = write_raw_thread_hook(data)
      write_to_clients(:write_raw, data) if data
    end

    # Logs the loss of a write client and disconnects it.
    #
    # @param interface_info [InterfaceInfo] The write client that was lost
    def interface_disconnect(interface_info)
      Logger.info "#{@name}: Tcpip server lost write connection to "\
                           "#{interface_info.hostname}(#{interface_info.host_ip}):#{interface_info.port}"
      disconnect_client_interface(interface_info.interface)
    end

    # Disconnects a client's interface and stops its raw logger pair (if any).
    #
    # @param interface [StreamInterface] The client interface
    def disconnect_client_interface(interface)
      interface.disconnect
      interface.raw_logger_pair.stop if interface.raw_logger_pair
    end

    def write_thread_hook(packet)
      packet # By default just return the packet
    end

    def write_raw_thread_hook(data)
      data # By default just return the data
    end

    # Reads packets from one client and pushes clones onto @read_queue until the
    # client returns nil (disconnect) or the interface is being shut down.
    #
    # @param interface [StreamInterface] The client interface to read from
    def read_thread_body(interface)
      thread_bytes_read = 0
      loop do
        packet = interface.read
        interface_bytes_read = interface.bytes_read
        if interface_bytes_read != thread_bytes_read
          diff = interface_bytes_read - thread_bytes_read
          @bytes_read += diff # This would be better if mutex protected, but not that important for telemetry
          thread_bytes_read = interface_bytes_read
        end
        return if !packet || @cancel_threads

        packet = read_thread_hook(packet) # Do work on received packet
        @read_raw_data_time = interface.read_raw_data_time
        @read_raw_data = interface.read_raw_data
        @read_queue << packet.clone
      end
    end

    # @return [Packet] Return the packet
    def read_thread_hook(packet)
      packet
    end

    # Drops write clients that have disconnected, then sleeps until a packet is
    # written or 100ms elapse.
    def check_for_dead_clients
      @connection_mutex.synchronize do
        @write_interface_infos.delete_if do |interface_info|
          dead = write_client_dead?(interface_info)
          interface_disconnect(interface_info) if dead
          dead
        end
      end

      # Sleep until we receive a packet or for 100ms
      # NOTE(review): @write_mutex is not created in this class; presumably the
      # Interface base class initializes it - confirm
      @write_mutex.synchronize do
        @write_condition_variable.wait(@write_mutex, 0.1)
      end
    end

    # @param interface_info [InterfaceInfo] A write client
    # @return [Boolean] true if the client appears to have disconnected
    def write_client_dead?(interface_info)
      write_socket = interface_info.interface.stream.write_socket
      # Shared read/write socket: let the read thread detect the disconnect
      return write_socket.closed? if @write_port == @read_port

      # A cleanly connected write-only client sends nothing, so this probe should
      # would-block. Returning at all means the client has disconnected (or is
      # invalidly sending data on the socket).
      write_socket.recvfrom_nonblock(10)
      true
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      # Client is still cleanly connected as far as we can tell without writing to the socket
      false
    rescue Errno::ECONNRESET, Errno::ECONNABORTED, IOError
      # Client has disconnected
      true
    end

    # Writes a packet or raw data to every write client, dropping any client
    # whose write fails.
    #
    # @param method [Symbol] :write or :write_raw
    # @param packet_or_data [Packet, String] What to send
    def write_to_clients(method, packet_or_data)
      @connection_mutex.synchronize do
        @write_interface_infos.delete_if do |interface_info|
          interface = interface_info.interface
          need_disconnect = false
          begin
            interface_bytes_written = interface.bytes_written
            interface.public_send(method, packet_or_data)
            @written_raw_data_time = interface.written_raw_data_time
            @written_raw_data = interface.written_raw_data
            @bytes_written += interface.bytes_written - interface_bytes_written
          rescue Errno::EPIPE, Errno::ECONNABORTED, IOError, Errno::ECONNRESET
            # Client has normally disconnected
            need_disconnect = true
          rescue Exception => err
            # Presumably raised when the client's stream is already gone; not worth an error log
            if err.message != "Stream not connected for write_raw"
              Logger.error "#{@name}: Error sending to client: #{err.class} #{err.message}"
            end
            need_disconnect = true
          end

          interface_disconnect(interface_info) if need_disconnect
          need_disconnect # true removes the client from @write_interface_infos
        end
      end
    end
  end
end