promiscuous-io/promiscuous

View on GitHub
lib/promiscuous/cli.rb

Summary

Maintainability
C
7 hrs
Test Coverage
require 'ruby-progressbar'

class Promiscuous::CLI
  attr_accessor :options

  def trap_debug_signals
    Signal.trap 'SIGUSR2' do
      # Using a thread because we cannot acquire mutexes in a trap context in
      # ruby 2.0
      Thread.new do
        Thread.list.each do |thread|
          next if Thread.current == thread

          print_status  '----[ Threads ]----' + '-' * (100-19)
          if thread.backtrace
            print_status "Thread #{thread} #{thread['label']}"
            print_status thread.backtrace.join("\n")
          else
            print_status "Thread #{thread} #{thread['label']} -- no backtrace"
          end
        end
      end
    end
  end

  def trap_exit_signals
    %w(SIGTERM SIGINT).each do |signal|
      Signal.trap(signal) do
        # Using a thread because we cannot acquire mutexes in a trap context in
        # ruby 2.0
        Thread.new do
          print_status "Exiting..."
          if @stop
            @worker.try(:show_stop_status)
          else
            @stop = true
            @worker.try(:stop)
            @worker = nil
          end
        end.join
      end
    end
  end

  def trap_signals
    trap_debug_signals
    trap_exit_signals
  end

  def publish
    options[:criterias].map { |criteria| eval(criteria) }.each do |criteria|
      break if @stop
      title = criteria.name
      title = "#{title}#{' ' * [0, 20 - title.size].max}"
      bar = ProgressBar.create(:format => '%t |%b>%i| %c/%C %e', :title => title, :total => criteria.count)
      criteria.each do |doc|
        break if @stop
        doc.promiscuous.sync(Promiscuous::Config.sync_all_routing)
        bar.increment
      end
    end
  end

  def record
    @worker = Promiscuous::Subscriber::Worker::Recorder.new(options[:log_file])
    @worker.start
    print_status "Recording..."
    sleep 0.2 until !@worker
  end

  def replay_payload(payload)
    endpoint = MultiJson.load(payload)['__amqp__']
    if endpoint
      # TODO confirm
      Promiscuous::AMQP.publish(:key => endpoint, :payload => payload)
      @num_msg += 1
    else
      puts "[warn] missing destination in #{payload}"
    end
  end

  def replay
    require 'json'
    @num_msg = 0
    File.open(options[:log_file], 'r').each do |line|
      break if @stop
      case line
      when /^\[promiscuous\] \[receive\] ({.*})$/ then replay_payload($1)
      when /^\[promiscuous\] \[publish\] .* -> ({.*})$/ then replay_payload($1)
      when /^({.*})$/ then replay_payload($1)
      end
    end

    print_status "Replayed #{@num_msg} messages"
  end

  def subscribe
    @worker = Promiscuous::Subscriber::Worker.new
    @worker.start
    Promiscuous::Config.subscriber_threads.tap do |threads|
      print_status "Replicating [#{threads} thread#{'s' if threads > 1}]..."
    end
    sleep 0.2 until !@worker
  end

  def publisher_recovery
    @worker = Promiscuous::Publisher::Worker.new
    @worker.start
    print_status "Waiting for messages to recover..."
    sleep 0.2 until !@worker
  end

  def generate_mocks
    f = options[:output] ? File.open(options[:output], 'w') : STDOUT
    f.write Promiscuous::Publisher::MockGenerator.generate
  end

  def parse_args(args)
    options = {}

    require 'optparse'
    parser = OptionParser.new do |opts|
      opts.banner = "Usage: promiscuous [options] action"

      opts.separator ""
      opts.separator "Actions:"
      opts.separator "    promiscuous publish \"Model1.where(:updated_at.gt => 1.day.ago)\" [Model2 Model3...]"
      opts.separator "    promiscuous publisher_recovery"
      opts.separator "    promiscuous subscribe"
      opts.separator "    promiscuous mocks"
      opts.separator "    promiscuous record logfile"
      opts.separator "    promiscuous replay logfile"
      opts.separator ""
      opts.separator "Options:"

      opts.on "-l", "--require FILE", "File to require to load your app. Don't worry about it with rails" do |file|
        options[:require] = file
      end

      opts.on "-p", "--prefetch [NUM]", "Number of messages to prefetch" do |prefetch|
        exit 1 if prefetch.to_i == 0
        Promiscuous::Config.prefetch = prefetch.to_i
      end

      opts.on "-o", "--output FILE", "Output file for mocks. Defaults to stdout" do |file|
        options[:output] = file
      end

      opts.on "-s", "--stat-interval [DURATION]", "Stats refresh rate (0 to disable)" do |duration|
        Promiscuous::Config.stats_interval = duration.to_f
      end

      opts.on "-t", "--threads [NUM]", "Number of subscriber worker threads to run. Defaults to 10." do |threads|
        Promiscuous::Config.subscriber_threads = threads.to_i
      end

      opts.on "-D", "--daemonize", "Daemonize process" do
        options[:daemonize] = true
      end

      opts.on "-P", "--pid-file [pid_file]", "Set a pid-file" do |pid_file|
        options[:pid_file] = pid_file
      end

      opts.on("-V", "--version", "Show version") do
        puts "Promiscuous #{Promiscuous::VERSION}"
        puts "License MIT"
        exit
      end
    end

    args = args.dup
    parser.parse!(args)

    options[:action] = args.shift.try(:to_sym)
    options[:criterias] = args
    options[:log_file] = args.first

    case options[:action]
    when :publish             then raise "Please specify one or more criterias" unless options[:criterias].present?
    when :subscribe           then raise "Why are you specifying a criteria?"   if     options[:criterias].present?
    when :record              then raise "Please specify a log file to record"  unless options[:log_file].present?
    when :replay              then raise "Please specify a log file to replay"  unless options[:log_file].present?
    when :publisher_recovery
    when :mocks
    else puts parser; exit 1
    end

    options
  rescue SystemExit
    exit
  rescue Exception => e
    puts e
    exit
  end

  def load_app
    if options[:require]
      begin
        require options[:require]
      rescue LoadError
        require "./#{options[:require]}"
      end
    else
      require 'rails'
      require 'promiscuous/railtie'
      require File.expand_path("./config/environment")
      ::Rails.application.eager_load!
    end
  end

  def boot
    self.options = parse_args(ARGV)
    daemonize if options[:daemonize]
    write_pid if options[:pid_file]
    load_app
    run
  end

  def daemonize
    Process.daemon(true)
  end

  def write_pid
    File.open(options[:pid_file], 'w') do |f|
      f.puts Process.pid
    end
  end

  def run
    trap_signals
    case options[:action]
    when :publish   then publish
    when :subscribe then subscribe
    when :record    then record
    when :replay    then replay
    when :mocks     then generate_mocks
    when :publisher_recovery  then publisher_recovery
    end
  end

  def print_status(msg)
    Promiscuous.info msg
    STDERR.puts msg
  end
end