firehoseio/firehose

View on GitHub
lib/firehose/server/channel_subscription.rb

Summary

Maintainability
A
0 mins
Test Coverage
A
94%
module Firehose
  module Server
    # Connects to a specific channel on Redis and listens for messages to notify subscribers.
    class ChannelSubscription
      # Can be raised if a Subscription failed for any reason.
      class Failed < StandardError
      end

      attr_reader :channel_key, :params, :sequence, :timeout

      def self.redis
        @redis ||= Firehose::Server.redis.connection
      end

      def self.subscriber
        @subscriber ||= Server::Subscriber.new
      end

      class ClientInfo
        attr_reader :ip, :referer, :user_agent
        def initialize(env)
          @ip         = env["REMOTE_ADDR"]
          if realIp = env["HTTP_X_FORWARDED_FOR"]
            @ip = realIp
          end
          @referer    = env["HTTP_REFERER"]
          @user_agent = env["HTTP_USER_AGENT"]
        end

        def to_s
          "ip=#{@ip.inspect} referer=#{@referer.inspect} user_agent=#{@user_agent.inspect}"
        end
      end

      def initialize(channel_key, env, sequence: 0, params: {}, timeout: nil)
        @redis        = self.class.redis
        @subscriber   = self.class.subscriber
        @sequence     = sequence
        @timeout      = timeout
        @channel_key  = channel_key
        @client_info  = ClientInfo.new(env)
        @deferrable = EM::DefaultDeferrable.new
        @deferrable.errback {|e| EM.next_tick { raise e } unless [:timeout, :disconnect].include?(e) }
        if Server.configuration.channel_deprecated?(channel_key)
          Firehose.logger.warn "Subscription to DEPRECATED Channel: #{channel_key} from client: #{@client_info}"
        end
        on_subscribe(params)
      end

      def on_subscribe(params)
        message_filter.on_subscribe(params)
      end

      def on_unsubscribe
        message_filter.on_unsubscribe
      end

      def on_message(message)
        message_filter.process(message)
      end

      def next_messages
        list_key     = Server::Redis.key(channel_key, :list)
        sequence_key = Server::Redis.key(channel_key, :sequence)

        @redis.multi
          @redis.get(sequence_key).
            errback {|e| @deferrable.fail e }
          # Fetch entire list: http://stackoverflow.com/questions/10703019/redis-fetch-all-value-of-list-without-iteration-and-without-popping
          @redis.lrange(list_key, 0, -1).
            errback {|e| @deferrable.fail e }
        @redis.exec.callback do |(channel_sequence, message_list)|
          # Reverse the messages so they can be correctly procesed by the MessageBuffer class. There's
          # a patch in the message-buffer-redis branch that moves this concern into the Publisher LUA
          # script. We kept it out of this for now because it represents a deployment risk and `reverse!`
          # is a cheap operation in Ruby.
          message_list.reverse!
          channel_sequence = [channel_sequence.to_i, message_list.size].max
          buffer = MessageBuffer.new(message_list, channel_sequence, @sequence)
          if buffer.remaining_messages.empty?
            Firehose.logger.debug "No messages in buffer, subscribing. sequence: `#{channel_sequence}` consumer_sequence: #{@sequence}"
            # Either this resource has never been seen before or we are all caught up.
            # Subscribe and hope something gets published to this end-point.
            subscribe
          else # Either the client is under water or caught up to head.
            @deferrable.succeed process_messages(buffer.remaining_messages)
            @deferrable.callback { on_unsubscribe }
          end
        end.errback {|e| @deferrable.fail e }
        # TODO -ARRRG!!!! We can't listen to deferrables here. Need to expose those
        # on this class because this is a class var. In practice thats how we roll anyway.
        @deferrable
      end

      def unsubscribe
        @subscriber.unsubscribe self
      end

      def process_messages(messages)
        messages = messages.map do |m|
          m = m.dup
          on_message(m)
          m
        end
        @deferrable.succeed messages
      end

      private

      def message_filter
        @message_filter ||= begin
          if mf = Server.configuration.message_filter
            mf.new(self)
          else
            Firehose::Server::MessageFilter.new(self)
          end
        end
      end

      def subscribe
        @subscriber.subscribe self
        if @timeout
          timer = EventMachine::Timer.new(@timeout) do
            @deferrable.fail :timeout
            unsubscribe
          end
          # Cancel the timer if when the deferrable succeeds
          @deferrable.callback { timer.cancel }
        end
      end
    end
  end
end