deepcerulean/metacosm

View on GitHub
lib/metacosm/remote_simulation.rb

Summary

Maintainability
A
1 hr
Test Coverage
module Metacosm
  class RemoteSimulation < Simulation
    def initialize(command_queue, event_stream)
      @command_queue_name = command_queue
      @event_stream_name  = event_stream
      setup_connection
    end

    def apply(command); fire command end

    def fire(command)
      puts "---> Firing command at remote sim..."
      puts "[command: #{command.inspect}]"
      command_dto = command.attrs.merge(handler_module: command.handler_module_name, handler_class_name: command.handler_class_name)

      Thread.new do
        REDIS_PUB.with do |redis|
          puts "---> Sending command over redis conn: #{redis.inspect}"
          redis.publish(@command_queue_name, Marshal.dump(command_dto))
        end
      end
      puts "---> Sent!"
      true
    end

    def received_events
      @events_received ||= []
    end

    def setup_connection
      @remote_listener_thread = Thread.new do
        begin
          REDIS_SUB.with do |redis|
            redis.subscribe(@event_stream_name) do |on|
              on.subscribe do |channel, subscriptions|
                puts "Subscribed to remote simulation event stream ##{channel} (#{subscriptions} subscriptions)"
              end

              on.message do |channel, message|
                event = Marshal.load(message)
                listener_module_name = event.delete(:listener_module)
                listener_class_name = event.delete(:listener_class_name)
                module_name = listener_module_name
                module_name = "Object" if module_name.empty?
                listener = (module_name.constantize).const_get(listener_class_name).new(self)
                listener.receive(event)

                received_events.push(event)
              end

              on.unsubscribe do |channel, subscriptions|
                puts "Unsubscribed from remote simulation event stream ##{channel} (#{subscriptions} subscriptions)"
              end
            end
          end
        rescue ::Redis::BaseConnectionError => error
          puts "#{error}, retrying in 1s"
          sleep 1
          retry
        end
      end
    end
  end
end