lib/concurrent-ruby/concurrent/executor/timer_set.rb
require 'concurrent/scheduled_task'
require 'concurrent/atomic/event'
require 'concurrent/collection/non_concurrent_priority_queue'
require 'concurrent/executor/executor_service'
require 'concurrent/executor/single_thread_executor'
require 'concurrent/errors'
require 'concurrent/options'
module Concurrent
# Executes a collection of tasks, each after a given delay. A master task
# monitors the set and schedules each task for execution at the appropriate
# time. Tasks are run on the global thread pool or on the supplied executor.
# Each task is represented as a `ScheduledTask`.
#
# @see Concurrent::ScheduledTask
#
# @!macro monotonic_clock_warning
class TimerSet < RubyExecutorService
# Create a new set of timed tasks.
#
# @!macro executor_options
#
# @param [Hash] opts the options used to specify the executor on which to perform actions
# @option opts [Executor] :executor when set use the given `Executor` instance.
# Three special values are also supported: `:task` returns the global task pool,
# `:operation` returns the global operation pool, and `:immediate` returns a new
# `ImmediateExecutor` object.
def initialize(opts = {})
super(opts)
end
# Post a task to be execute run after a given delay (in seconds). If the
# delay is less than 1/100th of a second the task will be immediately post
# to the executor.
#
# @param [Float] delay the number of seconds to wait for before executing the task.
# @param [Array<Object>] args the arguments passed to the task on execution.
#
# @yield the task to be performed.
#
# @return [Concurrent::ScheduledTask, false] IVar representing the task if the post
# is successful; false after shutdown.
#
# @raise [ArgumentError] if the intended execution time is not in the future.
# @raise [ArgumentError] if no block is given.
def post(delay, *args, &task)
raise ArgumentError.new('no block given') unless block_given?
return false unless running?
opts = { executor: @task_executor,
args: args,
timer_set: self }
task = ScheduledTask.execute(delay, opts, &task) # may raise exception
task.unscheduled? ? false : task
end
# Begin an immediate shutdown. In-progress tasks will be allowed to
# complete but enqueued tasks will be dismissed and no new tasks
# will be accepted. Has no additional effect if the thread pool is
# not running.
def kill
shutdown
end
private :<<
private
# Initialize the object.
#
# @param [Hash] opts the options to create the object with.
# @!visibility private
def ns_initialize(opts)
@queue = Collection::NonConcurrentPriorityQueue.new(order: :min)
@task_executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
@timer_executor = SingleThreadExecutor.new
@condition = Event.new
@ruby_pid = $$ # detects if Ruby has forked
end
# Post the task to the internal queue.
#
# @note This is intended as a callback method from ScheduledTask
# only. It is not intended to be used directly. Post a task
# by using the `SchedulesTask#execute` method.
#
# @!visibility private
def post_task(task)
synchronize { ns_post_task(task) }
end
# @!visibility private
def ns_post_task(task)
return false unless ns_running?
ns_reset_if_forked
if (task.initial_delay) <= 0.01
task.executor.post { task.process_task }
else
@queue.push(task)
# only post the process method when the queue is empty
@timer_executor.post(&method(:process_tasks)) if @queue.length == 1
@condition.set
end
true
end
# Remove the given task from the queue.
#
# @note This is intended as a callback method from `ScheduledTask`
# only. It is not intended to be used directly. Cancel a task
# by using the `ScheduledTask#cancel` method.
#
# @!visibility private
def remove_task(task)
synchronize { @queue.delete(task) }
end
# `ExecutorService` callback called during shutdown.
#
# @!visibility private
def ns_shutdown_execution
ns_reset_if_forked
@queue.clear
@timer_executor.kill
stopped_event.set
end
def ns_reset_if_forked
if $$ != @ruby_pid
@queue.clear
@condition.reset
@ruby_pid = $$
end
end
# Run a loop and execute tasks in the scheduled order and at the approximate
# scheduled time. If no tasks remain the thread will exit gracefully so that
# garbage collection can occur. If there are no ready tasks it will sleep
# for up to 60 seconds waiting for the next scheduled task.
#
# @!visibility private
def process_tasks
loop do
task = synchronize { @condition.reset; @queue.peek }
break unless task
now = Concurrent.monotonic_time
diff = task.schedule_time - now
if diff <= 0
# We need to remove the task from the queue before passing
# it to the executor, to avoid race conditions where we pass
# the peek'ed task to the executor and then pop a different
# one that's been added in the meantime.
#
# Note that there's no race condition between the peek and
# this pop - this pop could retrieve a different task from
# the peek, but that task would be due to fire now anyway
# (because @queue is a priority queue, and this thread is
# the only reader, so whatever timer is at the head of the
# queue now must have the same pop time, or a closer one, as
# when we peeked).
task = synchronize { @queue.pop }
begin
task.executor.post { task.process_task }
rescue RejectedExecutionError
# ignore and continue
end
else
@condition.wait([diff, 60].min)
end
end
end
end
end