arkency/rails_event_store
ruby_event_store/lib/ruby_event_store/in_memory_repository.rb

# frozen_string_literal: true

require "ostruct"
module RubyEventStore
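  # Non-durable, in-process event repository. Serialized records are kept in
  # plain Hashes and Arrays, which makes this implementation handy for tests
  # and experimentation rather than for production persistence.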
  class InMemoryRepository
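    # Raised (only when ensure_supported_any_usage is enabled) if appends with
    # expected version :any are mixed with positioned (:auto or explicit)
    # appends within the same stream.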
    class UnsupportedVersionAnyUsage < StandardError
      def initialize
        super <<~EOS
        Mixing expected version :any and specific position (or :auto) is unsupported.

        Read more about expected versions here:
        https://railseventstore.org/docs/v2/expected_version/
        EOS
      end
    end

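    # Internal value object pairing an event id with its position in a stream.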
    class EventInStream
      def initialize(event_id, position)
        @event_id = event_id
        @position = position
      end

      attr_reader :event_id, :position
    end

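    # serializer is applied to records on write and reversed on read;
    # ensure_supported_any_usage turns on the guard described in
    # UnsupportedVersionAnyUsage.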
    def initialize(serializer: NULL, ensure_supported_any_usage: false)
      @serializer = serializer
      @streams = Hash.new { |h, k| h[k] = Array.new }
      @mutex = Mutex.new
      @storage = Hash.new
      @ensure_supported_any_usage = ensure_supported_any_usage
    end

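    # Appends records to a stream, enforcing the expected version and raising
    # EventDuplicatedInStream when an event id is already stored.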
    def append_to_stream(records, stream, expected_version)
      serialized_records = records.map { |record| record.serialize(serializer) }

      with_synchronize(expected_version, stream) do |resolved_version|
        ensure_supported_any_usage(resolved_version, stream)
        unless resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)
          raise WrongExpectedEventVersion
        end

        serialized_records.each_with_index do |serialized_record, index|
          raise EventDuplicatedInStream if has_event?(serialized_record.event_id)
          storage[serialized_record.event_id] = serialized_record
          add_to_stream(stream, serialized_record, resolved_version, index)
        end
      end
      self
    end

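    # Links already stored events (looked up by id) into another stream;
    # raises EventDuplicatedInStream if an event is already present in it.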
    def link_to_stream(event_ids, stream, expected_version)
      serialized_records = event_ids.map { |id| read_event(id) }

      with_synchronize(expected_version, stream) do |resolved_version|
        ensure_supported_any_usage(resolved_version, stream)
        unless resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)
          raise WrongExpectedEventVersion
        end

        serialized_records.each_with_index do |serialized_record, index|
          raise EventDuplicatedInStream if has_event_in_stream?(serialized_record.event_id, stream.name)
          add_to_stream(stream, serialized_record, resolved_version, index)
        end
      end
      self
    end

    def delete_stream(stream)
      streams.delete(stream.name)
    end

    def has_event?(event_id)
      storage.has_key?(event_id)
    end

    def last_stream_event(stream)
      last_id = event_ids_of_stream(stream).last
      storage.fetch(last_id).deserialize(serializer) if last_id
    end

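    # Reads records matching the specification: batched reads yield an
    # enumerator of deserialized batches, first/last return a single record,
    # everything else returns a lazy Enumerator.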
    def read(spec)
      serialized_records = read_scope(spec)
      if spec.batched?
        batch_reader = ->(offset, limit) do
          serialized_records
            .drop(offset)
            .take(limit)
            .map { |serialized_record| serialized_record.deserialize(serializer) }
        end
        BatchEnumerator.new(spec.batch_size, serialized_records.size, batch_reader).each
      elsif spec.first?
        serialized_records.first&.deserialize(serializer)
      elsif spec.last?
        serialized_records.last&.deserialize(serializer)
      else
        Enumerator.new do |y|
          serialized_records.each { |serialized_record| y << serialized_record.deserialize(serializer) }
        end
      end
    end

    def count(spec)
      read_scope(spec).count
    end

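    # Overwrites stored records in place. The original timestamp is preserved;
    # event type, data, metadata and valid_at come from the given records.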
    def update_messages(records)
      records.each do |record|
        read_event(record.event_id)
        serialized_record =
          Record
            .new(
              event_id: record.event_id,
              event_type: record.event_type,
              data: record.data,
              metadata: record.metadata,
              timestamp: Time.iso8601(storage.fetch(record.event_id).timestamp),
              valid_at: record.valid_at
            )
            .serialize(serializer)
        storage[record.event_id] = serialized_record
      end
    end

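    # Returns every stream (appended to or linked into) containing the event.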
    def streams_of(event_id)
      streams.select { |name,| has_event_in_stream?(event_id, name) }.map { |name,| Stream.new(name) }
    end

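    # Substring match over stream names, capped at 10 results.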
    def search_streams(stream_name)
      streams
        .select { |name,| name.include?(stream_name) }
        .take(10)
        .map { |name,| Stream.new(name) }
    end

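    # Position of the event within a named stream; raises EventNotFoundInStream
    # when the event is not present in it.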
    def position_in_stream(event_id, stream)
      event_in_stream = streams[stream.name].find { |event_in_stream| event_in_stream.event_id.eql?(event_id) }
      raise EventNotFoundInStream if event_in_stream.nil?
      event_in_stream.position
    end

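    # Global position is the insertion-order index of the event id in storage.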
    def global_position(event_id)
      storage.keys.index(event_id) or raise EventNotFound.new(event_id)
    end

    def event_in_stream?(event_id, stream)
      !streams[stream.name].find { |event_in_stream| event_in_stream.event_id.eql?(event_id) }.nil?
    end

    private

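    # Applies the specification step by step: stream scope, time ordering,
    # id/type filters, read direction, start/stop cursors, limit and finally
    # the time-window filters.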
    def read_scope(spec)
      serialized_records = serialized_records_of_stream(spec.stream)
      serialized_records = ordered(serialized_records, spec)
      serialized_records = serialized_records.select { |e| spec.with_ids.any? { |x| x.eql?(e.event_id) } } if spec.with_ids?
      serialized_records = serialized_records.select { |e| spec.with_types.any? { |x| x.eql?(e.event_type) } } if spec.with_types?
      serialized_records = serialized_records.reverse if spec.backward?
      serialized_records = serialized_records.drop(index_of(serialized_records, spec.start) + 1) if spec.start
      serialized_records = serialized_records.take(index_of(serialized_records, spec.stop)) if spec.stop
      serialized_records = serialized_records.take(spec.limit) if spec.limit?
      serialized_records = serialized_records.select { |sr| Time.iso8601(time_comparison_field(spec, sr)) < spec.older_than } if spec.older_than
      serialized_records = serialized_records.select { |sr| Time.iso8601(time_comparison_field(spec, sr)) <= spec.older_than_or_equal } if spec.older_than_or_equal
      serialized_records = serialized_records.select { |sr| Time.iso8601(time_comparison_field(spec, sr)) > spec.newer_than } if spec.newer_than
      serialized_records = serialized_records.select { |sr| Time.iso8601(time_comparison_field(spec, sr)) >= spec.newer_than_or_equal } if spec.newer_than_or_equal
      serialized_records
    end

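    # as_of queries compare against valid_at, everything else against timestamp.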
    def time_comparison_field(spec, sr)
      if spec.time_sort_by_as_of?
        sr.valid_at
      else
        sr.timestamp
      end
    end

    def read_event(event_id)
      storage.fetch(event_id) { raise EventNotFound.new(event_id) }
    end

    def event_ids_of_stream(stream)
      streams.fetch(stream.name, Array.new).map(&:event_id)
    end

    def serialized_records_of_stream(stream)
      stream.global? ? storage.values : storage.fetch_values(*event_ids_of_stream(stream))
    end

    def ordered(serialized_records, spec)
      case spec.time_sort_by
      when :as_at
        serialized_records.sort_by(&:timestamp)
      when :as_of
        serialized_records.sort_by(&:valid_at)
      else
        serialized_records
      end
    end

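    # Stream version is the zero-based index of its last event, -1 when empty.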
    def last_stream_version(stream)
      streams.fetch(stream.name, Array.new).size - 1
    end

    def with_synchronize(expected_version, stream, &block)
      resolved_version = expected_version.resolve_for(stream, method(:last_stream_version))

      # expected_version :auto assumes external lock is used
      # which makes reading stream before writing safe.
      #
      # To emulate potential concurrency issues of :auto strategy without
      # such external lock we use Thread.pass to make race
      # conditions more likely. And we only use mutex.synchronize for writing
      # not for the whole read+write algorithm.
      Thread.pass
      mutex.synchronize { block.call(resolved_version) }
    end

    def has_event_in_stream?(event_id, stream_name)
      streams.fetch(stream_name, Array.new).any? { |event_in_stream| event_in_stream.event_id.eql?(event_id) }
    end

    def index_of(source, event_id)
      index = source.index { |item| item.event_id.eql?(event_id) }
      raise EventNotFound.new(event_id) unless index

      index
    end

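    # Positions are tracked only for appends with a concrete resolved version;
    # :any appends (nil resolved version) store nil positions.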
    def compute_position(resolved_version, index)
      resolved_version + index + 1 unless resolved_version.nil?
    end

    def add_to_stream(stream, serialized_record, resolved_version, index)
      streams[stream.name] << EventInStream.new(serialized_record.event_id, compute_position(resolved_version, index))
    end

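    # With the guard enabled, a stream may not mix nil positions (written with
    # expected version :any) and concrete positions.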
    def ensure_supported_any_usage(resolved_version, stream)
      if @ensure_supported_any_usage
        stream_positions = streams.fetch(stream.name, Array.new).map(&:position)
        if resolved_version.nil?
          raise UnsupportedVersionAnyUsage if !stream_positions.compact.empty?
        else
          raise UnsupportedVersionAnyUsage if stream_positions.include?(nil)
        end
      end
    end

    attr_reader :streams, :mutex, :storage, :serializer
  end
end
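
# A minimal usage sketch, assuming the public Client API of the ruby_event_store
# gem. OrderPlaced stands for a hypothetical RubyEventStore::Event subclass.
#
#   client = RubyEventStore::Client.new(repository: RubyEventStore::InMemoryRepository.new)
#   client.publish(OrderPlaced.new(data: { order_id: 42 }), stream_name: "Order$42")
#   client.read.stream("Order$42").to_a # => [#<OrderPlaced ...>]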