dry-rb/dry-events

View on GitHub
lib/dry/events/publisher.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true

require 'concurrent/map'

require 'dry/core/class_attributes'

require 'dry/events/constants'
require 'dry/events/event'
require 'dry/events/bus'
require 'dry/events/filter'

module Dry
  module Events
    # Exception raised when the same publisher is registered more than once
    #
    # @api public
    PublisherAlreadyRegisteredError = Class.new(StandardError) do
      # @api private
      def initialize(id)
        super("publisher with id #{id.inspect} already registered as: #{Publisher.registry[id]}")
      end
    end

    # @api public
    InvalidSubscriberError = Class.new(StandardError) do
      # @api private
      def initialize(object_or_event_id)
        case object_or_event_id
        when String, Symbol
          super("you are trying to subscribe to an event: `#{object_or_event_id}` that has not been registered")
        else
          super('you try use subscriber object that will never be executed')
        end
      end
    end

    UnregisteredEventError = Class.new(StandardError) do
      def initialize(object_or_event_id)
        case object_or_event_id
        when String, Symbol
          super("You are trying to publish an unregistered event: `#{object_or_event_id}`")
        else
          super("You are trying to publish an unregistered event")
        end
      end
    end

    # Extension used for classes that can publish events
    #
    # @example
    #   class AppEvents
    #     include Dry::Events::Publisher[:app]
    #
    #     register_event('users.created')
    #   end
    #
    #   class CreateUser
    #     attr_reader :events
    #
    #     def initialize(events)
    #       @events = events
    #     end
    #
    #     def call(user)
    #       # do your thing
    #       events.publish('users.created', user: user, time: Time.now)
    #     end
    #   end
    #
    #   app_events = AppEvents.new
    #   create_user = CreateUser.new(app_events)
    #
    #   # this will publish "users.created" event with its payload
    #   create_user.call(name: "Jane")
    #
    # @api public
    class Publisher < Module
      include Dry::Equalizer(:id)

      # Internal publisher registry, which is used to identify them globally
      #
      # This allows us to have listener classes that can subscribe to events
      # without having access to instances of publishers yet.
      #
      # @api private
      def self.registry
        @__registry__ ||= Concurrent::Map.new
      end

      # @!attribute [r] :id
      #   @return [Symbol,String] the publisher identifier
      #   @api private
      attr_reader :id

      # Create a publisher extension with the provided identifier
      #
      # @param [Symbol,String] id The identifier
      #
      # @return [Publisher]
      #
      # @raise PublisherAlreadyRegisteredError
      #
      # @api public
      def self.[](id)
        raise PublisherAlreadyRegisteredError.new(id) if registry.key?(id)

        new(id)
      end

      # @api private
      def initialize(id)
        @id = id
      end

      # Hook for inclusions/extensions
      #
      # It registers the publisher class under global registry using the id
      #
      # @api private
      def included(klass)
        klass.extend(ClassMethods)
        klass.include(InstanceMethods)

        self.class.registry[id] = klass

        super
      end

      # Class interface for publisher classes
      #
      # @api public
      module ClassMethods
        # Register an event
        #
        # @param [String] event_id The event identifier
        # @param [Hash] payload Optional default payload
        #
        # @api public
        def register_event(event_id, payload = EMPTY_HASH)
          events[event_id] = Event.new(event_id, payload)
          self
        end

        # Subscribe to an event
        #
        # @param [Symbol,String] event_id The event identifier
        # @param [Hash] filter_hash An optional filter for conditional listeners
        #
        # @return [Class] publisher class
        #
        # @api public
        def subscribe(event_id, filter_hash = EMPTY_HASH, &block)
          listeners[event_id] << [block, Filter.new(filter_hash)]
          self
        end

        # Sets up event bus for publisher instances
        #
        # @return [Bus]
        #
        # @api private
        def new_bus
          Bus.new(events: events.dup, listeners: listeners.dup)
        end

        # Global registry with events
        #
        # @api private
        def events
          @__events__ ||= Concurrent::Map.new
        end

        # Global registry with listeners
        #
        # @api private
        def listeners
          @__listeners__ ||= LISTENERS_HASH.dup
        end
      end

      # Instance interface for publishers
      #
      # @api public
      module InstanceMethods
        # Register a new event type at instance level
        #
        # @param [Symbol,String] event_id The event identifier
        # @param [Hash] payload Optional default payload
        #
        # @return [self]
        #
        # @api public
        def register_event(event_id, payload = EMPTY_HASH)
          __bus__.events[event_id] = Event.new(event_id, payload)
          self
        end

        # Publish an event
        #
        # @param [String] event_id The event identifier
        # @param [Hash] payload An optional payload
        #
        # @api public
        def publish(event_id, payload = EMPTY_HASH)
          if __bus__.can_handle?(event_id)
            __bus__.publish(event_id, payload)
            self
          else
            raise UnregisteredEventError, event_id
          end
        end
        alias_method :trigger, :publish

        # Subscribe to events.
        #
        # If the filter parameter is provided, filters events by payload.
        #
        # @param [Symbol,String,Object] object_or_event_id The event identifier or a listener object
        # @param [Hash] filter_hash An optional event filter
        #
        # @return [Object] self
        #
        # @api public
        def subscribe(object_or_event_id, filter_hash = EMPTY_HASH, &block)
          if __bus__.can_handle?(object_or_event_id)
            filter = Filter.new(filter_hash)

            if block
              __bus__.subscribe(object_or_event_id, filter, &block)
            else
              __bus__.attach(object_or_event_id, filter)
            end

            self
          else
            raise InvalidSubscriberError, object_or_event_id
          end
        end

        # Unsubscribe a listener
        #
        # @param [Object] listener The listener object
        #
        # @return [self]
        #
        # @api public
        def unsubscribe(listener)
          __bus__.detach(listener)
        end

        # Return true if a given listener has been subscribed to any event
        #
        # @api public
        def subscribed?(listener)
          __bus__.subscribed?(listener)
        end

        # Utility method which yields event with each of its listeners
        #
        # Listeners are already filtered out when filter was provided during
        # subscription
        #
        # @param [Symbol,String] event_id The event identifier
        # param [Hash] payload An optional payload
        #
        # @api public
        def process(event_id, payload = EMPTY_HASH, &block)
          __bus__.process(event_id, payload, &block)
        end

        # Internal event bus
        #
        # @return [Bus]
        #
        # @api private
        def __bus__
          @__bus__ ||= self.class.new_bus
        end
      end
    end
  end
end