firehoseio/firehose

View on GitHub
lib/firehose/cli.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'thor'
require 'eventmachine'
require 'uri'

# Enable native async-io libs
EM.kqueue if EM.kqueue?
EM.epoll  if EM.epoll?

module Firehose
  class CLI < Thor
    def initialize(*args)
      super
      # Disable buffering to $stdio for Firehose.logger
      $stdout.sync = true
    end

    desc "javascript", "Compile the Firehose JavaScript."
    def javascript
      $stderr.puts "DEPRECATION WARNING: Firehose JS assets have been moved to https://github.com/firehoseio/js_client"
      $stdout.puts Firehose::Assets::Sprockets.javascript
    end

    desc "version", "Display the current version."
    def version
      puts %[Firehose #{Firehose::VERSION} "#{Firehose::CODENAME}"]
    end

    desc "server", "Start an instance of a server."
    method_option :port,   :type => :numeric, :default => (ENV['PORT'] || Firehose::URI.port).to_i, :required => false, :aliases => '-p'
    method_option :host,   :type => :string,  :default => ENV['HOST'] || Firehose::URI.host, :required => false, :aliases => '-h'
    method_option :server, :type => :string,  :default => ENV['SERVER'] ||'rainbows', :required => false, :aliases => '-s'
    def server
      begin
        Firehose::Server::App.new(options).start
      rescue => e
        Firehose.logger.error "#{e.message}: #{e.backtrace}"
        raise e
      end
    end

    desc "consume URI", "Consume messages from a resource."
    method_option :concurrency, :type => :numeric, :default => 1, :aliases => '-c'
    def consume(uri)
      EM.run do
        options[:concurrency].times { Firehose::Client::Consumer.parse(uri).request }
      end
    end

    desc "publish URI [PAYLOAD]", "Publish messages to a resource."
    method_option :ttl, :type => :numeric, :aliases => '-t'
    method_option :times, :type => :numeric, :aliases => '-n', :default => 1
    method_option :interval, :type => :numeric, :aliases => '-i'
    def publish(uri, payload=nil)
      payload     ||= $stdin.read
      client      = Firehose::Client::Producer::Http.new(uri)
      path        = ::URI.parse(uri).path
      times       = options[:times]
      ttl         = options[:ttl]

      EM.run do
        # TODO I think this can be cleaned up so the top-level if/else can be ditched.
        if interval = options[:interval]
          # Publish messages at a forced interval.
          EM.add_periodic_timer interval do
            client.publish(payload).to(path, :ttl => ttl)
            EM.stop if times && (times-=1).zero?
          end
        else
          # Publish messages as soon as the last message was published.
          worker = Proc.new do
            client.publish(payload).to(path, :ttl => ttl)
            times && (times-=1).zero? ? EM.stop : worker.call
          end
          worker.call
        end
      end
    end
  end
end