ruby-concurrency/concurrent-ruby

View on GitHub
lib/concurrent-ruby/concurrent/executor/timer_set.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'concurrent/scheduled_task'
require 'concurrent/atomic/event'
require 'concurrent/collection/non_concurrent_priority_queue'
require 'concurrent/executor/executor_service'
require 'concurrent/executor/single_thread_executor'
require 'concurrent/errors'
require 'concurrent/options'

module Concurrent

  # Executes a collection of tasks, each after a given delay. A master task
  # monitors the set and schedules each task for execution at the appropriate
  # time. Tasks are run on the global thread pool or on the supplied executor.
  # Each task is represented as a `ScheduledTask`.
  #
  # @see Concurrent::ScheduledTask
  #
  # @!macro monotonic_clock_warning
  class TimerSet < RubyExecutorService

    # Create a new set of timed tasks.
    #
    # @!macro executor_options
    #
    #   @param [Hash] opts the options used to specify the executor on which to perform actions
    #   @option opts [Executor] :executor when set use the given `Executor` instance.
    #     Three special values are also supported: `:task` returns the global task pool,
    #     `:operation` returns the global operation pool, and `:immediate` returns a new
    #     `ImmediateExecutor` object.
    def initialize(opts = {})
      super(opts)
    end

    # Post a task to be execute run after a given delay (in seconds). If the
    # delay is less than 1/100th of a second the task will be immediately post
    # to the executor.
    #
    # @param [Float] delay the number of seconds to wait for before executing the task.
    # @param [Array<Object>] args the arguments passed to the task on execution.
    #
    # @yield the task to be performed.
    #
    # @return [Concurrent::ScheduledTask, false] IVar representing the task if the post
    #   is successful; false after shutdown.
    #
    # @raise [ArgumentError] if the intended execution time is not in the future.
    # @raise [ArgumentError] if no block is given.
    def post(delay, *args, &task)
      raise ArgumentError.new('no block given') unless block_given?
      return false unless running?
      opts = { executor:  @task_executor,
               args:      args,
               timer_set: self }
      task = ScheduledTask.execute(delay, opts, &task) # may raise exception
      task.unscheduled? ? false : task
    end

    # Begin an immediate shutdown. In-progress tasks will be allowed to
    # complete but enqueued tasks will be dismissed and no new tasks
    # will be accepted. Has no additional effect if the thread pool is
    # not running.
    def kill
      shutdown
    end

    private :<<

    private

    # Initialize the object.
    #
    # @param [Hash] opts the options to create the object with.
    # @!visibility private
    def ns_initialize(opts)
      @queue              = Collection::NonConcurrentPriorityQueue.new(order: :min)
      @task_executor      = Options.executor_from_options(opts) || Concurrent.global_io_executor
      @timer_executor     = SingleThreadExecutor.new
      @condition          = Event.new
      @ruby_pid           = $$ # detects if Ruby has forked
    end

    # Post the task to the internal queue.
    #
    # @note This is intended as a callback method from ScheduledTask
    #   only. It is not intended to be used directly. Post a task
    #   by using the `SchedulesTask#execute` method.
    #
    # @!visibility private
    def post_task(task)
      synchronize { ns_post_task(task) }
    end

    # @!visibility private
    def ns_post_task(task)
      return false unless ns_running?
      ns_reset_if_forked
      if (task.initial_delay) <= 0.01
        task.executor.post { task.process_task }
      else
        @queue.push(task)
        # only post the process method when the queue is empty
        @timer_executor.post(&method(:process_tasks)) if @queue.length == 1
        @condition.set
      end
      true
    end

    # Remove the given task from the queue.
    #
    # @note This is intended as a callback method from `ScheduledTask`
    #   only. It is not intended to be used directly. Cancel a task
    #   by using the `ScheduledTask#cancel` method.
    #
    # @!visibility private
    def remove_task(task)
      synchronize { @queue.delete(task) }
    end

    # `ExecutorService` callback called during shutdown.
    #
    # @!visibility private
    def ns_shutdown_execution
      ns_reset_if_forked
      @queue.clear
      @timer_executor.kill
      stopped_event.set
    end

    def ns_reset_if_forked
      if $$ != @ruby_pid
        @queue.clear
        @condition.reset
        @ruby_pid = $$
      end
    end

    # Run a loop and execute tasks in the scheduled order and at the approximate
    # scheduled time. If no tasks remain the thread will exit gracefully so that
    # garbage collection can occur. If there are no ready tasks it will sleep
    # for up to 60 seconds waiting for the next scheduled task.
    #
    # @!visibility private
    def process_tasks
      loop do
        task = synchronize { @condition.reset; @queue.peek }
        break unless task

        now  = Concurrent.monotonic_time
        diff = task.schedule_time - now

        if diff <= 0
          # We need to remove the task from the queue before passing
          # it to the executor, to avoid race conditions where we pass
          # the peek'ed task to the executor and then pop a different
          # one that's been added in the meantime.
          #
          # Note that there's no race condition between the peek and
          # this pop - this pop could retrieve a different task from
          # the peek, but that task would be due to fire now anyway
          # (because @queue is a priority queue, and this thread is
          # the only reader, so whatever timer is at the head of the
          # queue now must have the same pop time, or a closer one, as
          # when we peeked).
          task = synchronize { @queue.pop }
          begin
            task.executor.post { task.process_task }
          rescue RejectedExecutionError
            # ignore and continue
          end
        else
          @condition.wait([diff, 60].min)
        end
      end
    end
  end
end