arkency/rails_event_store

View on GitHub
ruby_event_store/lib/ruby_event_store/client.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# frozen_string_literal: true

require "concurrent"

module RubyEventStore
  class Client
    def initialize(
      repository: InMemoryRepository.new,
      mapper: Mappers::Default.new,
      subscriptions: Subscriptions.new,
      dispatcher: Dispatcher.new,
      clock: default_clock,
      correlation_id_generator: default_correlation_id_generator,
      event_type_resolver: EventTypeResolver.new
    )
      @repository = repository
      @mapper = mapper
      @subscriptions = subscriptions
      @broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher)
      @clock = clock
      @metadata = Concurrent::ThreadLocalVar.new
      @correlation_id_generator = correlation_id_generator
      @event_type_resolver = event_type_resolver
    end

    # Persists events and notifies subscribed handlers about them
    #
    # @param events [Array<Event>, Event] event(s)
    # @param stream_name [String] name of the stream for persisting events.
    # @param expected_version [:any, :auto, :none, Integer] controls optimistic locking strategy. {http://railseventstore.org/docs/expected_version/ Read more}
    # @return [self]
    def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
      enriched_events = enrich_events_metadata(events)
      records = transform(enriched_events)
      append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
      enriched_events.zip(records) do |event, record|
        with_metadata(correlation_id: event.metadata.fetch(:correlation_id), causation_id: event.event_id) do
          broker.(event, record)
        end
      end
      self
    end

    # Persists new event(s) without notifying any subscribed handlers
    #
    # @param (see #publish)
    # @return [self]
    def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
      append_records_to_stream(
        transform(enrich_events_metadata(events)),
        stream_name: stream_name,
        expected_version: expected_version
      )
      self
    end

    # Links already persisted event(s) to a different stream.
    # Does not notify any subscribed handlers.
    #
    # @param event_ids [String, Array<String>] ids of events
    # @param stream_name (see #publish)
    # @param expected_version (see #publish)
    # @return [self]
    def link(event_ids, stream_name:, expected_version: :any)
      repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version))
      self
    end

    # Deletes a stream.
    # All events from the stream remain intact but they are no
    # longer linked to the stream.
    #
    # @param stream_name [String] name of the stream to be cleared.
    # @return [self]
    def delete_stream(stream_name)
      repository.delete_stream(Stream.new(stream_name))
      self
    end

    # Starts building a query specification for reading events.
    # {http://railseventstore.org/docs/read/ More info.}
    #
    # @return [Specification]
    def read
      Specification.new(SpecificationReader.new(repository, mapper))
    end

    # Gets list of streams where event is stored or linked
    #
    # @return [Array<Stream>] where event is stored or linked
    def streams_of(event_id)
      repository.streams_of(event_id)
    end

    def search_streams(stream)
      repository.search_streams(stream)
    end

    # Gets position of the event in given stream
    #
    # The position is always nonnegative.
    # Returns nil if the event has no specific position in stream.
    # Raise error if event is not present in stream.
    #
    # @param event_id [String]
    # @param stream_name [String]
    # @return [Integer] nonnegative integer position of event in stream
    # @raise [EventNotInStream]
    def position_in_stream(event_id, stream_name)
      repository.position_in_stream(event_id, Stream.new(stream_name))
    end

    # Gets position of the event in global stream
    #
    # The position is always nonnegative.
    # Global position may have gaps, meaning, there may be event at
    # position 40, but no event at position 39.
    #
    # @param event_id [String]
    # @raise [EventNotFound]
    # @return [Integer] nonnegno ative integer position of event in global stream
    def global_position(event_id)
      repository.global_position(event_id)
    end

    # Checks whether event is linked in given stream
    #
    # @param event_id [String]
    # @param stream_name [String]
    # @return [Boolean] true if event is linked to given stream, false otherwise
    def event_in_stream?(event_id, stream_name)
      stream = Stream.new(stream_name)
      stream.global? ? repository.has_event?(event_id) : repository.event_in_stream?(event_id, stream)
    end

    # Subscribes a handler (subscriber) that will be invoked for published events of provided type.
    #
    # @overload subscribe(subscriber, to:)
    #   @param to [Array<Class>] types of events to subscribe
    #   @param subscriber [Object, Class] handler
    #   @return [Proc] - unsubscribe proc. Call to unsubscribe.
    #   @raise [ArgumentError, SubscriberNotExist]
    # @overload subscribe(to:, &subscriber)
    #   @param to [Array<Class>] types of events to subscribe
    #   @param subscriber [Proc] handler
    #   @return [Proc] - unsubscribe proc. Call to unsubscribe.
    #   @raise [ArgumentError, SubscriberNotExist]
    def subscribe(subscriber = nil, to:, &proc)
      raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
      subscriber ||= proc
      broker.add_subscription(subscriber, to.map { |event_klass| event_type_resolver.call(event_klass) })
    end

    # Subscribes a handler (subscriber) that will be invoked for all published events
    #
    # @overload subscribe_to_all_events(subscriber)
    #   @param subscriber [Object, Class] handler
    #   @return [Proc] - unsubscribe proc. Call to unsubscribe.
    #   @raise [ArgumentError, SubscriberNotExist]
    # @overload subscribe_to_all_events(&subscriber)
    #   @param subscriber [Proc] handler
    #   @return [Proc] - unsubscribe proc. Call to unsubscribe.
    #   @raise [ArgumentError, SubscriberNotExist]
    def subscribe_to_all_events(subscriber = nil, &proc)
      raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
      broker.add_global_subscription(subscriber || proc)
    end

    # Get list of handlers subscribed to an event
    #
    # @param to [Class, String] type of events to get list of sybscribed handlers
    # @return [Array<Object, Class>]
    def subscribers_for(event_class)
      subscriptions.all_for(event_type_resolver.call(event_class))
    end

    # Builder object for collecting temporary handlers (subscribers)
    # which are active only during the invocation of the provided
    # block of code.
    class Within
      def initialize(block, broker, resolver)
        @block = block
        @broker = broker
        @global_subscribers = []
        @subscribers = Hash.new { [] }
        @resolver = resolver
      end

      # Subscribes temporary handlers that
      # will be called for all published events.
      # The subscription is active only during the invocation
      # of the block of code provided to {Client#within}.
      # {http://railseventstore.org/docs/subscribe/#temporary-subscriptions Read more.}
      #
      # @param handlers [Object, Class] handlers passed as objects or classes
      # @param handler2 [Proc] handler passed as proc
      # @return [self]
      def subscribe_to_all_events(*handlers, &handler2)
        handlers << handler2 if handler2
        @global_subscribers += handlers
        self
      end

      # Subscribes temporary handlers that
      # will be called for published events of provided type.
      # The subscription is active only during the invocation
      # of the block of code provided to {Client#within}.
      # {http://railseventstore.org/docs/subscribe/#temporary-subscriptions Read more.}
      #
      # @overload subscribe(handler, to:)
      #   @param handler [Object, Class] handler passed as objects or classes
      #   @param to [Array<Class>] types of events to subscribe
      #   @return [self]
      # @overload subscribe(to:, &handler)
      #   @param to [Array<Class>] types of events to subscribe
      #   @param handler [Proc] handler passed as proc
      #   @return [self]
      def subscribe(handler = nil, to:, &handler2)
        raise ArgumentError if handler && handler2
        @subscribers[handler || handler2] += Array(to).map { |event_klass| resolver.call(event_klass) }
        self
      end

      # Invokes the block of code provided to {Client#within}
      # and then unsubscribes temporary handlers.
      # {http://railseventstore.org/docs/subscribe/#temporary-subscriptions Read more.}
      #
      # @return [Object] value returned by the invoked block of code
      def call
        unsubs = add_thread_global_subscribers
        unsubs += add_thread_subscribers
        @block.call
      ensure
        unsubs.each(&:call) if unsubs
      end

      private
      attr_reader :resolver

      def add_thread_subscribers
        @subscribers.map { |subscriber, types| @broker.add_thread_subscription(subscriber, types) }
      end

      def add_thread_global_subscribers
        @global_subscribers.map { |subscriber| @broker.add_thread_global_subscription(subscriber) }
      end
    end

    # Use for starting temporary subscriptions.
    # {http://railseventstore.org/docs/subscribe/#temporary-subscriptions Read more}
    #
    # @param block [Proc] block of code during which the temporary subscriptions will be active
    # @return [Within] builder object which collects temporary subscriptions
    def within(&block)
      raise ArgumentError if block.nil?
      Within.new(block, broker, event_type_resolver)
    end

    # Set additional metadata for all events published within the provided block
    # {http://railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}
    #
    # @param metadata [Hash] metadata to set for events
    # @param block [Proc] block of code during which the metadata will be added
    # @return [Object] last value returned by the provided block
    def with_metadata(metadata_for_block, &block)
      previous_metadata = metadata
      self.metadata = previous_metadata.merge(metadata_for_block)
      block.call if block_given?
    ensure
      self.metadata = previous_metadata
    end

    # Deserialize event which was serialized for async event handlers
    # {http://railseventstore.org/docs/subscribe/#async-handlers Read more}
    #
    # @return [Event] deserialized event
    def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
      extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }

      mapper.record_to_event(
        SerializedRecord
          .new(
            event_type: event_type,
            event_id: event_id,
            data: data,
            metadata: metadata,
            timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load(metadata)],
            valid_at: valid_at || timestamp_
          )
          .deserialize(serializer)
      )
    end

    # Read additional metadata which will be added for published events
    # {http://railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}
    #
    # @return [Hash]
    def metadata
      @metadata.value || EMPTY_HASH
    end

    # Overwrite existing event(s) with the same ID.
    #
    # Does not notify any subscribed handlers.
    # Does not enrich with additional current metadata.
    # Does not allow changing which streams these events are in.
    # {http://railseventstore.org/docs/migrating_messages Read more}
    #
    # @example Add data and metadata to existing events
    #
    #   events = event_store.read.limit(10).to_a
    #   events.each do |ev|
    #     ev.data[:tenant_id] = 1
    #     ev.metadata[:server_id] = "eu-west-2"
    #   end
    #   event_store.overwrite(events)
    #
    # @example Change event type
    #
    #   events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev|
    #     NewType.new(
    #       event_id: ev.event_id,
    #       data: ev.data,
    #       metadata: ev.metadata,
    #     )
    #   end
    #   event_store.overwrite(events)
    #
    # @param events [Array<Event>, Event] event(s) to serialize and overwrite again
    # @return [self]
    def overwrite(events_or_event)
      repository.update_messages(transform(Array(events_or_event)))
      self
    end

    def inspect
      "#<#{self.class}:0x#{__id__.to_s(16)}>"
    end

    def inspect_repository
      if repository.public_method(:inspect).owner.equal?(Kernel)
        repository.class.inspect
      else
        repository.inspect
      end
    end

    EMPTY_HASH = {}.freeze
    private_constant :EMPTY_HASH

    private

    def transform(events)
      events.map { |ev| mapper.event_to_record(ev) }
    end

    def enrich_events_metadata(events)
      events = Array(events)
      events.each { |event| enrich_event_metadata(event) }
      events
    end

    def enrich_event_metadata(event)
      metadata.each { |key, value| event.metadata[key] ||= value }
      event.metadata[:timestamp] ||= clock.call
      event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp)
      event.metadata[:correlation_id] ||= correlation_id_generator.call
    end

    def append_records_to_stream(records, stream_name:, expected_version:)
      repository.append_to_stream(records, Stream.new(stream_name), ExpectedVersion.new(expected_version))
    end

    protected

    def metadata=(value)
      @metadata.value = value
    end

    def default_clock
      -> { Time.now.utc.round(TIMESTAMP_PRECISION) }
    end

    def default_correlation_id_generator
      -> { SecureRandom.uuid }
    end

    attr_reader :repository, :mapper, :subscriptions, :broker, :clock, :correlation_id_generator, :event_type_resolver
  end
end