celluloid/celluloid-io

View on GitHub
lib/celluloid/io/stream.rb

Summary

Maintainability
C
1 day
Test Coverage
# Partially adapted from Ruby's OpenSSL::Buffering
# Originally from the 'OpenSSL for Ruby 2' project
# Copyright (C) 2001 GOTOU YUUZOU <gotoyuzo@notwork.org>
# All rights reserved.
#
# This program is licenced under the same licence as Ruby.

module Celluloid
  module IO
    # Base class of all streams in Celluloid::IO
    class Stream < Socket
      include Enumerable

      # The "sync mode" of the stream
      #
      # See IO#sync for full details.
      attr_accessor :sync

      # Wraps +socket+ (handed to the superclass constructor) and sets up the
      # stream state: sync mode on, EOF flag cleared, empty binary read and
      # write buffers, and one Latch each for the read and write paths.
      def initialize(socket)
        super

        @sync = true
        @eof  = false

        @read_buffer  = String.new.force_encoding(Encoding::ASCII_8BIT)
        @write_buffer = String.new.force_encoding(Encoding::ASCII_8BIT)

        @read_latch  = Latch.new
        @write_latch = Latch.new
      end

      # Wait until the current object is readable.
      #
      # Delegates to Celluloid::IO.wait_readable, passing this stream.
      def wait_readable
        Celluloid::IO.wait_readable(self)
      end

      # Wait until the current object is writable.
      #
      # Delegates to Celluloid::IO.wait_writable, passing this stream.
      def wait_writable
        Celluloid::IO.wait_writable(self)
      end

      # System read via the nonblocking subsystem.
      #
      # Calls read_nonblock with +length+ and the destination string, and
      # whenever that raises ::IO::WaitReadable waits for readability and tries
      # again. The whole attempt runs under the read latch.
      #
      # Returns +buffer+ when one was supplied, otherwise a new binary string
      # that was passed to read_nonblock as the destination.
      def sysread(length = nil, buffer = nil)
        target = buffer || ''.force_encoding(Encoding::ASCII_8BIT)

        @read_latch.synchronize do
          begin
            read_nonblock(length, target)
          rescue ::IO::WaitReadable
            # Nothing available yet: park until readable, then re-run the read.
            wait_readable
            retry
          end
        end

        target
      end

      # System write via the nonblocking subsystem.
      #
      # Writes all of +string+ with repeated write_nonblock calls, waiting for
      # writability whenever the call would block. Progress is tracked in
      # bytes and +string+ itself is never modified.
      #
      # Returns the number of bytes written. If EOFError is raised part-way
      # through, the count written so far is returned instead.
      def syswrite(string)
        length = string.bytesize
        total_written = 0
        remaining = string

        @write_latch.synchronize do
          while total_written < length
            begin
              written = write_nonblock(remaining)
            rescue ::IO::WaitWritable, Errno::EAGAIN
              wait_writable
              retry
            rescue EOFError
              return total_written
            end

            total_written += written

            # Assumes write_nonblock returns a byte count, as ::IO#write_nonblock
            # does. The unsent tail is therefore taken by byte offset, and
            # byteslice builds a new string instead of cutting up the caller's.
            if total_written < length
              remaining = string.byteslice(total_written, length - total_written)
            end
          end
        end

        total_written
      end

      # Reads +size+ bytes from the stream.  If +buf+ is provided it must
      # reference a string which will receive the data.
      #
      # With a +size+, the buffer is refilled until it holds at least +size+
      # bytes or EOF is reached; nil is returned when nothing could be read.
      # Without a +size+, at most one refill is performed and everything
      # buffered afterwards is returned (possibly an empty string).
      #
      # See IO#read for full details.
      def read(size=nil, buf=nil)
        if size == 0
          return "" unless buf

          buf.clear
          return buf
        end

        until @eof || (size && size <= @read_buffer.size)
          fill_rbuff
          # An unsized read stops after a single refill.
          break unless size
        end

        data = consume_rbuff(size) || ""
        data = buf.replace(data) if buf

        return nil if size && data.empty?

        data
      end

      # Reads at most +maxlen+ bytes from the stream.  If +buf+ is provided it
      # must reference a string which will receive the data.
      #
      # When the read buffer is empty the request goes straight to #sysread;
      # otherwise up to +maxlen+ already-buffered bytes are returned.
      #
      # See IO#readpartial for full details.
      def readpartial(maxlen, buf=nil)
        if maxlen == 0
          return "" unless buf

          buf.clear
          return buf
        end

        if @read_buffer.empty?
          begin
            return sysread(maxlen, buf)
          rescue Errno::EAGAIN
            retry
          end
        end

        chunk = consume_rbuff(maxlen)
        chunk = buf.replace(chunk) if buf

        raise EOFError if chunk.empty?

        chunk
      end

      # Reads the next line from the stream.  Lines are separated by +eol+.  If
      # +limit+ is provided the result will not be longer than the given number of
      # bytes.
      #
      # +eol+ may be a String or Regexp.
      #
      # Returns nil when the read buffer is empty after reaching EOF.  When EOF
      # is reached without finding +eol+, the remaining buffered data is
      # returned, still capped by a non-negative +limit+.
      #
      # Unlike IO#gets the line read will not be assigned to +$_+.
      #
      # Unlike IO#gets the separator must be provided if a limit is provided.
      def gets(eol=$/, limit=nil)
        idx = @read_buffer.index(eol)

        until @eof
          break if idx
          fill_rbuff
          idx = @read_buffer.index(eol)
        end

        size =
          if idx
            # For a Regexp, $& holds the text matched by the index call above.
            idx + (eol.is_a?(Regexp) ? $&.size : eol.size)
          end

        if limit && limit >= 0
          # size is nil when no separator was found; the limit alone then
          # bounds how much is taken from the buffer.
          size = size ? [size, limit].min : limit
        end

        consume_rbuff(size)
      end

      # Executes the block for every line in the stream where lines are separated
      # by +eol+.
      #
      # Returns an Enumerator over the lines when no block is given.
      #
      # See also #gets
      def each(eol=$/)
        return enum_for(:each, eol) unless block_given?

        while (line = gets(eol))
          yield line
        end
      end
      alias each_line each

      # Reads lines from the stream which are separated by +eol+.
      #
      # Keeps calling #gets until it returns nil and returns the collected
      # lines as an Array (empty when there was nothing to read).
      #
      # See also #gets
      def readlines(eol=$/)
        lines = []
        line = self.gets(eol)

        while line
          lines << line
          line = self.gets(eol)
        end

        lines
      end

      # Reads a line from the stream which is separated by +eol+.
      #
      # Raises EOFError if at end of file.
      def readline(eol=$/)
        if eof?
          raise EOFError
        else
          gets(eol)
        end
      end

      # Reads one character from the stream by requesting a single unit from
      # #read.  Returns nil if called at end of file.
      def getc
        read(1)
      end

      # Calls the given block once for each byte in the stream.
      #
      # Each value yielded is the +ord+ of the string returned by #getc.
      # Returns an Enumerator when no block is given.
      def each_byte # :yields: byte
        return enum_for(:each_byte) unless block_given?

        while (c = getc)
          yield(c.ord)
        end
      end

      # Reads a one-character string from the stream.  Raises an EOFError at end
      # of file.
      def readchar
        if eof?
          raise EOFError
        else
          getc
        end
      end

      # Pushes character +c+ back onto the stream such that a subsequent buffered
      # character read will return it.  +c+ is converted with #chr before being
      # placed at the front of the read buffer, and that converted value is
      # returned.
      #
      # Unlike IO#getc multiple bytes may be pushed back onto the stream.
      #
      # Has no effect on unbuffered reads (such as #sysread).
      def ungetc(c)
        pushed = c.chr
        @read_buffer.insert(0, pushed)
        pushed
      end

      # Returns true if the stream is at end of file, which means there is no
      # more data to be read.
      #
      # When the read buffer is empty and EOF has not been seen yet, one refill
      # is attempted first so that the answer reflects the underlying stream.
      def eof?
        needs_data = @read_buffer.empty? && !@eof
        fill_rbuff if needs_data

        @eof && @read_buffer.empty?
      end
      alias eof eof?

      # Writes +s+ to the stream.  If the argument is not a string it will be
      # converted using +to_s+.  Returns the number of bytes written, i.e. the
      # byte size of the converted string.
      def write(s)
        data = s.to_s
        do_write(data)
        data.bytesize
      end

      # Writes +s+ to the stream.  +s+ will be converted to a String using
      # +to_s+ before it is written.
      #
      # Returns the stream itself so that calls can be chained.
      def <<(s)
        do_write(s.to_s)
        self
      end

      # Writes +args+ to the stream along with a record separator.
      #
      # Each argument is converted with +to_s+ and, unless +$/+ is nil, followed
      # by a newline when it does not already end with one.  An empty argument
      # therefore produces an empty line.  With no arguments a single newline is
      # written.  Always returns nil.
      #
      # See IO#puts for full details.
      def puts(*args)
        s = ""
        s << "\n" if args.empty?

        args.each do |arg|
          line = arg.to_s
          s << line
          # Look at this argument only, not at what earlier arguments left at
          # the end of the accumulated output.
          s << "\n" if $/ && !line.end_with?("\n")
        end

        do_write(s)
        nil
      end

      # Writes +args+ to the stream.
      #
      # Every argument is converted with +to_s+ and the results are written
      # back to back as one string.  Always returns nil.
      #
      # See IO#print for full details.
      def print(*args)
        text = args.each_with_object("") { |arg, acc| acc << arg.to_s }
        do_write(text)
        nil
      end

      # Formats and writes to the stream converting parameters under control of
      # the format string +s+.  Always returns nil.
      #
      # See Kernel#sprintf for format string details.
      def printf(s, *args)
        formatted = s % args
        do_write(formatted)
        nil
      end

      # Flushes buffered data to the stream.
      #
      # Temporarily forces sync mode on and performs an empty write.  The
      # previous sync setting is restored afterwards, even if the write raises.
      # Returns the stream itself.
      def flush
        previous_sync = @sync

        begin
          @sync = true
          do_write("")
          self
        ensure
          @sync = previous_sync
        end
      end

      # Closes the stream and flushes any unwritten data.
      #
      # The flush is best effort: a StandardError raised while flushing is
      # discarded so that the superclass close always runs.
      def close
        begin
          flush
        rescue StandardError
          nil
        end

        super
      end

      #######
      private
      #######

      # Fills the buffer from the underlying stream.
      #
      # Appends one #sysread of up to BLOCK_SIZE to the read buffer.  The read
      # is retried on Errno::EAGAIN, and EOFError marks the stream as being at
      # end of file instead of propagating.
      def fill_rbuff
        @read_buffer << sysread(BLOCK_SIZE)
      rescue Errno::EAGAIN
        retry
      rescue EOFError
        @eof = true
      end

      # Consumes +size+ bytes from the buffer.
      #
      # Removes up to +size+ units from the front of the read buffer and returns
      # them; without a +size+ the whole buffer is taken.  Returns nil when the
      # buffer is empty.
      def consume_rbuff(size=nil)
        return nil if @read_buffer.empty?

        size ||= @read_buffer.size
        taken = @read_buffer[0, size]
        @read_buffer[0, size] = ""
        taken
      end

      # Writes +s+ to the buffer.  When the buffer is full or #sync is true the
      # buffer is flushed to the underlying stream.
      #
      # +s+ is converted with +to_s+ and appended as binary data.  A flush
      # happens when #sync is true or the buffer has grown beyond BLOCK_SIZE,
      # in which case the whole buffer is written.  Otherwise, if the buffer
      # contains the record separator +$/+, everything up to and including its
      # last occurrence is written and the remainder stays buffered.
      def do_write(s)
        # Append a binary copy: adding non-ASCII text directly to a buffer that
        # already holds raw high bytes raises Encoding::CompatibilityError.
        @write_buffer << s.to_s.dup.force_encoding(Encoding::BINARY)
        @write_buffer.force_encoding(Encoding::BINARY)

        flush_all = @sync || @write_buffer.size > BLOCK_SIZE
        # Only look for a separator when not flushing everything; a nil $/
        # means there is no separator to look for.
        idx = @write_buffer.rindex($/) if !flush_all && $/

        if flush_all || idx
          remain = idx ? idx + $/.size : @write_buffer.length
          nwritten = 0

          while remain > 0
            str = @write_buffer[nwritten, remain]
            begin
              nwrote = syswrite(str)
            rescue Errno::EAGAIN
              retry
            end
            remain -= nwrote
            nwritten += nwrote
          end

          # Drop what was sent; anything after the separator stays buffered.
          @write_buffer[0, nwritten] = ""
        end
      end

      # Perform an operation exclusively, uncontested by other tasks
      #
      # Tracks the task currently running the protected block (+@owner+), how
      # many callers are waiting for their turn (+@waiters+), and a
      # Celluloid::Condition used to hand the latch from one caller to the next.
      class Latch
        # Starts with no owner, no waiters and a fresh condition.
        def initialize
          @owner = nil
          @waiters = 0
          @condition = Celluloid::Condition.new
        end

        # Synchronize an operation across all tasks in the current actor
        #
        # Yields to the given block and returns the block's value.  When the
        # current thread has no +:celluloid_actor+ thread-local the block is
        # run directly, without any bookkeeping.
        def synchronize
          actor = Thread.current[:celluloid_actor]
          return yield unless actor

          # Wait if the latch is held, or if others are already waiting.
          if @owner || @waiters > 0
            @waiters += 1
            @condition.wait
            @waiters -= 1
          end

          @owner = Task.current

          begin
            ret = yield
          ensure
            # Release the latch even if the block raised, and signal the
            # condition only when somebody is waiting on it.
            @owner = nil
            @condition.signal if @waiters > 0
          end

          ret
        end
      end
    end
  end
end