arkency/rails_event_store

View on GitHub
ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# frozen_string_literal: true

require "active_support/core_ext/array"

module RubyEventStore
  module ActiveRecord
    class EventRepository
      POSITION_SHIFT = 1

      def initialize(model_factory: WithDefaultModels.new, serializer:)
        @serializer = serializer

        @event_klass, @stream_klass = model_factory.call
        @repo_reader = EventRepositoryReader.new(@event_klass, @stream_klass, serializer)
        @index_violation_detector = IndexViolationDetector.new(@event_klass.table_name, @stream_klass.table_name)
      end

      def append_to_stream(records, stream, expected_version)
        return if records.empty?

        hashes = []
        event_ids = []
        records.each do |record|
          hashes << insert_hash(record, record.serialize(serializer))
          event_ids << record.event_id
        end
        add_to_stream(event_ids, stream, expected_version) { @event_klass.insert_all!(hashes) }
      end

      def link_to_stream(event_ids, stream, expected_version)
        return if event_ids.empty?

        (event_ids - @event_klass.where(event_id: event_ids).pluck(:event_id)).each do |id|
          raise EventNotFound.new(id)
        end
        add_to_stream(event_ids, stream, expected_version)
      end

      def delete_stream(stream)
        @stream_klass.where(stream: stream.name).delete_all
      end

      def has_event?(event_id)
        @repo_reader.has_event?(event_id)
      end

      def last_stream_event(stream)
        @repo_reader.last_stream_event(stream)
      end

      def read(specification)
        @repo_reader.read(specification)
      end

      def count(specification)
        @repo_reader.count(specification)
      end

      def search_streams(stream_name)
        @repo_reader.search_streams(stream_name)
      end

      def update_messages(records)
        hashes = records.map { |record| upsert_hash(record, record.serialize(serializer)) }
        for_update = records.map(&:event_id)
        start_transaction do
          existing =
            @event_klass
              .where(event_id: for_update)
              .pluck(:event_id, :id, :created_at)
              .reduce({}) { |acc, (event_id, id, created_at)| acc.merge(event_id => [id, created_at]) }
          (for_update - existing.keys).each { |id| raise EventNotFound.new(id) }
          hashes.each do |h|
            h[:id] = existing.fetch(h.fetch(:event_id)).at(0)
            h[:created_at] = existing.fetch(h.fetch(:event_id)).at(1)
          end
          @event_klass.upsert_all(hashes)
        end
      end

      def streams_of(event_id)
        @repo_reader.streams_of(event_id)
      end

      def position_in_stream(event_id, stream)
        @repo_reader.position_in_stream(event_id, stream)
      end

      def global_position(event_id)
        @repo_reader.global_position(event_id)
      end

      def event_in_stream?(event_id, stream)
        @repo_reader.event_in_stream?(event_id, stream)
      end

      def inspect
        "#{self.class} with #{::ActiveRecord::Base.connection.adapter_name} db adapter"
      end

      private

      attr_reader :serializer

      def add_to_stream(event_ids, stream, expected_version)
        last_stream_version = ->(stream_) do
          @stream_klass.where(stream: stream_.name).order("position DESC").first.try(:position)
        end
        resolved_version = expected_version.resolve_for(stream, last_stream_version)

        start_transaction do
          yield if block_given?
          in_stream =
            event_ids.map.with_index do |event_id, index|
              {
                stream: stream.name,
                position: compute_position(resolved_version, index),
                event_id: event_id,
                created_at: Time.now.utc
              }
            end
          @stream_klass.insert_all!(in_stream) unless stream.global?
        end
        self
      rescue ::ActiveRecord::RecordNotUnique => e
        raise_error(e)
      end

      def raise_error(e)
        raise EventDuplicatedInStream if detect_index_violated(e.message)
        raise WrongExpectedEventVersion
      end

      def compute_position(resolved_version, index)
        resolved_version + index + POSITION_SHIFT unless resolved_version.nil?
      end

      def detect_index_violated(message)
        @index_violation_detector.detect(message)
      end

      def insert_hash(record, serialized_record)
        {
          event_id: serialized_record.event_id,
          data: serialized_record.data,
          metadata: serialized_record.metadata,
          event_type: serialized_record.event_type,
          created_at: record.timestamp,
          valid_at: optimize_timestamp(record.valid_at, record.timestamp)
        }
      end

      def upsert_hash(record, serialized_record)
        {
          event_id: serialized_record.event_id,
          data: serialized_record.data,
          metadata: serialized_record.metadata,
          event_type: serialized_record.event_type,
          valid_at: optimize_timestamp(record.valid_at, record.timestamp)
        }
      end

      def optimize_timestamp(valid_at, created_at)
        valid_at unless valid_at.eql?(created_at)
      end

      def start_transaction(&block)
        @event_klass.transaction(requires_new: true, &block)
      end
    end
  end
end