fluent/fluentd

View on GitHub
lib/fluent/command/cat.rb

Summary

Maintainability
B
6 hrs
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'optparse'
require 'fluent/env'
require 'fluent/time'
require 'fluent/msgpack_factory'
require 'fluent/version'

op = OptionParser.new
op.banner += " <tag>"
op.version = Fluent::VERSION

# Defaults. The help strings below interpolate these values, so they must be
# assigned before the corresponding op.on call.
port            = 24224
host            = '127.0.0.1'
unix            = false
socket_path     = Fluent::DEFAULT_SOCKET_PATH
config_path     = Fluent::DEFAULT_CONFIG_PATH
format          = 'json'
message_key     = 'message'
time_as_integer = false
retry_limit     = 5
event_time      = nil

# --- connection target -------------------------------------------------
op.on('-p', '--port PORT', "fluent tcp port (default: #{port})", Integer) do |value|
  port = value
end

op.on('-h', '--host HOST', "fluent host (default: #{host})") do |value|
  host = value
end

op.on('-u', '--unix', "use unix socket instead of tcp", TrueClass) do |flag|
  unix = flag
end

op.on('-s', '--socket PATH', "unix socket path (default: #{socket_path})") do |value|
  socket_path = value
end

# --- input format ------------------------------------------------------
op.on('-f', '--format FORMAT', "input format (default: #{format})") do |value|
  format = value
end

# --json / --msgpack / --none are shorthands for -f <name>; registered in
# this order so the help output is unchanged.
%w[json msgpack none].each do |name|
  op.on("--#{name}", "same as: -f #{name}", TrueClass) { format = name }
end

op.on('--message-key KEY', "key field for none format (default: #{message_key})") do |value|
  message_key = value
end

# --- delivery tuning ---------------------------------------------------
op.on('--time-as-integer', "Send time as integer for v0.12 or earlier", TrueClass) do
  time_as_integer = true
end

op.on('--retry-limit N', "Specify the number of retry limit (default: #{retry_limit})", Integer) do |value|
  retry_limit = value
end

op.on('--event-time TIME_STRING', "Specify the time expression string (default: nil)") do |value|
  event_time = value
end

# usage(msg): print the option summary to stdout, followed by
# "error: <msg>" when msg is given, then terminate with exit status 1.
define_singleton_method(:usage) do |msg|
  puts op.to_s
  puts "error: #{msg}" if msg
  exit 1
end

# Consume options from ARGV. Exactly one positional argument, the tag, must
# remain. Any parse error is reported through usage, which exits.
begin
  op.parse!(ARGV)
  usage nil unless ARGV.length == 1
  tag = ARGV.shift
rescue => e
  usage e.to_s
end

require 'socket'
require 'yajl'
require 'msgpack'
require 'fluent/ext_monitor_require'

# Forwards records to Fluentd over the IO returned by +connector+.
#
# Every successful write sends one msgpack-encoded [tag, [[time, record], ...]]
# payload. An entry whose write fails is kept in a pending queue, capped at
# @pending_limit entries. When the cap is exceeded, the oldest entries are
# dumped via #abort_message. A TimerThread re-sends the pending queue about
# once per second. #write and #on_timer do their socket/queue work inside
# MonitorMixin#synchronize.
class Writer
  include MonitorMixin

  # Raised when more than @retry_limit consecutive connection attempts fail.
  RetryLimitError = Class.new(StandardError)

  # Background thread that calls Writer#on_timer roughly once per second
  # until #shutdown is requested.
  class TimerThread
    def initialize(writer)
      @writer = writer
    end

    def start
      @finish = false
      @thread = Thread.new(&method(:run))
    end

    # Requests the loop to stop and waits for it. The loop completes its
    # current iteration (sleep + on_timer) before exiting.
    def shutdown
      @finish = true
      @thread.join
    end

    def run
      until @finish
        sleep 1
        @writer.on_timer
      end
    end
  end

  # @param tag [String] tag sent with every payload
  # @param connector [#call] returns a newly connected IO each time it is called
  # @param time_as_integer [Boolean] send Integer seconds instead of Fluent::EventTime
  # @param retry_limit [Integer] failed connection attempts tolerated before RetryLimitError
  # @param event_time [String, nil] time expression stamped on every record; current time when nil
  def initialize(tag, connector, time_as_integer: false, retry_limit: 5, event_time: nil)
    @tag = tag
    @connector = connector
    @socket = nil # nil means "not connected" (also what #close resets it to)

    @socket_time = Time.now.to_i # last use of the socket / last connect failure
    @socket_ttl = 10  # TODO
    @error_history = []

    @pending = []
    @pending_limit = 1024  # TODO
    @retry_wait = 1
    @retry_limit = retry_limit
    @time_as_integer = time_as_integer
    @event_time = event_time
    @parsed_event_time = nil # lazily parsed form of @event_time

    super()
  end

  # True when +record+ is a two-element Array of [Fluent::EventTime, Hash],
  # the "secondary" shape that #write accepts in addition to a plain Hash.
  # Non-Array input (String, Integer, nil, ...) simply returns false.
  def secondary_record?(record)
    record.is_a?(Array) &&
      record.size == 2 &&
      record.first.class == Fluent::EventTime &&
      record.last.class == Hash
  end

  # Stamps +record+ with a time and sends it. If the send fails, the entry is
  # queued for the timer to retry.
  #
  # @param record [Hash, Array] a Hash, or a [Fluent::EventTime, Hash] pair
  # @raise [ArgumentError] when +record+ is neither
  # @raise [RetryLimitError] when connecting keeps failing (see #try_connect)
  def write(record)
    secondary = secondary_record?(record)
    if !secondary && record.class != Hash
      raise ArgumentError, "Input must be a map (got #{record.class})"
    end

    time = current_event_time
    time = time.to_i if @time_as_integer
    # Even though a secondary record carries its own Fluent::EventTime,
    # fluent-cat ignores it and stamps the record like any other
    # (--event-time or the current time). This keeps timestamps consistent.
    entry = [time, secondary ? record.last : record]

    synchronize do
      unless write_impl([entry])
        # write failed; keep the entry for the timer to retry
        @pending.push(entry)

        while @pending.size > @pending_limit
          # exceeds pending limit; trash oldest record
          dropped_time, dropped_record = @pending.shift
          abort_message(dropped_time, dropped_record)
        end
      end
    end
  end

  # Timer callback: re-sends the whole pending queue as one payload, and
  # closes a socket that has been idle for more than @socket_ttl seconds.
  def on_timer
    now = Time.now.to_i

    synchronize do
      # flush pending records; keep them if the write fails
      @pending.clear if !@pending.empty? && write_impl(@pending)

      if @socket && @socket_time + @socket_ttl < now
        # socket is not used @socket_ttl seconds
        close
      end
    end
  end

  # Closes the current connection, if any, and forgets it so the next write
  # reconnects. Safe to call when not connected. Errors from closing an
  # already-broken socket are ignored.
  def close
    socket = @socket
    @socket = nil
    socket.close if socket
  rescue IOError, SystemCallError
    # the connection is already gone; nothing left to release
  end

  def start
    @timer = TimerThread.new(self)
    @timer.start
    self
  end

  def shutdown
    @timer.shutdown if @timer
  end

  private

  # Time stamp for the next record: the --event-time expression, parsed once
  # and reused, or the current time.
  def current_event_time
    return Fluent::EventTime.now unless @event_time

    @parsed_event_time ||= Fluent::EventTime.parse(@event_time)
  end

  # Sends +array+ ([[time, record], ...]) as one msgpack payload.
  # On a write error the socket is closed so the next call reconnects.
  # @return [Boolean] true on success
  def write_impl(array)
    socket = get_socket
    return false unless socket

    begin
      packer = Fluent::MessagePackFactory.packer
      socket.write packer.pack([@tag, array])
      socket.flush
    rescue
      $stderr.puts "write failed: #{$!}"
      close
      return false
    end

    true
  end

  # Returns the open socket, connecting first if necessary, and records the
  # current time as its last use (used by the idle check in #on_timer).
  def get_socket
    return nil unless @socket || try_connect

    @socket_time = Time.now.to_i
    @socket
  end

  # Opens a connection via @connector, retrying failed attempts.
  #
  # After an earlier failure, it waits +wait+ second(s) before the next
  # attempt. Each failure is recorded. Once more than @retry_limit failures
  # have accumulated, every pending record is aborted (#abort_message) and
  # RetryLimitError is raised.
  #
  # @return [true] once connected
  # @raise [RetryLimitError]
  def try_connect
    begin
      now = Time.now.to_i

      unless @error_history.empty?
        # wait before re-connecting
        wait = 1 #@retry_wait * (2 ** (@error_history.size-1))
        if now <= @socket_time + wait
          # Sleep, then fall through to the single attempt below. Recursing
          # into try_connect here would connect twice and leak one socket.
          sleep(wait)
          now = Time.now.to_i
        end
      end

      @socket = @connector.call
      @error_history.clear
      true
    rescue RetryLimitError
      raise
    rescue
      $stderr.puts "connect failed: #{$!}"
      @error_history << $!
      @socket_time = now

      if @retry_limit < @error_history.size
        # abort all pending records
        @pending.each { |(pending_time, pending_record)| abort_message(pending_time, pending_record) }
        @pending.clear
        @error_history.clear
        raise RetryLimitError, "exceed retry limit"
      else
        retry
      end
    end
  end

  # Reports a record that will never be delivered, as "!<time>:<json>" on stdout.
  def abort_message(time, record)
    $stdout.puts "!#{time}:#{Yajl.dump(record)}"
  end
end


# Factory for a fresh connection; the Writer calls it each time it needs to
# (re)connect.
connector =
  if unix
    proc { UNIXSocket.open(socket_path) }
  else
    proc { TCPSocket.new(host, port) }
  end

w = Writer.new(tag, connector,
               time_as_integer: time_as_integer,
               retry_limit: retry_limit,
               event_time: event_time)
w.start

# Read records from stdin in the selected format and hand each to the writer.
# Any error aborts the command with exit status 1 after printing it to stderr.
case format
when 'json', 'none'
  # Line-oriented formats: one record per input line.
  to_record =
    if format == 'json'
      ->(line) { Yajl.load(line) }
    else
      ->(line) { { message_key => line.chomp } }
    end

  begin
    $stdin.each_line { |line| w.write(to_record.call(line)) }
  rescue => e
    $stderr.puts e
    exit 1
  end

when 'msgpack'
  require 'fluent/engine'

  begin
    Fluent::MessagePackFactory.msgpack_unpacker($stdin).each { |record| w.write(record) }
  rescue EOFError
    # end of input: nothing more to forward
  rescue => e
    $stderr.puts e
    exit 1
  end

else
  $stderr.puts "Unknown format '#{format}'"
  exit 1
end