jdantonio/concurrent-ruby

View on GitHub
lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'concurrent/constants'
require 'concurrent/channel/buffer/base'
require 'concurrent/atomic/atomic_reference'

module Concurrent
  class Channel
    module Buffer

      # A blocking buffer with a size of zero. An item can only be put onto
      # the buffer when a thread is waiting to take. Similarly, an item can
      # only be taken from the buffer when a thread is waiting to put. When
      # either {#put} or {#take} is called and there is no corresponding call
      # in progress, the call will block indefinitely. Any other calls to the
      # same method will queue behind the first call and block as well. As
      # soon as a corresponding put/take call is made an exchange will occur
      # and the first blocked call will return.
      #
      # Every blocked call owns a "slot" (an atomic reference) that is queued
      # in either the list of waiting putters or the list of waiting takers.
      # The counterpart call fills or empties that slot while holding the
      # lock, and the blocked call polls its own slot until it changes. Slot
      # state is tracked by object identity, so any object, including `nil`
      # and `false`, can be exchanged.
      class Unbuffered < Base

        # Marker stored in a slot that currently holds no item: a taker's slot
        # before a putter has filled it, or a putter's slot after a taker has
        # emptied it. A dedicated object is used instead of `nil` so that
        # `nil` and `false` remain valid items.
        EMPTY = ::Object.new
        private_constant :EMPTY

        # @!macro channel_buffer_size_reader
        #
        # Reports 1 while at least one thread is blocked in {#put} and 0
        # otherwise.
        def size
          synchronize { putting.empty? ? 0 : 1 }
        end

        # @!macro channel_buffer_empty_question
        #
        # True when no thread is currently blocked in {#put}.
        def empty?
          size.zero?
        end

        # @!macro channel_buffer_full_question
        #
        # True when at least one thread is currently blocked in {#put}.
        def full?
          !empty?
        end

        # @!macro channel_buffer_put
        #
        # Items can only be put onto the buffer when one or more threads are
        # waiting to {#take} items off the buffer. When there is a thread
        # waiting to take an item this method will give its item and return
        # immediately. When there are no threads waiting to take, this method
        # will block. As soon as a thread calls `take` the exchange will
        # occur and this method will return.
        #
        # Returns `false` immediately when the buffer is closed, otherwise
        # `true` once the item has been handed over.
        def put(item)
          slot = synchronize do
            return false if ns_closed?

            unless taking.empty?
              # A taker is already waiting: fill its slot and finish.
              taking.shift.value = item
              return true
            end

            # Nobody is waiting: queue our item where a taker will find it.
            Concurrent::AtomicReference.new(item).tap { |ref| putting.push(ref) }
          end

          # Poll outside the lock until a taker has emptied our slot. The
          # identity check keeps `nil` and `false` items from looking "taken".
          Thread.pass until EMPTY.equal?(slot.value)
          true
        end

        # @!macro channel_buffer_offer
        #
        # Items can only be put onto the buffer when one or more threads are
        # waiting to {#take} items off the buffer. When there is a thread
        # waiting to take an item this method will give its item and return
        # `true` immediately. When there are no threads waiting to take or the
        # buffer is closed, this method will return `false` immediately.
        def offer(item)
          synchronize do
            return false if ns_closed? || taking.empty?

            taking.shift.value = item
            true
          end
        end

        # @!macro channel_buffer_take
        #
        # Items can only be taken from the buffer when one or more threads are
        # waiting to {#put} items onto the buffer. When there is a thread
        # waiting to put an item this method will take that item and return it
        # immediately. When there are no threads waiting to put, this method
        # will block. As soon as a thread calls `put` the exchange will occur
        # and this method will return.
        #
        # Returns `Concurrent::NULL` immediately when the buffer is closed and
        # no thread is waiting to put.
        #
        # NOTE(review): once the buffer is closed, {#put} and {#offer} both
        # return `false`, so nothing in this class fills the slot of a taker
        # that was already waiting. Confirm whether closing is expected to
        # release such takers elsewhere.
        def take
          slot = synchronize do
            return ns_shift_putting unless putting.empty?
            return Concurrent::NULL if ns_closed?

            # Nobody is waiting to put: queue an empty slot for a putter.
            Concurrent::AtomicReference.new(EMPTY).tap { |ref| taking.push(ref) }
          end

          # Poll outside the lock until a putter has filled our slot. Checking
          # identity rather than truthiness lets `nil` and `false` through.
          loop do
            item = slot.value
            return item unless EMPTY.equal?(item)
            Thread.pass
          end
        end

        # @!macro channel_buffer_poll
        #
        # Items can only be taken off the buffer when one or more threads are
        # waiting to {#put} items onto the buffer. When there is a thread
        # waiting to put an item this method will take the item and return
        # it immediately. When there are no threads waiting to put, this
        # method will return `Concurrent::NULL` immediately. The closed state
        # is not consulted here: an item from a putter that is still waiting
        # is returned either way.
        def poll
          synchronize do
            return Concurrent::NULL if putting.empty?

            ns_shift_putting
          end
        end

        # @!macro channel_buffer_next
        #
        # Items can only be taken from the buffer when one or more threads are
        # waiting to {#put} items onto the buffer. This method exhibits the
        # same blocking behavior as {#take}.
        #
        # Returns the taken item together with a flag that is `false` only
        # when {#take} answered `Concurrent::NULL`.
        #
        # @see #take
        def next
          item = take
          more = (item != Concurrent::NULL)
          return item, more
        end

        private

        # Slots of threads currently blocked in {#put}, oldest first.
        def putting() @putting; end

        # Slots of threads currently blocked in {#take}, oldest first.
        def taking() @taking; end

        # Removes the oldest waiting putter, empties its slot (which lets the
        # blocked {#put} call return) and answers the item it was holding.
        # Must be called while holding the lock and only when `putting` is
        # not empty.
        def ns_shift_putting
          slot = putting.shift
          item = slot.value
          slot.value = EMPTY
          item
        end

        # @!macro channel_buffer_initialize
        def ns_initialize
          # one will always be empty
          @putting = []
          @taking = []
          self.closed = false
          self.capacity = 1
        end
      end
    end
  end
end