firehoseio/firehose

View on GitHub
lib/firehose/server/subscriber.rb

Summary

Maintainability
A
0 mins
Test Coverage
A
96%
module Firehose
  module Server
    # Setups a connetion to Redis to listen for new resources...
    class Subscriber
      attr_reader :pubsub

      def initialize(redis = Firehose::Server.redis.connection)
        @pubsub = redis.pubsub
        # TODO: Instead of just raising an exception, it would probably be better
        #       for the errback to set some sort of 'disconnected' state. Then
        #       whenever a deferrable was 'subscribed' we could instantly fail
        #       the deferrable with whatever connection error we had.
        #       An alternative which would have a similar result would be to
        #       subscribe lazily (i.e. not until we have a deferrable to subscribe).
        #       Then, if connecting failed, it'd be super easy to fail the deferrable
        #       with the same error.
        #       The final goal is to allow the failed deferrable bubble back up
        #       so we can send back a nice, clean 500 error to the client.
        channel_updates_key = Server::Redis.key('channel_updates')
        pubsub.subscribe(channel_updates_key).
          errback{|e| EM.next_tick { raise e } }.
          callback { Firehose.logger.debug "Redis subscribed to `#{channel_updates_key}`" }
        pubsub.on(:message) do |_, payload|
          channel_key, channel_sequence, message = Server::Publisher.from_payload(payload)
          messages = [ Message.new(message, channel_sequence.to_i) ]
          if channels = subscriptions.delete(channel_key)
            Firehose.logger.debug "Redis notifying #{channels.count} channels(s) at `#{channel_key}` with channel_sequence `#{channel_sequence}` and message `#{message}`"
            channels.each do |channel|
              Firehose.logger.debug "Sending message #{message} and channel_sequence #{channel_sequence} to client from subscriber"
              channel.process_messages messages
            end
          end
        end
      end

      def subscribe(channel)
        subscriptions[channel.channel_key].push channel
      end

      def unsubscribe(channel)
        subscriptions[channel.channel_key].delete channel
        subscriptions.delete(channel.channel_key) if subscriptions[channel.channel_key].empty?
      end

      private
      def subscriptions
        @subscriptions ||= Hash.new{|h,k| h[k] = []}
      end
    end
  end
end