turboladen/pants

View on GitHub
lib/pants/core.rb

Summary

Maintainability
A
25 mins
Test Coverage
require 'uri'
require 'eventmachine'
require_relative 'error'
require_relative 'logger'

Dir[File.dirname(__FILE__) + "/readers/*.rb"].each { |f| require f }
Dir[File.dirname(__FILE__) + "/writers/*.rb"].each { |f| require f }
require_relative 'seam'


class Pants

  # A single Core object is necessary for Pants to run.  Root-level readers are
  # attached to the core, then writers are attached to those readers.  Its
  # main job, other than giving a home to readers, is to handle the order of
  # starting up and shutting down the readers and writers so that no data is
  # lost.
  class Core
    include LogSwitch::Mixin

    # @return [Array] The list of readers that are reading data.
    attr_reader :readers

    # Installs the process signal handlers and prepares an empty reader list.
    #
    # @param [Proc] block Optional block that is called with every reader
    #   right after it has been added via {#read} or {#add_reader}.
    def initialize(&block)
      setup_signals
      @readers = []

      @convenience_block = block
    end

    # One method of adding a Reader to the Core.  Use this method to make code
    # read nicer when reading something that's expressed as a URI.
    #
    # @example
    #   core = Pants::Core.new
    #   core.read 'udp://10.2.3.4:9000'
    #
    # @param [String,URI] uri The URI to the object to read.  Can be a file:///,
    #   udp://, or a string with the path to a file.
    #
    # @return [Pants::Reader] The newly created reader.
    def read(uri)
      # A string that cannot be parsed as a URI is handed on as +nil+, which
      # selects the reader registered without a URI scheme.
      parsed_uri = begin
        uri.is_a?(URI) ? uri : URI(uri)
      rescue URI::InvalidURIError
        nil
      end

      @readers << new_reader_from_uri(parsed_uri, callback)
      @convenience_block.call(@readers.last) if @convenience_block

      @readers.last
    end

    # One method of adding a Reader to the Core.  Use this method to add an
    # a) already instantiated Reader object, or b) a Reader from a class of
    # Reader objects.
    #
    # @example Add using class and init variables
    #   core = Pants::Core.new
    #   core.add_reader(Pants::Readers::UDPReader, '10.2.3.4', 9000)
    #
    # @example Add using an already instantiated Reader object
    #   core = Pants::Core.new
    #   reader = Pants::Readers::UDPReader.new('10.2.3.4', 9000, core.callback)
    #   core.add_reader(reader)
    #
    # Notice how using the last method requires you to pass in the core's
    # callback method--this is probably one reason for avoiding this method of
    # adding a reader, yet remains available for flexibility.
    #
    # @param [Class,Pants::Reader] obj Either the class of a Reader to create,
    #   or an already created Reader object.
    # @param [*] args Any arguments that need to be used for creating the
    #   Reader.  The core's callback is appended as the last argument.
    #
    # @return [Pants::Reader] The reader that was added.
    #
    # @raise [Pants::Error] If +obj+ is neither a Class nor a BaseReader.
    def add_reader(obj, *args)
      if obj.is_a? Class
        @readers << obj.new(*args, callback)
      elsif obj.kind_of? Pants::Readers::BaseReader
        @readers << obj
      else
        raise Pants::Error, "Don't know how to add a reader of type #{obj}"
      end

      @convenience_block.call(@readers.last) if @convenience_block

      @readers.last
    end

    # Creates an EventMachine::Callback method that other Readers, Writers, and
    # others can use for letting the Core know when it can shutdown.  Those
    # Readers, Writers, etc. should handle calling this callback when they're
    # done doing what they need to do.  When called, the event loop is stopped
    # only if none of the core's readers report that they are running.
    #
    # @return [EventMachine::Callback]
    def callback
      EM.Callback do
        if @readers.none?(&:running?)
          EM.stop_event_loop
        end
      end
    end

    # Starts the EventMachine reactor, the reader and the writers.  If a
    # reactor is already running, the readers are started inside of it instead
    # of starting a new one.
    #
    # @raise [Pants::Error] If no readers have been added.
    def run
      raise Pants::Error, "No readers added yet" if @readers.empty?

      starter = proc do
        puts "Pants v#{Pants::VERSION}"
        puts ">> Reading from #{@readers.size} readers"

        @readers.each_with_index do |reader, i|
          puts ">> reader#{i}:  Starting read on: #{reader.read_object}"
          puts ">> reader#{i}:  Writing to #{reader.writers.size} writers"

          reader.writers.each_with_index do |writer, j|
            puts ">> reader#{i}writer#{j}:  #{writer.write_object}"
          end
        end

        EM::Iterator.new(@readers).each do |reader, iter|
          reader.start
          iter.next
        end
      end

      if EM.reactor_running?
        log "Joining reactor..."
        starter.call
      else
        log "Starting reactor..."
        EM.run(&starter)
      end
    end

    # Tells the readers to signal to their writers that it's time to finish.
    # Does nothing but print a notice when no reader is running.
    def stop!
      puts "Stop called.  Closing readers and writers..."

      if @readers.none?(&:running?)
        puts "No readers are running; nothing to do."
      else
        puts "Stopping readers:"

        @readers.each do |reader|
          puts "\t#{reader}" if reader.running?
        end

        @readers.each(&:stop!)
      end
    end

    # Stop, then run.
    def restart
      stop!
      puts "Restarting..."
      run
    end

    private

    # Register signals:
    # * INT & TERM call <tt>stop!</tt> and count the signal; the count is reset
    #   by a 5 second timer.  A second INT before the reset aborts the process.
    # * QUIT calls <tt>stop!</tt>.
    # * HUP calls <tt>restart</tt> to ... surprise, restart!
    #
    # QUIT and HUP are not registered when RUBY_PLATFORM matches mswin/mingw.
    def setup_signals
      @trapped_count ||= 0

      stopper = proc do
        @trapped_count += 1
        stop!

        # Reset count after 5 seconds
        EM.add_timer(5) { @trapped_count = 0 }
      end

      trap('INT')  do
        stopper.call
        abort "Multiple INT signals trapped; aborting!" if @trapped_count > 1
      end

      trap('TERM') { stopper.call }

      # No "!!" in front of RUBY_PLATFORM: "!" binds tighter than "=~", so
      # "!!RUBY_PLATFORM =~ /.../" would match the regexp against +true+
      # instead of against the platform string.
      unless RUBY_PLATFORM =~ /mswin|mingw/
        trap('QUIT') { stop! }
        trap('HUP')  { restart }
      end
    end

    # Looks up the reader registered in +Pants.readers+ for the URI's scheme and
    # instantiates it.
    #
    # @param [URI,nil] uri The URI the Reader is mapped to.  +nil+ selects the
    #   reader that is registered without a URI scheme.
    # @param [EventMachine::Callback] callback Passed to the new reader as its
    #   last constructor argument.
    #
    # @return [Pants::Reader] An object of the type that's defined by the URI
    #   scheme.
    #
    # @raise [ArgumentError] If no Reader is mapped to the URI's scheme.
    def new_reader_from_uri(uri, callback)
      # Resolve the scheme once so that a nil +uri+ can't blow up the lookup
      # or the error message below.
      scheme = uri.nil? ? nil : uri.scheme

      reader_to_use = Pants.readers.find do |reader|
        reader[:uri_scheme] == scheme
      end

      unless reader_to_use
        raise ArgumentError,
          "No reader found with URI scheme: #{scheme || '(none)'}"
      end

      # Each entry in :args names a method on the URI whose return value
      # becomes a constructor argument.
      # NOTE(review): when +uri+ is nil (unparsable input from #read) these
      # calls are sent to nil -- confirm how schemeless readers expect to get
      # their arguments in that case.
      args = if reader_to_use[:args]
        reader_to_use[:args].map { |arg| uri.send(arg) }
      else
        []
      end

      reader_to_use[:klass].new(*args, callback)
    end
  end
end