lib/metacosm/simulation.rb
module Metacosm
class Simulation
# TODO protected?
# def redis_connection
# Redis.new
# end
def params
@params ||= {}
end
def fire(command)
command_queue.push(command)
end
def command_queue
@command_queue ||= Queue.new
end
def event_queue
@event_queue ||= Queue.new
end
def conduct!
@conductor_thread = Thread.new { execute }
end
def execute
while true
if (command=command_queue.pop)
apply(command)
end
Thread.pass
end
end
def halt!
@conductor_thread.terminate
end
def mutex
@mutex = Mutex.new
end
def apply(command)
mutex.synchronize do
received_commands.push(command)
if command.is_a?(Hash)
handler_module_name = command.delete(:handler_module)
handler_class_name = command.delete(:handler_class_name)
module_name = handler_module_name
handler = (module_name.constantize).
const_get(handler_class_name).new
handler.handle(command)
else
handler = handler_for(command)
handler.handle(command.attrs)
end
end
end
def received_commands
@commands_received ||= []
end
def apply_event(event)
if !@on_event_callback.nil?
event_dto = event.attrs.merge(listener_module: event.listener_module_name, listener_class_name: event.listener_class_name)
@on_event_callback[event_dto]
end
if !@event_publication_channel.nil?
event_dto = event.attrs.merge(listener_module: event.listener_module_name, listener_class_name: event.listener_class_name)
REDIS_PUB.with do |redis|
redis.publish(@event_publication_channel, Marshal.dump(event_dto))
end
end
if !local_events_disabled?
listener = listener_for(event)
if event.attrs.any?
listener.receive(event.attrs)
else
listener.receive
end
end
end
def on_event(publish_to:nil,&blk)
unless publish_to.nil?
@event_publication_channel = publish_to
end
if block_given?
@on_event_callback = blk
end
end
def subscribe_for_commands(channel:)
p [ :subscribe_to_command_channel, channel: channel ]
@command_subscription_thread = Thread.new do
REDIS_SUB.with do |redis|
begin
redis.subscribe(channel) do |on|
on.subscribe do |chan, subscriptions|
puts "Subscribed to ##{chan} (#{subscriptions} subscriptions)"
end
on.message do |chan, message|
# puts "##{chan}: #{message}"
command_data = Marshal.load(message)
p [ :got_message, command_data: command_data ]
apply(command_data)
end
on.unsubscribe do |chan, subscriptions|
puts "Unsubscribed from ##{chan} (#{subscriptions} subscriptions)"
end
end
rescue Redis::BaseConnectionError => error
puts "#{error}, retrying in 1s"
sleep 1
retry
end
end
end
end
def disable_local_events
@local_events_disabled = true
end
def local_events_disabled?
@local_events_disabled ||= false
end
def receive(event, record: true)
events.push(event) if record
apply_event(event)
end
def events
@events ||= []
end
def self.current
@current ||= new
end
def clear!
@events = []
@command_queue && @command_queue.clear
end
protected
def handler_for(command)
@handlers ||= {}
@handlers[command.self_class_name] ||= construct_handler_for(command)
end
def construct_handler_for(command)
module_name = command.handler_module_name
(module_name.constantize).
const_get(command.handler_class_name).new
rescue => ex
binding.pry
raise ex
end
def listener_for(event)
@listeners ||= {}
@listeners[event.self_class_name] ||= construct_listener_for(event)
end
def construct_listener_for(event)
module_name = event.listener_module_name
listener = (module_name.constantize).const_get(event.listener_class_name).new(self)
listener
end
end
end