sclinede/blood_contracts-instrumentation

View on GitHub
lib/blood_contracts/instrumentation/session_finalizer/fibers.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true

require "fiber"

module BloodContracts
  module Instrumentation
    module SessionFinalizer
      # Threads over fibers implementation of Session finaliazer
      class Fibers
        # Error message when fibers pool is not enough to run all the
        # instruments
        STARVATION_MSG = "WARNING! BC::Instrumentation fiber starvation!"

        # Pool of Fibers to finalize instrumentation session
        #
        # @return [Array<Fiber>]
        #
        attr_reader :fibers

        # Initialize the fibers pool
        #
        # @param pool_size [Integer] number of fibers to use in a single run
        def initialize(pool_size)
          @fibers = pool_size.times.map { create_fiber_with_a_thread }
        end

        # Run the instruments against the session in a loop (each in a separate
        # fiber on separate thread)
        #
        # Pros:
        #   - Each instrument call don't affect the others
        #   - Each instrument run in parallel (up to GIL)
        #   - Runs in parallel to the matching process, so should have
        #     minimum impact on BC::Refined#match speed
        # Cons:
        #   - thread creation have costs
        #   - the pool size is limited, so if you use number of instruments
        #     more the pool size  you have to update Config#finalizer_pool_size
        #     properly
        #
        # @param instruments [Array<Instrument>] list of Instruments to run
        #   against the session
        # @param session [Session] object that hold information about matching
        #   process, argument for Instrument#call
        #
        # @return [Nothing]
        #
        def finalize!(instruments, session)
          instruments.each do |instrument|
            raise STARVATION_MSG unless (fiber = @fibers.shift)

            fiber.resume instrument, session if fiber.alive?
          end
        end

        # @private
        #
        # Create a fiber which holds a Thread to run next Instrument#call
        # against the session
        #
        # @return [Fiber]
        protected def create_fiber_with_a_thread
          Fiber.new do |instrument, session|
            loop do
              thread = Thread.new(instrument, session) { |i, s| i.call(s) }
              @fibers.unshift Fiber.current
              instrument, session = Fiber.yield
              thread.join
            end
          end
        end
      end
    end
  end
end