rhenium/plum

View on GitHub
lib/plum/stream.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# frozen-string-literal: true

using Plum::BinaryString

module Plum
  class Stream
    include EventEmitter
    include FlowControl

    attr_reader :id, :state, :connection
    attr_reader :weight, :exclusive
    attr_accessor :parent
    # The streams that depend on this stream (its children).
    attr_reader :children

    # Creates a stream that belongs to the given connection.
    #
    # The flow-control windows are seeded from the connection's settings: the
    # send window from the remote settings, the receive window from the local ones.
    # @param con [Object] The owning connection; exposed through {#connection}.
    # @param id [Integer] The stream identifier.
    # @param state [Symbol] The initial stream state.
    # @param weight [Integer] The initial priority weight.
    # @param parent [Stream, nil] The stream this one depends on, if any.
    # @param exclusive [Boolean] Whether the dependency on +parent+ is exclusive.
    def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false)
      @connection = con
      @id = id
      @state = state
      @children = Set.new
      # HEADERS/CONTINUATION frames buffered until END_HEADERS is seen.
      @continuation = []

      send_window = con.remote_settings[:initial_window_size]
      recv_window = con.local_settings[:initial_window_size]
      initialize_flow_control(send: send_window, recv: recv_window)

      update_dependency(weight: weight, parent: parent, exclusive: exclusive)
    end

    # Entry point for every frame received on this stream. Internal use.
    #
    # The frame is validated and accounted against the receive window, then
    # handed to the handler matching its type. A RemoteStreamError raised along
    # the way is reported through the :stream_error callback, answered with
    # RST_STREAM and followed by closing the stream; any other error propagates.
    # @private
    # @param frame [Frame] The received frame.
    # @raise [RemoteConnectionError] When a connection-level frame (PING, GOAWAY,
    #   SETTINGS) arrives on a stream.
    def receive_frame(frame)
      validate_received_frame(frame)
      consume_recv_window(frame)

      case frame
      when Frame::Data
        receive_data(frame)
      when Frame::Headers
        receive_headers(frame)
      when Frame::Priority
        receive_priority(frame)
      when Frame::RstStream
        receive_rst_stream(frame)
      when Frame::WindowUpdate
        receive_window_update(frame)
      when Frame::Continuation
        receive_continuation(frame)
      when Frame::PushPromise
        receive_push_promise(frame)
      when Frame::Ping, Frame::Goaway, Frame::Settings
        # These frame types are only valid with stream_id 0x00.
        raise RemoteConnectionError.new(:protocol_error)
      end
      # Any other (unknown) frame type falls through: it MUST be ignored.
    rescue RemoteStreamError => stream_error
      callback(:stream_error, stream_error)
      send_immediately Frame::RstStream.new(id, stream_error.http2_error_type)
      close
    end

    # Closes this stream: marks it as :closed and fires the :close callback.
    # This method does not send RST_STREAM itself; a caller that needs to reset
    # the stream on the wire must send the frame before calling it (as
    # receive_frame does when handling a stream error).
    def close
      @state = :closed
      callback(:close)
    end

    # Overwrites the stream state directly. No validation is performed and no
    # callback is fired.
    # @api private
    # @param state [Symbol] The new state.
    def set_state(state)
      @state = state
    end

    # Updates this stream's priority information (weight, parent, exclusivity).
    #
    # The stream is detached from its previous parent's children before being
    # attached to the new parent, so the tree never holds a stale child entry.
    # NOTE(review): making a stream depend on one of its own descendants is not
    # rearranged here (RFC 7540 5.3.3) - confirm whether callers guard against it.
    # @api private
    # @param weight [Integer, nil] The new weight; the current one is kept when nil.
    # @param parent [Stream, nil] The stream to depend on; nil clears the parent.
    # @param exclusive [Boolean, nil] Whether the dependency is exclusive; nil
    #   leaves the exclusive flag untouched.
    # @raise [RemoteStreamError] :protocol_error when +parent+ is this stream.
    def update_dependency(weight: nil, parent: nil, exclusive: nil)
      raise RemoteStreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self

      @weight = weight if weight

      # Leave the old parent first; otherwise it keeps listing us as a child.
      previous_parent = @parent
      if previous_parent && !previous_parent.equal?(parent)
        previous_parent.children&.delete(self)
      end
      @parent = parent
      @parent&.children&.add(self)

      return if exclusive.nil?

      @exclusive = exclusive
      return unless @parent && exclusive

      # Exclusive dependency: every other child of the parent becomes our child.
      # Iterate over a copy because the set is modified inside the loop.
      @parent.children.to_a.each do |sibling|
        next if sibling == self
        @parent.children.delete(sibling)
        sibling.parent = self
        @children << sibling
      end
    end

    # Reserves a stream for server push: asks the connection for a new stream
    # that depends on this one, then sends a PUSH_PROMISE frame announcing it.
    # @param headers [Enumerable<String, String>] The *request* headers. They must include all of ':authority', ':method', ':scheme' and ':path'.
    # @return [Stream] The reserved stream on which the push response is to be sent.
    def promise(headers)
      # The promised stream is given a weight one above this stream's.
      pushed = @connection.reserve_stream(weight: weight + 1, parent: self)
      header_block = @connection.hpack_encoder.encode(headers)
      send Frame::PushPromise.new(id, pushed.id, header_block, end_headers: true)
      pushed
    end

    # Sends response headers as a single HEADERS frame with END_HEADERS set.
    # NOTE(review): splitting an oversized header block into HEADERS plus
    # CONTINUATION frame(s) is not done in this method; presumably the send path
    # handles it - confirm.
    # @param headers [Enumerable<String, String>] The response headers.
    # @param end_stream [Boolean] Whether to set the END_STREAM flag. When true
    #   the stream moves to :half_closed_local.
    def send_headers(headers, end_stream:)
      header_block = @connection.hpack_encoder.encode(headers)
      send Frame::Headers.new(id, header_block, end_headers: true, end_stream: end_stream)
      @state = :half_closed_local if end_stream
    end

    # Sends DATA frame(s).
    #
    # IO-like data (anything responding to #readpartial and #eof?, e.g. IO,
    # StringIO, Tempfile) is read in chunks of at most the peer's MAX_FRAME_SIZE
    # and sent as one DATA frame per chunk. Any other data is handed over as a
    # single DATA frame.
    # NOTE(review): splitting of an oversized String payload is not done here;
    # presumably the send path takes care of it - confirm.
    # @param data [String, IO, #readpartial] The data to send.
    # @param end_stream [Boolean] Whether to set the END_STREAM flag on the last
    #   frame. When true the stream moves to :half_closed_local.
    def send_data(data = "", end_stream: true)
      max = @connection.remote_settings[:max_frame_size]
      if data.respond_to?(:readpartial) && data.respond_to?(:eof?)
        sent_any = false
        until data.eof?
          fragment = data.readpartial(max)
          send Frame::Data.new(id, fragment, end_stream: end_stream && data.eof?)
          sent_any = true
        end
        # An already-exhausted source produces no frame above; END_STREAM still
        # has to reach the peer, so carry it on an empty DATA frame.
        send Frame::Data.new(id, "", end_stream: true) if end_stream && !sent_any
      else
        send Frame::Data.new(id, data, end_stream: end_stream)
      end
      @state = :half_closed_local if end_stream
    end

    private
    # Hands the frame straight to the connection's #send.
    # NOTE(review): presumably this bypasses the stream-level send used by the
    # public send_* methods (and any flow control it applies) - confirm.
    # @param frame [Frame] The frame to send.
    def send_immediately(frame)
      @connection.send(frame)
    end

    # Rejects frames longer than the locally advertised MAX_FRAME_SIZE.
    #
    # Oversized HEADERS, PUSH_PROMISE and CONTINUATION frames are reported as a
    # connection error; every other oversized frame as a stream error.
    # @param frame [Frame] The received frame.
    # @raise [RemoteConnectionError, RemoteStreamError] :frame_size_error
    def validate_received_frame(frame)
      limit = @connection.local_settings[:max_frame_size]
      return if frame.length <= limit

      error_class =
        case frame
        when Frame::Headers, Frame::PushPromise, Frame::Continuation
          RemoteConnectionError
        else
          RemoteStreamError
        end
      raise error_class.new(:frame_size_error)
    end

    # Handles the peer finishing its side of the stream. The :end_stream
    # callback fires first, so handlers still observe the previous state; only
    # afterwards does the stream become :half_closed_remote.
    def receive_end_stream
      callback(:end_stream)
      @state = :half_closed_remote
    end

    # Handles a DATA frame: strips padding, if any, and passes the body to the
    # :data callback. END_STREAM is processed after the data has been delivered.
    # @param frame [Frame::Data] The received frame.
    # @raise [RemoteStreamError] :stream_closed unless the stream is :open or
    #   :half_closed_local.
    # @raise [RemoteConnectionError] :protocol_error when the declared padding
    #   does not fit in the frame.
    def receive_data(frame)
      raise RemoteStreamError.new(:stream_closed) unless %i[open half_closed_local].include?(@state)

      body =
        if frame.padded?
          # The first payload byte holds the padding length; the padding itself
          # trails the data.
          pad = frame.payload.uint8
          raise RemoteConnectionError.new(:protocol_error, "padding is too long") if pad >= frame.length
          frame.payload.byteslice(1, frame.length - pad - 1)
        else
          frame.payload
        end
      callback(:data, body)

      receive_end_stream if frame.end_stream?
    end

    # Processes a complete header block: a HEADERS frame followed by zero or
    # more CONTINUATION frames.
    #
    # Padding and priority fields are taken from the first frame only; the
    # remaining frames contribute their whole payload. The reassembled block is
    # HPACK-decoded and passed to the :headers callback, after which END_STREAM
    # (a flag of the first frame) is processed. The +frames+ array is not modified.
    # @param frames [Array<Frame>] The HEADERS frame and its CONTINUATION frames, in order.
    # @raise [RemoteConnectionError] :frame_size_error when the first frame is too
    #   short for its mandatory fields, :protocol_error when the padding does not
    #   fit, :compression_error when HPACK decoding fails.
    def receive_complete_headers(frames)
      first, *rest = frames
      payload = first.payload

      # Pad Length takes 1 byte, the priority fields take 5.
      mandatory = (first.padded? ? 1 : 0) + (first.priority? ? 5 : 0)
      if payload.bytesize < mandatory
        raise RemoteConnectionError.new(:frame_size_error)
      end

      if first.padded?
        padding_length = payload.uint8
        # Validate against the untrimmed payload, before slicing: the padding may
        # be longer than the header block fragment, but must fit in the frame.
        if padding_length > payload.bytesize - mandatory
          raise RemoteConnectionError.new(:protocol_error, "padding is too long")
        end
        payload = payload.byteslice(1, payload.bytesize - padding_length - 1)
      else
        # Work on a copy so the frame's own payload is left untouched below.
        payload = payload.dup
      end

      receive_priority_payload(payload.byteshift(5)) if first.priority?

      rest.each { |frame| payload << frame.payload }

      begin
        decoded_headers = @connection.hpack_decoder.decode(payload)
      rescue => e
        raise RemoteConnectionError.new(:compression_error, e)
      end

      callback(:headers, decoded_headers)

      receive_end_stream if first.end_stream?
    end

    # Handles a HEADERS frame: checks that the current state allows it, opens the
    # stream, then either processes the header block right away (END_HEADERS set)
    # or buffers the frame until the CONTINUATION frames arrive.
    # @param frame [Frame::Headers] The received frame.
    # @raise [RemoteConnectionError] :protocol_error or :stream_closed, depending
    #   on the state the stream is in.
    # @raise [RemoteStreamError] :stream_closed when the stream is :half_closed_remote.
    def receive_headers(frame)
      case @state
      when :reserved_local, :closed_implicitly
        raise RemoteConnectionError.new(:protocol_error)
      when :half_closed_remote
        raise RemoteStreamError.new(:stream_closed)
      when :closed
        raise RemoteConnectionError.new(:stream_closed)
      when :idle
        # An idle stream with an even id must not be opened by received HEADERS.
        raise RemoteConnectionError.new(:protocol_error) if id.even?
      end

      @state = :open
      callback(:open)

      if frame.end_headers?
        receive_complete_headers([frame])
      else
        @continuation << frame
      end
    end

    # Handles a PUSH_PROMISE frame received on this stream. Not implemented yet.
    #
    # TODO: once implemented, validate the promised stream as RFC 7540 5.1.1 requires:
    #   - an endpoint that receives an unexpected stream identifier (the promised
    #     stream is already implicitly closed) MUST respond with a connection
    #     error of type PROTOCOL_ERROR;
    #   - streams initiated by the server MUST use even-numbered identifiers, so
    #     an odd promised id is a connection error of type PROTOCOL_ERROR too.
    # @param frame [Frame::PushPromise] The received frame (currently unused).
    # @raise [NotImplementedError] always.
    def receive_push_promise(frame)
      raise NotImplementedError
    end

    # Handles a CONTINUATION frame: buffers it and, once END_HEADERS is seen,
    # processes the whole buffered header block and empties the buffer.
    # No state check is made here: the original code relies on the connection
    # having validated the frame sequence already.
    # @param frame [Frame::Continuation] The received frame.
    def receive_continuation(frame)
      @continuation.push(frame)
      return unless frame.end_headers?

      receive_complete_headers(@continuation)
      @continuation.clear
    end

    # Handles a PRIORITY frame, whose payload must be exactly 5 bytes long.
    # @param frame [Frame::Priority] The received frame.
    # @raise [RemoteStreamError] :frame_size_error on any other length.
    def receive_priority(frame)
      raise RemoteStreamError.new(:frame_size_error) unless frame.length == 5

      receive_priority_payload(frame.payload)
    end

    # Decodes the 5-byte priority fields and applies them to this stream.
    #
    # Layout: a 32-bit word whose top bit is the exclusive flag and whose lower
    # 31 bits are the id of the stream depended on, followed by one weight byte.
    # The parent is looked up in the connection's stream table, so an id with no
    # entry there yields a nil parent.
    # @param payload [String] The priority fields.
    def receive_priority_payload(payload)
      exclusive_bit = 1 << 31
      word = payload.uint32
      new_weight = payload.uint8(4)

      is_exclusive = (word >> 31) == 1
      parent_id = word & ~exclusive_bit

      update_dependency(weight: new_weight,
                        parent: @connection.streams[parent_id],
                        exclusive: is_exclusive)
    end

    # Handles an RST_STREAM frame: closes the stream and reports the peer's
    # error code, translated through HTTPError::ERROR_CODES, to the :rst_stream
    # callback. The :close callback is not fired on this path.
    # @param frame [Frame::RstStream] The received frame.
    # @raise [RemoteConnectionError] :frame_size_error unless the payload is 4
    #   bytes long, :protocol_error when the stream is still idle.
    def receive_rst_stream(frame)
      raise RemoteConnectionError.new(:frame_size_error) unless frame.length == 4
      raise RemoteConnectionError.new(:protocol_error) if @state == :idle

      # The state is set directly: RST_STREAM MUST NOT be sent in response.
      @state = :closed

      code = frame.payload.uint32
      callback(:rst_stream, HTTPError::ERROR_CODES.key(code))
    end

    # Overrides EventEmitter#callback so that every event is delivered twice:
    # first through the superclass implementation for this stream, then through
    # the connection's #callback with this stream inserted as the first argument.
    # @param name [Symbol] The event name.
    # @param args [Array] The event arguments.
    def callback(name, *args)
      super(name, *args)
      @connection.callback(name, self, *args)
    end
  end
end