celluloid/celluloid

View on GitHub
lib/celluloid/notifications.rb

Summary

Maintainability
A
0 mins
Test Coverage
module Celluloid
  module Notifications
    def self.notifier
      Actor[:notifications_fanout] || raise(DeadActorError, "notifications fanout actor not running")
    end

    def publish(pattern, *args)
      Celluloid::Notifications.notifier.publish(pattern, *args)
    rescue DeadActorError
      # Bad shutdown logic. Oh well....
      # TODO: needs a tests
    end

    module_function :publish

    def subscribe(pattern, method)
      Celluloid::Notifications.notifier.subscribe(Actor.current, pattern, method)
    end

    def unsubscribe(*args)
      Celluloid::Notifications.notifier.unsubscribe(*args)
    end

    class Fanout
      include Celluloid
      trap_exit :prune

      def initialize
        @subscribers = []
        @listeners_for = {}
      end

      def subscribe(actor, pattern, method)
        subscriber = Subscriber.new(actor, pattern, method).tap do |s|
          @subscribers << s
        end
        link actor
        @listeners_for.clear
        subscriber
      end

      def unsubscribe(subscriber)
        @subscribers.reject! { |s| s.matches?(subscriber) }
        @listeners_for.clear
      end

      def publish(pattern, *args)
        listeners_for(pattern).each { |s| s.publish(pattern, *args) }
      end

      def listeners_for(pattern)
        @listeners_for[pattern] ||= @subscribers.select { |s| s.subscribed_to?(pattern) }
      end

      def listening?(pattern)
        listeners_for(pattern).any?
      end

      def prune(actor, _reason = nil)
        @subscribers.reject! { |s| s.actor == actor }
        @listeners_for.clear
      end
    end

    class Subscriber
      attr_accessor :actor, :pattern, :method

      def initialize(actor, pattern, method)
        @actor = actor
        @pattern = pattern
        @method = method
      end

      def publish(pattern, *args)
        actor.async method, pattern, *args
      rescue DeadActorError
        # TODO: needs a tests
        # Bad shutdown logic. Oh well....
      end

      def subscribed_to?(pattern)
        !pattern || @pattern === pattern.to_s || @pattern === pattern
      end

      def matches?(subscriber_or_pattern)
        self === subscriber_or_pattern ||
          @pattern && @pattern === subscriber_or_pattern
      end
    end
  end

  def self.publish(*args)
    Notifications.publish(*args)
  end
end