lib/mock_em/mock_em.rb
require 'logger'
require 'mock_em/logger_with_prefix'
require 'timecop'
module MockEM
# Fake EM suitable for unit testing.
# Uses Timecop to accelerate time. Should run Timecop.return after spec, just to be safe.
class MockEM
# @param [Timecop] timecop
def initialize(logger, timecop)
@log = LoggerWithPrefix.new("MockEM", logger)
@timecop = timecop
@next_tick_procs = []
@scheduled_tasks = ScheduledTasks.new(@log)
@timer_objects = []
@shutdown_hooks = []
@is_stopped = false
@max_timer_count = 100000 #TODO: not honored
end
def run(&block)
@reactor_running = true
@is_stopped = false
@log.info "run called. executing run block."
safely_run { block.call }
@log.info("Beginning tick loop.")
@tick_count = 0
while (!@is_stopped)
@tick_count += 1
due_tasks = @scheduled_tasks.pop_due_tasks(now_millis)
@log.info "Tick ##{@tick_count}, clock=#{now_millis}, due_tasks=#{due_tasks.count}, next_tick_procs=#{@next_tick_procs.count}"
this_tick_procs = due_tasks + @next_tick_procs
@next_tick_procs = []
if this_tick_procs.empty?
# accelerate time to next scheduled task
next_time = @scheduled_tasks.time_of_next_task
if next_time.nil?
@log.info "Nothing left to do! Returning."
break
else
delta = next_time - now_millis
@log.info "Nothing in this tick. Accelerating clock by #{delta / 1000.0}s to: #{next_time}"
set_clock(next_time)
end
end
this_tick_procs.each_with_index do |proc, index|
@log.info "Executing tick proc ##{index+1}"
safely_run { proc.call }
end
end
@log.info("Finished tick loop. Returning.")
ensure
@reactor_running = false
future_time = now_millis
@timecop.return
@log.debug "MockEM saved you #{(future_time - now_millis) / 1000} seconds."
end
def stop
@log.info "stop called"
@is_stopped = true
@next_tick_procs = []
@scheduled_tasks.clear_and_reset
hooks = @shutdown_hooks
@shutdown_hooks = []
if hooks.count > 0
@log.info "Executing #{hooks.count} shutdown hooks"
hooks.reverse.each(&:call)
end
end
def next_tick(proc = nil, &block)
proc ||= block
@log.info "Adding proc to next_tick"
@next_tick_procs << proc
end
def add_timer(delay_seconds, proc = nil, &block)
add_timer_internal(delay_seconds, nil, proc, &block)
end
def add_periodic_timer(period_seconds, proc = nil, &block)
proc ||= block
timer = MockTimer.new
@log.info "Creating periodic timer task: id=#{timer.id}, period_seconds=#{period_seconds}"
recursive_block = nil
recursive_block = lambda do
safely_run { proc.call }
if !timer.is_cancelled
@log.info "Rescheduling next run of periodic timer id=#{timer.id}"
add_timer_internal(period_seconds, timer, recursive_block)
end
end
add_timer_internal(period_seconds, timer, recursive_block)
end
def cancel_timer(timer)
#TODO: support looking up by timer ID as well
@timer_objects.delete(timer)
timer.cancel
end
def reactor_running?
!!(@reactor_running)
end
def get_max_timer_count
@max_timer_count
end
def add_shutdown_hook(&block)
@shutdown_hooks << block
end
def error_handler(proc = nil, &block)
proc ||= block
@log.info("Setting error_handler")
@error_handler = proc
end
# Simulates whatever EM.add_timer or EM.add_periodic_timer returns.
class MockTimer
@@id_seq = 0
attr_reader :id, :is_cancelled
def initialize
@is_cancelled = false
@id = @@id_seq += 1
end
def cancel
@is_cancelled = true
end
end
# Keeps track of tasks to execute in the future, each one consisting of a timestamp and proc to execute.
class ScheduledTasks
ScheduledTask = Struct.new(:timestamp, :proc)
def initialize(log)
@log = log
clear_and_reset
end
def add_task(timestamp_millis, &block)
@tasks << ScheduledTask.new(timestamp_millis, block)
@tasks = @tasks.sort_by(&:timestamp)
end
def pop_due_tasks(timestamp)
due_tasks = @tasks.take_while {|t| t.timestamp <= timestamp }
@tasks = @tasks - due_tasks
due_tasks.map(&:proc)
end
def time_of_next_task
task = @tasks.first
task && task.timestamp
end
def clear_and_reset
@tasks = []
end
end
private
def safely_run(&block)
begin
block.call
rescue => e
if @error_handler
@error_handler.call(e)
else
raise e
end
rescue Exception => e
@error_handler.call(e) if @error_handler
raise e
end
end
def set_clock(millis)
@timecop.travel(Time.at(millis / 1000.0))
end
def now_millis
(Time.now.utc.to_f * 1000.0).to_i
end
# same as add_timer, but adds an optional parameter: reuse_timer
def add_timer_internal(delay_seconds, reuse_timer, proc = nil, &block)
proc ||= block
timer = reuse_timer || MockTimer.new
@log.info "Adding timer task: id=#{timer.id}, delay_seconds=#{delay_seconds}"
@scheduled_tasks.add_task(now_millis + (delay_seconds * 1000)) do
if timer.is_cancelled
@log.debug "Skipping this timer task, it's already cancelled"
else
safely_run { proc.call }
end
end
@timer_objects << timer
timer
end
end
end