kaspernj/thread_queues

View on GitHub
lib/thread_queues/string_buffer.rb

Summary

Maintainability
B
6 hrs
Test Coverage
require "monitor"

# Exposes a queue of string chunks through a small IO-like reading interface
# (+gets+, +read+, +each_line+, +each_char+, +each_byte+, +empty?+).
#
# Chunks are fetched with +queue.pop+ and converted with +to_s+. The end of the
# stream is signalled by an +EOFError+ coming out of +pop+; an error whose
# message starts with "No live threads left. Deadlock?" is translated into an
# +EOFError+ as well.
#
# Lengths and limits are counted in characters, while +pos+ is counted in bytes.
class ThreadQueues::StringBuffer
  # First line of the error message that is treated as end of stream.
  DEADLOCK_MESSAGE = "No live threads left. Deadlock?".freeze
  private_constant :DEADLOCK_MESSAGE

  # @return [Integer] number of lines handed out by +gets+ so far
  # @return [Integer] number of bytes consumed so far
  attr_reader :lineno, :pos

  # @param queue [#pop] source of chunks; +pop+ must raise +EOFError+ once
  #   no more chunks will arrive
  def initialize(queue)
    @queue = queue
    @buffer = "".dup
    @mutex = Monitor.new
    @lineno = 0
    @pos = 0
  end

  # Reads the next line, including its separator.
  #
  # Like +IO#gets+, a single non-String argument is taken as the limit.
  # Unlike +IO#gets+, the end of the stream raises instead of returning nil.
  #
  # @param sep [String] line separator
  # @param limit [Integer, nil] maximum number of characters to return
  # @return [String] the line, or the remaining data if the stream ended
  #   before a separator was seen
  # @raise [ArgumentError] if +limit+ is negative
  # @raise [EOFError] if the stream has ended and nothing is buffered
  def gets(sep = "\n", limit = nil)
    if limit.nil? && !sep.is_a?(String)
      limit = sep
      sep = "\n"
    end

    raise ArgumentError, "negative limit #{limit} given" if limit && limit < 0

    @mutex.synchronize do
      loop do
        line = take_line(sep, limit)
        return line if line

        begin
          store_more_in_buffer
        rescue EOFError
          # Hand out a final line that has no trailing separator.
          return rest_of_buffer unless @buffer.empty?

          raise
        end
      end
    end
  end

  # Yields every remaining line (see +gets+) until the stream ends.
  #
  # @return [Enumerator] when no block is given
  def each_line(&blk)
    with_enumerator(blk) do |y|
      begin
        loop { y << gets }
      rescue EOFError
        # End of stream: iteration is complete.
      end
    end
  end

  # Yields every remaining character, chunk by chunk.
  #
  # @return [Enumerator] when no block is given
  def each_char(&blk)
    with_enumerator(blk) do |y|
      each_chunk do |chunk|
        chunk.each_char { |char| y << char }
      end
    end
  end

  # Yields every remaining byte, chunk by chunk.
  #
  # @return [Enumerator] when no block is given
  def each_byte(&blk)
    with_enumerator(blk) do |y|
      each_chunk do |chunk|
        chunk.each_byte { |byte| y << byte }
      end
    end
  end

  # Reads +length+ characters, or everything that is left when +length+ is nil.
  #
  # @param length [Integer, nil] number of characters to read
  # @param outbuf [String, nil] if given, it is cleared and receives the data
  # @return [String, nil] the data read (shorter than +length+ if the stream
  #   ended early); nil when +length+ is given and the stream is exhausted
  # @raise [ArgumentError] if +length+ is negative
  def read(length = nil, outbuf = nil)
    raise ArgumentError, "negative length #{length} given" if length && length < 0

    content = length ? read_length(length) : read_all

    if outbuf
      outbuf.clear
      outbuf << content if content
    end

    content
  end

  # Tells whether the stream never contained any data.
  #
  # If nothing has been consumed or buffered yet, this pops from the queue
  # until a non-empty chunk arrives or the stream ends.
  #
  # @return [Boolean] false as soon as anything was consumed or is buffered
  def empty?
    return false unless @pos.zero?

    @mutex.synchronize do
      begin
        # Skip over empty chunks: they say nothing about what follows.
        store_more_in_buffer while @buffer.empty?
      rescue EOFError
        # Stream ended without delivering any data.
      end

      @buffer.empty?
    end
  end

  private

  # Removes and returns the next line from the buffer, or returns nil when
  # more data is needed to decide where the line ends.
  def take_line(sep, limit)
    return if @buffer.empty?

    index = @buffer.index(sep)

    if index
      # An empty separator matches at offset 0; always consume one character
      # so that progress is made.
      length = [index + sep.length, 1].max
      length = limit if limit && limit < length
    elsif limit && @buffer.length >= limit
      length = limit
    else
      return
    end

    line = @buffer.slice!(0, length)
    @lineno += 1
    @pos += line.bytesize
    line
  end

  # Reads exactly +length+ characters, fewer if the stream ends first, or nil
  # if the stream has ended and nothing is buffered.
  def read_length(length)
    @mutex.synchronize do
      loop do
        if @buffer.length >= length
          content = @buffer.slice!(0, length)
          @pos += content.bytesize
          return content
        end

        begin
          store_more_in_buffer
        rescue EOFError
          return @buffer.empty? ? nil : rest_of_buffer
        end
      end
    end
  end

  # Concatenates everything up to the end of the stream. Goes through
  # +each_line+, so +lineno+ advances for every complete line that is read.
  def read_all
    str = "".dup
    each_line { |line| str << line }
    str
  end

  # Empties the buffer and returns what it held, counting it as consumed.
  def rest_of_buffer
    buffer = @buffer
    @buffer = "".dup
    @pos += buffer.bytesize
    buffer
  end

  # Yields the data that is already buffered, then every chunk popped from the
  # queue until the stream ends. Errors raised by the block itself propagate.
  def each_chunk
    leftover = @mutex.synchronize { rest_of_buffer }
    yield leftover unless leftover.empty?

    loop do
      begin
        chunk = pop_chunk
      rescue EOFError
        break
      end

      @pos += chunk.bytesize
      yield chunk
    end
  end

  # Appends the next chunk from the queue to the buffer.
  #
  # @raise [EOFError] if the stream has ended
  def store_more_in_buffer
    @mutex.synchronize { @buffer << pop_chunk }
  end

  # Pops the next chunk as a String.
  #
  # Rescuing Exception is deliberate: presumably the deadlock error is not a
  # StandardError (Ruby reports it as `fatal`) - confirm before narrowing.
  # Only the start of the message is compared so that any diagnostics
  # following the first line do not prevent the match.
  #
  # @raise [EOFError] if the queue signals end of stream or a deadlock
  def pop_chunk
    @queue.pop.to_s
  rescue Exception => e # rubocop:disable Lint/RescueException
    raise EOFError, DEADLOCK_MESSAGE if e.message.start_with?(DEADLOCK_MESSAGE)

    raise
  end

  # Wraps the given block in an Enumerator; runs it with +blk+ when one was
  # passed, otherwise returns the Enumerator itself.
  def with_enumerator(blk)
    enum = Enumerator.new do |y|
      yield(y)
    end

    if blk
      enum.each(&blk)
    else
      enum
    end
  end
end