arkency/rails_event_store

View on GitHub
ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository_reader.rb

Summary

Maintainability
D
1 day
Test Coverage
# frozen_string_literal: true

module RubyEventStore
  module ActiveRecord
    class EventRepositoryReader
      def initialize(event_klass, stream_klass, serializer)
        @event_klass = event_klass
        @stream_klass = stream_klass
        @serializer = serializer
      end

      def has_event?(event_id)
        @event_klass.exists?(event_id: event_id)
      end

      def last_stream_event(stream)
        record_ = @stream_klass.where(stream: stream.name).order("position DESC, id DESC").first
        record(record_) if record_
      end

      def read(spec)
        stream = read_scope(spec)
        if spec.batched?
          spec.time_sort_by ? offset_limit_batch_reader(spec, stream) : monotonic_id_batch_reader(spec, stream)
        elsif spec.first?
          record_ = stream.first
          record(record_) if record_
        elsif spec.last?
          record_ = stream.last
          record(record_) if record_
        else
          stream.map(&method(:record)).each
        end
      end

      def count(spec)
        read_scope(spec).count
      end

      def streams_of(event_id)
        @stream_klass.where(event_id: event_id).pluck(:stream).map { |name| Stream.new(name) }
      end

      def position_in_stream(event_id, stream)
        record = @stream_klass.select("position").where(stream: stream.name).find_by(event_id: event_id)
        raise EventNotFoundInStream if record.nil?
        record.position
      end

      def global_position(event_id)
        record = @event_klass.select("id").find_by(event_id: event_id)
        raise EventNotFound.new(event_id) if record.nil?
        record.id - 1
      end

      def event_in_stream?(event_id, stream)
        @stream_klass.where(event_id: event_id, stream: stream.name).exists?
      end

      def search_streams(stream)
        @stream_klass
          .where("stream LIKE ?", "#{stream}%")
          .limit(10)
          .pluck("distinct stream")
          .map { |name| Stream.new(name) }
      end

      private

      attr_reader :serializer

      def offset_limit_batch_reader(spec, stream)
        batch_reader = ->(offset, limit) { stream.offset(offset).limit(limit).map(&method(:record)) }
        RubyEventStore::BatchEnumerator.new(spec.batch_size, spec.limit, batch_reader).each
      end

      def monotonic_id_batch_reader(spec, stream)
        batch_reader = ->(offset_id, limit) do
          search_in = spec.stream.global? ? @event_klass.table_name : @stream_klass.table_name
          records =
            if offset_id.nil?
              stream.limit(limit)
            else
              stream.where(start_offset_condition(spec, offset_id, search_in)).limit(limit)
            end
          [records.map(&method(:record)), records.last]
        end
        BatchEnumerator.new(spec.batch_size, spec.limit, batch_reader).each
      end

      def read_scope(spec)
        if spec.stream.global?
          stream = @event_klass
          stream = stream.where(event_id: spec.with_ids) if spec.with_ids?
          stream = stream.where(event_type: spec.with_types) if spec.with_types?
          stream = stream.order(as_at(spec)) if spec.time_sort_by_as_at?
          stream = stream.order(as_of(spec)) if spec.time_sort_by_as_of?
          stream = stream.limit(spec.limit) if spec.limit?
          stream = stream.where(start_condition_in_global_stream(spec)) if spec.start
          stream = stream.where(stop_condition_in_global_stream(spec)) if spec.stop
          stream = stream.where(older_than_condition(spec)) if spec.older_than
          stream = stream.where(older_than_or_equal_condition(spec)) if spec.older_than_or_equal
          stream = stream.where(newer_than_condition(spec)) if spec.newer_than
          stream = stream.where(newer_than_or_equal_condition(spec)) if spec.newer_than_or_equal
          stream.order(id: order(spec))
        else
          stream = @stream_klass.includes(:event).where(stream: spec.stream.name)
          stream = stream.where(event_id: spec.with_ids) if spec.with_ids?
          stream = stream.joins(:event).where(@event_klass.table_name => { event_type: spec.with_types }) if spec.with_types?
          stream = stream.joins(:event).order(as_at(spec)) if spec.time_sort_by_as_at?
          stream = stream.joins(:event).order(as_of(spec)) if spec.time_sort_by_as_of?
          stream = stream.order(id: order(spec))
          stream = stream.limit(spec.limit) if spec.limit?
          stream = stream.where(start_condition(spec)) if spec.start
          stream = stream.where(stop_condition(spec)) if spec.stop
          stream = stream.joins(:event).where(older_than_condition(spec)) if spec.older_than
          stream = stream.joins(:event).where(older_than_or_equal_condition(spec)) if spec.older_than_or_equal
          stream = stream.joins(:event).where(newer_than_condition(spec)) if spec.newer_than
          stream = stream.joins(:event).where(newer_than_or_equal_condition(spec)) if spec.newer_than_or_equal
          stream
        end
      end

      def as_of(spec)
        expr = coalesce(@event_klass.arel_table[:valid_at], @event_klass.arel_table[:created_at])
        spec.forward? ? expr.asc : expr.desc
      end

      def as_at(spec)
        expr = @event_klass.arel_table[:created_at]
        spec.forward? ? expr.asc : expr.desc
      end

      def start_offset_condition(specification, record_id, search_in)
        condition = "#{search_in}.id #{specification.forward? ? ">" : "<"} ?"
        [condition, record_id]
      end

      def stop_offset_condition(specification, record_id, search_in)
        condition = "#{search_in}.id #{specification.forward? ? "<" : ">"} ?"
        [condition, record_id]
      end

      def start_condition(specification)
        start_offset_condition(
          specification,
          @stream_klass.find_by!(event_id: specification.start, stream: specification.stream.name),
          @stream_klass.table_name
        )
      rescue ::ActiveRecord::RecordNotFound
        raise EventNotFound.new(specification.start)
      end

      def stop_condition(specification)
        stop_offset_condition(
          specification,
          @stream_klass.find_by!(event_id: specification.stop, stream: specification.stream.name),
          @stream_klass.table_name
        )
      rescue ::ActiveRecord::RecordNotFound
        raise EventNotFound.new(specification.stop)
      end

      def start_condition_in_global_stream(specification)
        start_offset_condition(
          specification,
          @event_klass.find_by!(event_id: specification.start),
          @event_klass.table_name
        )
      rescue ::ActiveRecord::RecordNotFound
        raise EventNotFound.new(specification.start)
      end

      def stop_condition_in_global_stream(specification)
        stop_offset_condition(
          specification,
          @event_klass.find_by!(event_id: specification.stop),
          @event_klass.table_name
        )
      rescue ::ActiveRecord::RecordNotFound
        raise EventNotFound.new(specification.stop)
      end

      def coalesce(*exprs)
        Arel::Nodes::NamedFunction.new "COALESCE", exprs
      end

      def time_comparison_field(specification)
        if specification.time_sort_by_as_of?
          coalesce(@event_klass.arel_table[:valid_at], @event_klass.arel_table[:created_at])
        else
          @event_klass.arel_table[:created_at]
        end
      end

      def older_than_condition(specification)
        time_comparison_field(specification).lt(specification.older_than)
      end

      def older_than_or_equal_condition(specification)
        time_comparison_field(specification).lteq(specification.older_than_or_equal)
      end

      def newer_than_condition(specification)
        time_comparison_field(specification).gt(specification.newer_than)
      end

      def newer_than_or_equal_condition(specification)
        time_comparison_field(specification).gteq(specification.newer_than_or_equal)
      end

      def order(spec)
        spec.forward? ? "ASC" : "DESC"
      end

      def record(record)
        record = record.event if @stream_klass === record

        SerializedRecord
          .new(
            event_id: record.event_id,
            metadata: record.metadata,
            data: record.data,
            event_type: record.event_type,
            timestamp: record.created_at.iso8601(TIMESTAMP_PRECISION),
            valid_at: (record.valid_at || record.created_at).iso8601(TIMESTAMP_PRECISION)
          )
          .deserialize(serializer)
      end
    end

    private_constant(:EventRepositoryReader)
  end
end