iravench/ruby_cqrs

View on GitHub
lib/ruby_cqrs/domain/aggregate_repository.rb

Summary

Maintainability
A
25 mins
Test Coverage
require 'active_support/inflector'
require_relative '../guid'
require 'contracts'
require_relative '../contracts'

module RubyCqrs
  class AggregateNotFoundError < StandardError; end
  class AggregateConcurrencyError < StandardError; end
  class AggregateDuplicationError < StandardError; end

  module Domain
    class AggregateRepository
      include Contracts
      include Contracts::Modules
      include RubyCqrs::Data::Decodable

      Contract Validation::EventStore, Any => Any
      def initialize event_store, command_context
        raise ArgumentError unless event_store.is_a? Data::EventStore
        @event_store = event_store
        @command_context = command_context
      end

      Contract Validation::AggregateId => Validation::Aggregate
      def find_by aggregate_id
        state = @event_store.load_by(aggregate_id, @command_context)
        raise AggregateNotFoundError if (state.nil? or state[:aggregate_type].nil? or\
                                    ((state[:events].nil? or state[:events].empty?) and state[:snapshot].nil?))

        create_instance_from state
      end

      Contract Validation::Aggregate => nil
      def save one_aggregate
        delegate_persistence_of [ one_aggregate ]
      end

      Contract ArrayOf[Validation::Aggregate] => nil
      def save many_aggregate
        delegate_persistence_of many_aggregate
      end

    private
      Contract Or[Validation::SerializedAggregateState,\
                  Validation::SerializedAggregateStateWithSnapshot] => Validation::Aggregate
      def create_instance_from state
        try_decode_serialized_from state
        instance = state[:aggregate_type].constantize.new
        instance.send(:load_from, state)
        instance
      end

      Contract ArrayOf[Validation::Aggregate] => nil
      def delegate_persistence_of aggregates
        verify_uniqueness_of aggregates

        changes = prep_changes_for(aggregates)
        if changes.size > 0
          @event_store.save changes, @command_context
          aggregates.each do |aggregate|
            aggregate.send(:commit)
          end
        end

        nil
      end

      Contract ArrayOf[Validation::Aggregate] => nil
      def verify_uniqueness_of aggregates
        uniq_array =  aggregates.uniq { |aggregate| aggregate.aggregate_id }
        raise AggregateDuplicationError unless uniq_array.size == aggregates.size
        nil
      end

      Contract ArrayOf[Validation::Aggregate] =>\
        Or[ ArrayOf[Validation::SerializedAggregateState],\
            ArrayOf[Validation::SerializedAggregateStateWithSnapshot]]
      def prep_changes_for aggregates
        to_return = []
        aggregates.inject(to_return) do |product, aggregate|
          raise ArgumentError unless aggregate.is_a? Aggregate
          aggregate_change = aggregate.send(:get_changes)
          next if aggregate_change.nil?
          try_encode_serializable_in aggregate_change
          product << aggregate_change
        end
        to_return
      end

      Contract Or[Validation::SerializedAggregateState,\
                  Validation::SerializedAggregateStateWithSnapshot] => nil
      def try_decode_serialized_from state
        state[:snapshot] = decode_snapshot_state_from state[:snapshot]\
          if state.has_key? :snapshot

        state[:events] = state[:events].map { |event_record| decode_event_from event_record }\
          if state[:events].size > 0

        nil
      end

      def decode_snapshot_state_from snapshot_record
        snapshot_state = try_decode snapshot_record[:state_type], snapshot_record[:data]
        { :state => snapshot_state, :version => snapshot_record[:version] }
      end

      def decode_event_from event_record
        decoded_event = try_decode event_record[:event_type], event_record[:data]
        decoded_event.instance_variable_set(:@aggregate_id, event_record[:aggregate_id])
        decoded_event.instance_variable_set(:@version, event_record[:version])
        decoded_event
      end

      Contract Or[Validation::AggregateChanges,\
                  Validation::AggregateChangesWithSnapshot] => nil
      def try_encode_serializable_in change
        if change.has_key? :snapshot
          encoded_snapshot = encode_data_from change[:snapshot][:state]
          change[:snapshot] = { :state_type => change[:snapshot][:state_type],
                                :version => change[:snapshot][:version],
                                :data => encoded_snapshot }
        end

        if change[:events].size > 0
          change[:events].map! { |event|
            { :data => encode_data_from(event),
              :aggregate_id => event.aggregate_id,
              :event_type => event.class.name,
              :version => event.version }
          }
        end

        nil
      end

      def encode_data_from obj
        data = obj
        data = data.try_encode if data.is_a? RubyCqrs::Data::Encodable
        data
      end
    end
  end
end