rhenium/plum

View on GitHub
lib/plum/connection.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# frozen-string-literal: true

using Plum::BinaryString

module Plum
  class Connection
    include EventEmitter
    include FlowControl

    CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

    DEFAULT_SETTINGS = {
      header_table_size:      4096,     # octets
      enable_push:            1,        # 1: enabled, 0: disabled
      max_concurrent_streams: 1 << 30,  # (1 << 31) / 2
      initial_window_size:    65535,    # octets; <= 2 ** 31 - 1
      max_frame_size:         16384,    # octets; <= 2 ** 24 - 1
      max_header_list_size:   (1 << 32) - 1 # Fixnum
    }.freeze

    attr_reader :hpack_encoder, :hpack_decoder
    attr_reader :local_settings, :remote_settings
    attr_reader :state, :streams

    def initialize(writer, local_settings = {})
      @state = :open
      @writer = writer
      @local_settings = Hash.new { |hash, key| DEFAULT_SETTINGS[key] }.merge!(local_settings)
      @remote_settings = Hash.new { |hash, key| DEFAULT_SETTINGS[key] }
      @buffer = "".b
      @streams = {}
      @hpack_decoder = HPACK::Decoder.new(@local_settings[:header_table_size])
      @hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size])
      initialize_flow_control(send: @remote_settings[:initial_window_size],
                              recv: @local_settings[:initial_window_size])
      @max_stream_ids = [0, -1] # [even, odd]
    end

    # Emits :close event. Doesn't actually close socket.
    def close
      return if @state == :closed
      @state = :closed
      # TODO: server MAY wait streams
      callback(:close)
    end

    # Receives the specified data and process.
    # @param new_data [String] The data received from the peer.
    def receive(new_data)
      return if @state == :closed
      return if new_data.empty?
      @buffer << new_data
      consume_buffer
    rescue RemoteConnectionError => e
      callback(:connection_error, e)
      goaway(e.http2_error_type)
      close
    end
    alias << receive

    # Returns a Stream object with the specified ID.
    # @param stream_id [Integer] the stream id
    # @return [Stream] the stream
    def stream(stream_id, update_max_id = true)
      raise ArgumentError, "stream_id can't be 0" if stream_id == 0

      stream = @streams[stream_id]
      if stream
        if stream.state == :idle && stream_id < @max_stream_ids[stream_id % 2]
          stream.set_state(:closed_implicitly)
        end
      elsif stream_id > @max_stream_ids[stream_id % 2]
        @max_stream_ids[stream_id % 2] = stream_id if update_max_id
        stream = Stream.new(self, stream_id, state: :idle)
        callback(:stream, stream)
        @streams[stream_id] = stream
      else
        stream = Stream.new(self, stream_id, state: :closed_implicitly)
        callback(:stream, stream)
      end

      stream
    end

    # Sends local settings to the peer.
    # @param new_settings [Hash<Symbol, Integer>]
    def settings(**new_settings)
      send_immediately Frame::Settings.new(**new_settings)

      old_settings = @local_settings.dup
      @local_settings.merge!(new_settings)

      @hpack_decoder.limit = @local_settings[:header_table_size]
      update_recv_initial_window_size(@local_settings[:initial_window_size] - old_settings[:initial_window_size])
    end

    # Sends a PING frame to the peer.
    # @param data [String] Must be 8 octets.
    # @raise [ArgumentError] If the data is not 8 octets.
    def ping(data = "plum\x00\x00\x00\x00")
      send_immediately Frame::Ping.new(data)
    end

    # Sends GOAWAY frame to the peer and closes the connection.
    # @param error_type [Symbol] The error type to be contained in the GOAWAY frame.
    def goaway(error_type = :no_error, message = "")
      last_id = @max_stream_ids.max
      send_immediately Frame::Goaway.new(last_id, error_type, message)
    end

    # Returns whether peer enables server push or not
    def push_enabled?
      @remote_settings[:enable_push] == 1
    end

    private
    def consume_buffer
      while frame = Frame.parse!(@buffer)
        callback(:frame, frame)
        receive_frame(frame)
      end
    end

    def send_immediately(frame)
      callback(:send_frame, frame)

      if frame.length <= @remote_settings[:max_frame_size]
        @writer.call(frame.assemble)
      else
        frame.split(@remote_settings[:max_frame_size]) { |splitted|
          @writer.call(splitted.assemble)
        }
      end
    end

    def validate_received_frame(frame)
      if @state == :waiting_settings && !(Frame::Settings === frame)
        raise RemoteConnectionError.new(:protocol_error)
      end

      if @state == :waiting_continuation
        if !(Frame::Continuation === frame) || frame.stream_id != @continuation_id
          raise RemoteConnectionError.new(:protocol_error)
        end
        if frame.end_headers?
          @state = :open
        end
      end

      if Frame::Headers === frame || Frame::PushPromise === frame
        if !frame.end_headers?
          @state = :waiting_continuation
          @continuation_id = frame.stream_id
        end
      end
    end

    def receive_frame(frame)
      validate_received_frame(frame)
      consume_recv_window(frame)

      if frame.stream_id == 0
        receive_control_frame(frame)
      else
        stream(frame.stream_id, Frame::Headers === frame).receive_frame(frame)
      end
    end

    def receive_control_frame(frame)
      if frame.length > @local_settings[:max_frame_size]
        raise RemoteConnectionError.new(:frame_size_error)
      end

      case frame
      when Frame::Settings then     receive_settings(frame)
      when Frame::WindowUpdate then receive_window_update(frame)
      when Frame::Ping then         receive_ping(frame)
      when Frame::Goaway then       receive_goaway(frame)
      when Frame::Data, Frame::Headers, Frame::Priority, Frame::RstStream, Frame::PushPromise, Frame::Continuation
        raise Plum::RemoteConnectionError.new(:protocol_error)
      else
        # MUST ignore unknown frame type.
      end
    end

    def receive_settings(frame, send_ack: true)
      if frame.ack?
        raise RemoteConnectionError.new(:frame_size_error) if frame.length != 0
        callback(:settings_ack)
        return
      else
        raise RemoteConnectionError.new(:frame_size_error) if frame.length % 6 != 0
      end

      old_remote_settings = @remote_settings.dup
      @remote_settings.merge!(frame.parse_settings)
      apply_remote_settings(old_remote_settings)

      callback(:remote_settings, @remote_settings, old_remote_settings)

      send_immediately Frame::Settings.ack if send_ack

      if @state == :waiting_settings
        @state = :open
        callback(:negotiated)
      end
    end

    def apply_remote_settings(old_remote_settings)
      @hpack_encoder.limit = @remote_settings[:header_table_size]
      update_send_initial_window_size(@remote_settings[:initial_window_size] - old_remote_settings[:initial_window_size])
    end

    def receive_ping(frame)
      raise Plum::RemoteConnectionError.new(:frame_size_error) if frame.length != 8

      if frame.ack?
        callback(:ping_ack)
      else
        opaque_data = frame.payload
        callback(:ping, opaque_data)
        send_immediately Frame::Ping.new(:ack, opaque_data)
      end
    end

    def receive_goaway(frame)
      callback(:goaway, frame)
      goaway
      close

      # TODO: how handle it?
      # last_id = frame.payload.uint32(0)
      error_code = frame.payload.uint32(4)
      message = frame.payload.byteslice(8, frame.length - 8)
      if error_code > 0
        raise LocalConnectionError.new(HTTPError::ERROR_CODES.key(error_code), message)
      end
    end
  end
end