firehoseio/firehose

View on GitHub
lib/firehose/rack/consumer/http_long_poll.rb

Summary

Maintainability
A
2 hrs
Test Coverage
C
71%
require 'json'

module Firehose
  module Rack
    class Consumer
      class HttpLongPoll
        # How long should we wait before closing out the consuming clients web connection
        # for long polling? Most browsers timeout after a connection has been idle for 30s.
        TIMEOUT = 20

        attr_accessor :timeout

        def initialize(timeout=TIMEOUT)
          @timeout = timeout
          yield self if block_given?
        end

        def call(env)
          if Consumer.multiplexing_request?(env)
            MultiplexingHandler.new(@timeout).call(env)
          else
            DefaultHandler.new(@timeout).call(env)
          end
        end

        class Handler < Firehose::Rack::Consumer::BasicHandler
          include Firehose::Rack::Helpers

          def initialize(timeout=TIMEOUT)
            @timeout = timeout
            yield self if block_given?
          end

          def call(env)
            request = request(env)
            method  = request.request_method

            case method
            # GET is how clients subscribe to the queue. When a messages comes in, we flush out a response,
            # close down the requeust, and the client then reconnects.
            when "GET"
              handle_request(request, env)
              return ASYNC_RESPONSE
            # we use post messages for http long poll multiplexing
            when "POST"
              if Consumer.multiplexing_request?(env)
                handle_request(request, env)
                return ASYNC_RESPONSE
              end
            end

            Firehose.logger.debug "HTTP #{method} not supported"
            response(405, "#{method} not supported.", "Allow" => "GET")
          end

          private

          # If the request is a CORS request, return those headers, otherwise don't worry 'bout it
          def response_headers(env)
            cors_origin(env) ? cors_headers(env) : {}
          end

          def cors_origin(env)
            env['HTTP_ORIGIN']
          end

          def cors_headers(env)
            {'Access-Control-Allow-Origin' => cors_origin(env)}
          end

          def request(env)
            env['parsed_request'] ||= ::Rack::Request.new(env)
          end

          def async_callback(env, code, message = "", headers = nil)
            resp_headers = response_headers(env)

            if headers
              resp_headers.merge!(headers)
            end

            if cb = env["async.callback"]
              cb.call response(code, message, resp_headers)
            else
              Firehose.logger.error "async.callback not set for response: #{message.inspect}"
            end
          end

          def respond_async(channel, last_sequence, params, env)
            EM.next_tick do
              if last_sequence < 0
                async_callback env, 400, "The last_message_sequence parameter may not be less than zero"
              else
                begin
                  chan_sub = Server::ChannelSubscription.new(
                    channel,
                    env,
                    params: params,
                    sequence: last_sequence,
                    timeout: @timeout
                  )

                  chan_sub.next_messages.callback do |messages|
                    # TODO: Can we send all of these messages down in one request? Sending one message per
                    # request is slow and inefficient. If we change the protocol (3.0?) we could batch the
                    # messages and send them all down the pipe, then close the conneciton.
                    message = messages.first
                    async_callback env, 200, wrap_frame(channel, message)
                  end.errback do |e|
                    if e == :timeout
                      Firehose::Server.metrics.timeout!(:http_subscribe_multiplexed, channel)
                      async_callback env, 204
                    else
                      Firehose::Server.metrics.error!(:http_subscribe_multiplexed, channel)
                      Firehose.logger.error "Unexpected error when trying to GET last_sequence #{last_sequence} for path #{channel}: #{e.inspect}"
                      async_callback env, 500, "Unexpected error"
                    end
                  end
                rescue Server::ChannelSubscription::Failed => e
                  Firehose::Server.metrics.error!(:http_subscribe_multiplexed_failed, channel)
                  Firehose.logger.info "Subscription failed: #{e.message}"
                  async_callback env,
                                 400,
                                 JSON.generate(error: "Subscription failed",
                                               reason: e.message)
                end
              end
            end
          end
        end

        class DefaultHandler < Handler
          def wrap_frame(channel, message)
            JSON.generate message: message.payload,
                          last_sequence: message.sequence
          end

          def log_request(path, last_sequence, env)
            Firehose.logger.debug "HTTP GET with last_sequence #{last_sequence} for path #{path} with query #{env["QUERY_STRING"].inspect}"
          end

          def handle_request(request, env)
            # Get the Last Message Sequence from the query string.
            # Ideally we'd use an HTTP header, but android devices don't let us
            # set any HTTP headers for CORS requests.
            params = request.params
            last_sequence = params.delete('last_message_sequence').to_i
            channel       = request.path

            Firehose::Server.metrics.channel_subscribed!(channel)

            log_request   channel, last_sequence, env
            respond_async channel, last_sequence, params, env
          end
        end

        class MultiplexingHandler < Handler
          def wrap_frame(channel, message)
            JSON.generate channel: channel,
                          message: message.payload,
                          last_sequence: message.sequence
          end

          def log_request(request, subscriptions, env)
            if request.post?
              Firehose.logger.debug "HTTP multiplexing POST, subscribing #{subscriptions.inspect}"
            else
              Firehose.logger.debug "HTTP multiplexing GET with query #{env["QUERY_STRING"].inspect}"
            end
          end

          def handle_request(request, env)
            subscriptions = Consumer.multiplex_subscriptions(request)
            if subscriptions.empty?
              Firehose::Server.metrics.error!(:http_subscribe_multiplexed_empty)
              Firehose.logger.warn "Client tried to subscribe multiplexed via HTTP without any channel subscriptions."
              async_callback env,
                             400,
                             JSON.generate(error: "Subscription failed",
                                           reason: "No subscription data given - can't subscribe to nothing")
            else
              log_request request, subscriptions, env
              channels = subscriptions.map{|s| s[:channel]}
              Firehose::Server.metrics.channels_subscribed_multiplexed_long_polling!(channels)
              subscriptions.each do |sub|
                respond_async(sub[:channel], sub[:last_message_sequence] || sub[:message_sequence], sub[:params], env)
              end
            end
          end
        end
      end
    end
  end
end