hirura/hrr_rb_ssh

View on GitHub
lib/hrr_rb_ssh/connection/channel.rb

Summary

Maintainability
F
6 days
Test Coverage
require 'hrr_rb_ssh/connection/channel/channel_type'

module HrrRbSsh
  class Connection
    class Channel
      include Loggable

      INITIAL_WINDOW_SIZE = 100000
      MAXIMUM_PACKET_SIZE = 100000

      attr_reader \
        :channel_type,
        :local_channel,
        :remote_channel,
        :local_window_size,
        :local_maximum_packet_size,
        :remote_window_size,
        :remote_maximum_packet_size,
        :receive_message_queue,
        :exit_status

      def initialize connection, message, socket=nil, logger: nil
        self.logger = logger

        @connection = connection

        @channel_type = message[:'channel type']
        @local_channel  = connection.assign_channel
        @remote_channel = message[:'sender channel']
        @local_window_size          = INITIAL_WINDOW_SIZE
        @local_maximum_packet_size  = MAXIMUM_PACKET_SIZE
        @remote_window_size         = message[:'initial window size']
        @remote_maximum_packet_size = message[:'maximum packet size']

        @channel_type_instance = ChannelType[@channel_type].new connection, self, message, socket, logger: logger

        @receive_message_queue = Queue.new
        @receive_data_queue = Queue.new
        @receive_extended_data_queue = Queue.new

        @r_io_in,  @w_io_in  = IO.pipe
        @r_io_out, @w_io_out = IO.pipe
        @r_io_err, @w_io_err = IO.pipe

        @channel_closing_monitor = Monitor.new

        @closed = nil
        @exit_status = nil
      end

      def set_remote_parameters message
        @remote_channel = message[:'sender channel']
        @remote_window_size = message[:'initial window size']
        @remote_maximum_packet_size = message[:'maximum packet size']
      end

      def io
        case @connection.mode
        when Mode::SERVER
          [@r_io_in, @w_io_out, @w_io_err]
        when Mode::CLIENT
          [@w_io_in, @r_io_out, @r_io_err]
        end
      end

      def start
        @channel_loop_thread = channel_loop_thread
        case @connection.mode
        when Mode::SERVER
          @out_sender_thread   = out_sender_thread
          @err_sender_thread   = err_sender_thread
          @receiver_thread     = receiver_thread
          @channel_type_instance.start
        when Mode::CLIENT
          @out_receiver_thread = out_receiver_thread
          @err_receiver_thread = err_receiver_thread
          @sender_thread       = sender_thread
          @channel_type_instance.start
        end
        @closed = false
        log_debug { "in start: #{@waiting_thread.inspect}" }
        @waiting_thread.wakeup if @waiting_thread
      end

      def wait_until_started
        @waiting_thread = Thread.current
        log_debug { "in wait_until_started: #{@waiting_thread.inspect}" }
        Thread.stop
      end

      def wait_until_senders_closed
        [
          @out_sender_thread,
          @err_sender_thread,
        ].each{ |t|
          begin
            t.join if t.instance_of? Thread
          rescue => e
            log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
          end
        }
      end

      def close from=:outside, exitstatus=0
        @channel_closing_monitor.synchronize {
          return if @closed
          log_info { "close channel" }
          @closed = true
        }
        unless from == :channel_type_instance
          @channel_type_instance.close
        end
        @receive_message_queue.close
        begin
          if from == :channel_type_instance
            send_channel_eof
            case exitstatus
            when Integer
              send_channel_request_exit_status exitstatus
            else
              log_warn { "skip sending exit-status because exitstatus is not an instance of Integer" }
            end
          elsif from == :sender_thread
            send_channel_eof
          end
          send_channel_close
        rescue Error::ClosedConnection => e
          Thread.pass
        rescue => e
          log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
        end
        log_info { "channel closed" }
      end

      def wait_until_closed
        [
          @out_sender_thread,
          @err_sender_thread,
          @receiver_thread,
          @out_receiver_thread,
          @err_receiver_thread,
          @sender_thread,
          @channel_loop_thread
        ].each{ |t|
          begin
            t.join if t.instance_of? Thread
          rescue => e
            log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
          end
        }
      end

      def closed?
        @closed
      end

      def channel_loop_thread
        Thread.start do
          log_info { "start channel loop thread" }
          begin
            loop do
              begin
                message = @receive_message_queue.deq
                if message.nil? && @receive_message_queue.closed?
                  break
                end
                case message[:'message number']
                when Messages::SSH_MSG_CHANNEL_EOF::VALUE
                  @receive_data_queue.close
                  @receive_extended_data_queue.close
                when Messages::SSH_MSG_CHANNEL_REQUEST::VALUE
                  log_info { "received channel request: #{message[:'request type']}" }
                  case @connection.mode
                  when Mode::SERVER
                    begin
                      @channel_type_instance.request message
                    rescue => e
                      log_warn { "request failed: #{e.message}" }
                      send_channel_failure if message[:'want reply']
                    else
                      send_channel_success if message[:'want reply']
                    end
                  when Mode::CLIENT
                    case message[:'request type']
                    when "exit-status"
                      log_info { "exit status: #{message[:'exit status']}" }
                      @exit_status = message[:'exit status'].to_i
                    end
                  end
                when Messages::SSH_MSG_CHANNEL_DATA::VALUE
                  log_info { "received channel data" }
                  local_channel = message[:'recipient channel']
                  @receive_data_queue.enq message[:'data']
                when Messages::SSH_MSG_CHANNEL_EXTENDED_DATA::VALUE
                  log_info { "received channel extended data" }
                  local_channel = message[:'recipient channel']
                  @receive_extended_data_queue.enq message[:'data']
                when Messages::SSH_MSG_CHANNEL_WINDOW_ADJUST::VALUE
                  log_info { "received channel window adjust" }
                  @remote_window_size = [@remote_window_size + message[:'bytes to add'], 0xffff_ffff].min
                else
                  log_warn { "received unsupported message: #{message.inspect}" }
                end
              rescue => e
                log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
                close from=:channel_loop_thread
                break
              end
            end
          ensure
            log_info { "closing channel loop thread" }
            @receive_data_queue.close
            @receive_extended_data_queue.close
          end
          log_info { "channel loop thread closed" }
        end
      end

      def out_sender_thread
        Thread.start {
          log_info { "start out sender thread" }
          loop do
            if @r_io_out.closed?
              log_info { "closing out sender thread" }
              break
            end
            begin
              data = @r_io_out.readpartial(10240)
              sendable_size = [data.size, @remote_window_size].min
              sending_data = data[0, sendable_size]
              send_channel_data sending_data if sendable_size > 0
              @remote_window_size -= sendable_size
            rescue EOFError, IOError => e
              @r_io_out.close rescue nil
            rescue => e
              log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
              @r_io_out.close rescue nil
              close
            end
          end
          log_info { "out sender thread closed" }
        }
      end

      def err_sender_thread
        Thread.start {
          log_info { "start err sender thread" }
          loop do
            if @r_io_err.closed?
              log_info { "closing err sender thread" }
              break
            end
            begin
              data = @r_io_err.readpartial(10240)
              sendable_size = [data.size, @remote_window_size].min
              sending_data = data[0, sendable_size]
              send_channel_extended_data sending_data if sendable_size > 0
              @remote_window_size -= sendable_size
            rescue EOFError, IOError => e
              @r_io_err.close rescue nil
            rescue => e
              log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
              @r_io_err.close rescue nil
              close
            end
          end
          log_info { "err sender thread closed" }
        }
      end

      def receiver_thread
        Thread.start {
          log_info { "start receiver thread" }
          loop do
            begin
              data = @receive_data_queue.deq
              if data.nil? && @receive_data_queue.closed?
                log_info { "closing receiver thread" }
                log_info { "closing w_io_in" }
                @w_io_in.close
                log_info { "w_io_in closed" }
                break
              end
              @w_io_in.write data
              @local_window_size -= data.size
              if @local_window_size < INITIAL_WINDOW_SIZE/2
                log_info { "send channel window adjust" }
                send_channel_window_adjust
                @local_window_size += INITIAL_WINDOW_SIZE
              end
            rescue Errno::EPIPE, IOError => e
              close
              break
            rescue => e
              log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
              close
              break
            end
          end
          log_info { "receiver thread closed" }
        }
      end

      def out_receiver_thread
        Thread.start {
          log_info { "start out receiver thread" }
          loop do
            begin
              data = @receive_data_queue.deq
              if data.nil? && @receive_data_queue.closed?
                log_info { "closing out receiver thread" }
                log_info { "closing w_io_out" }
                @w_io_out.close
                log_info { "w_io_out closed" }
                break
              end
              @w_io_out.write data
              @local_window_size -= data.size
              if @local_window_size < INITIAL_WINDOW_SIZE/2
                log_info { "send channel window adjust" }
                send_channel_window_adjust
                @local_window_size += INITIAL_WINDOW_SIZE
              end
            rescue Errno::EPIPE, IOError => e
              close
              break
            rescue => e
              log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
              close
              break
            end
          end
          log_info { "out receiver thread closed" }
        }
      end

      def err_receiver_thread
        Thread.start {
          log_info { "start err receiver thread" }
          loop do
            begin
              data = @receive_extended_data_queue.deq
              if data.nil? && @receive_extended_data_queue.closed?
                log_info { "closing err receiver thread" }
                log_info { "closing w_io_err" }
                @w_io_err.close
                log_info { "w_io_err closed" }
                break
              end
              @w_io_err.write data
              @local_window_size -= data.size
              if @local_window_size < INITIAL_WINDOW_SIZE/2
                log_info { "send channel window adjust" }
                send_channel_window_adjust
                @local_window_size += INITIAL_WINDOW_SIZE
              end
            rescue Error::EPIPE, IOError => e
              close
              break
            rescue => e
              log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
              close
              break
            end
          end
          log_info { "err receiver thread closed" }
        }
      end

      def sender_thread
        Thread.start {
          log_info { "start sender thread" }
          loop do
            if @r_io_in.closed?
              log_info { "closing sender thread" }
              break
            end
            begin
              data = @r_io_in.readpartial(10240)
              sendable_size = [data.size, @remote_window_size].min
              sending_data = data[0, sendable_size]
              send_channel_data sending_data if sendable_size > 0
              @remote_window_size -= sendable_size
            rescue EOFError, IOError => e
              @r_io_in.close rescue nil
            rescue => e
              log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
              @r_io_in.close rescue nil
            end
          end
          close from=:sender_thread
          log_info { "sender thread closed" }
        }
      end

      def send_channel_success
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_SUCCESS::VALUE,
          :'recipient channel' => @remote_channel,
        }
        payload = Messages::SSH_MSG_CHANNEL_SUCCESS.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_failure
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_FAILURE::VALUE,
          :'recipient channel' => @remote_channel,
        }
        payload = Messages::SSH_MSG_CHANNEL_FAILURE.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_window_adjust
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_WINDOW_ADJUST::VALUE,
          :'recipient channel' => @remote_channel,
          :'bytes to add'      => INITIAL_WINDOW_SIZE,
        }
        payload = Messages::SSH_MSG_CHANNEL_WINDOW_ADJUST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_data data
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_DATA::VALUE,
          :'recipient channel' => @remote_channel,
          :'data'              => data,
        }
        payload = Messages::SSH_MSG_CHANNEL_DATA.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_extended_data data, code=Messages::SSH_MSG_CHANNEL_EXTENDED_DATA::DataTypesCode::SSH_EXTENDED_DATA_STDERR
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_EXTENDED_DATA::VALUE,
          :'recipient channel' => @remote_channel,
          :'data type code'    => code,
          :'data'              => data,
        }
        payload = Messages::SSH_MSG_CHANNEL_EXTENDED_DATA.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_request_pty_req term_env_var_val, term_width_chars, term_height_rows, term_width_pixel, term_height_pixel, encoded_term_modes
        message = {
          :'message number'                  => Messages::SSH_MSG_CHANNEL_REQUEST::VALUE,
          :'recipient channel'               => @remote_channel,
          :'request type'                    => "pty-req",
          :'want reply'                      => false,
          :'TERM environment variable value' => term_env_var_val,
          :'terminal width, characters'      => term_width_chars,
          :'terminal height, rows'           => term_height_rows,
          :'terminal width, pixels'          => term_width_pixel,
          :'terminal height, pixels'         => term_height_pixel,
          :'encoded terminal modes'          => encoded_term_modes,
        }
        payload = Messages::SSH_MSG_CHANNEL_REQUEST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_request_env variable_name, variable_value
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_REQUEST::VALUE,
          :'recipient channel' => @remote_channel,
          :'request type'      => "env",
          :'want reply'        => false,
          :'variable name'     => variable_name,
          :'variable value'    => variable_value,
        }
        payload = Messages::SSH_MSG_CHANNEL_REQUEST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_request_shell
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_REQUEST::VALUE,
          :'recipient channel' => @remote_channel,
          :'request type'      => "shell",
          :'want reply'        => false,
        }
        payload = Messages::SSH_MSG_CHANNEL_REQUEST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_request_exec command
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_REQUEST::VALUE,
          :'recipient channel' => @remote_channel,
          :'request type'      => "exec",
          :'want reply'        => false,
          :'command'           => command,
        }
        payload = Messages::SSH_MSG_CHANNEL_REQUEST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_request_subsystem subsystem_name
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_REQUEST::VALUE,
          :'recipient channel' => @remote_channel,
          :'request type'      => "subsystem",
          :'want reply'        => false,
          :'subsystem name'    => subsystem_name,
        }
        payload = Messages::SSH_MSG_CHANNEL_REQUEST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_request_window_change term_width_cols, term_height_rows, term_width_pixel, term_height_pixel
        message = {
          :'message number'          => Messages::SSH_MSG_CHANNEL_REQUEST::VALUE,
          :'recipient channel'       => @remote_channel,
          :'request type'            => "window-change",
          :'want reply'              => false,
          :'terminal width, columns' => term_width_cols,
          :'terminal height, rows'   => term_height_rows,
          :'terminal width, pixels'  => term_width_pixel,
          :'terminal height, pixels' => term_height_pixel,
        }
        payload = Messages::SSH_MSG_CHANNEL_REQUEST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_request_signal signal_name
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_REQUEST::VALUE,
          :'recipient channel' => @remote_channel,
          :'request type'      => "signal",
          :'want reply'        => false,
          :'signal name'       => signal_name,
        }
        payload = Messages::SSH_MSG_CHANNEL_REQUEST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_request_exit_status exitstatus
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_REQUEST::VALUE,
          :'recipient channel' => @remote_channel,
          :'request type'      => "exit-status",
          :'want reply'        => false,
          :'exit status'       => exitstatus,
        }
        payload = Messages::SSH_MSG_CHANNEL_REQUEST.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_eof
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_EOF::VALUE,
          :'recipient channel' => @remote_channel,
        }
        payload = Messages::SSH_MSG_CHANNEL_EOF.new(logger: logger).encode message
        @connection.send payload
      end

      def send_channel_close
        message = {
          :'message number'    => Messages::SSH_MSG_CHANNEL_CLOSE::VALUE,
          :'recipient channel' => @remote_channel,
        }
        payload = Messages::SSH_MSG_CHANNEL_CLOSE.new(logger: logger).encode message
        @connection.send payload
      end
    end
  end
end