dddogdiamond/concurrent_worker

View on GitHub
lib/concurrent_worker/common.rb

Summary

Maintainability
A
0 mins
Test Coverage

module ConcurrentWorker
  require 'thread'

  class RequestCounter
    def initialize
      @count = Queue.new
      @com = Queue.new
    end
    def push(args)
      @count.push(args)
    end
    def pop
      Thread.handle_interrupt(Object => :never) do
        r = @count.pop
        @com.push(true)
        r
      end
    end
    
    def wait_until_less_than(n)
      return if @count.size < n
      while @com.pop
        break if @count.size < n
      end
    end

    def empty?
      @count.empty?
    end
    
    def size
      @count.size
    end

    def close
      @count.close
    end

    def closed?
      @count.closed?
    end

  end

  class IPCDuplexChannel
    def initialize
      @p_pid = Process.pid
      @p2c = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
      @c2p = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
    end

    def choose_io
      w_pipe, r_pipe = @p_pid == Process.pid ? [@p2c, @c2p] : [@c2p, @p2c]
      @wio, @rio = w_pipe[1], r_pipe[0]
      [w_pipe[0], r_pipe[1]].map(&:close)
    end
    
    def send(obj)
      begin
        Marshal.dump(obj, @wio)
      end
    end

    def recv
      begin
        Marshal.load(@rio)
      rescue IOError
        raise StopIteration
      end
    end
    
    def close
      [@wio, @rio].map(&:close)
    end
  end
end