lib/fluent/command/binlog_reader.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'optparse'
require 'msgpack'
require 'fluent/msgpack_factory'
require 'fluent/formatter'
require 'fluent/plugin'
require 'fluent/config/element'
require 'fluent/engine'
require 'fluent/version'
class FluentBinlogReader
SUBCOMMAND = %w(cat head formats)
HELP_TEXT = <<HELP
Usage: fluent-binlog-reader <command> [<args>]
Commands of fluent-binlog-reader:
cat : Read files sequentially, writing them to standard output.
head : Display the beginning of a text file.
formats : Display plugins that you can use.
See 'fluent-binlog-reader <command> --help' for more information on a specific command.
HELP
def initialize(argv = ARGV)
@argv = argv
end
def call
command_class = BinlogReaderCommand.const_get(command)
command_class.new(@argv).call
end
private
def command
command = @argv.shift
if command
if command == '--version'
puts "#{File.basename($PROGRAM_NAME)} #{Fluent::VERSION}"
exit 0
elsif !SUBCOMMAND.include?(command)
usage "'#{command}' is not supported: Required subcommand : #{SUBCOMMAND.join(' | ')}"
end
else
usage "Required subcommand : #{SUBCOMMAND.join(' | ')}"
end
command.split('_').map(&:capitalize).join('')
end
def usage(msg = nil)
puts HELP_TEXT
puts "Error: #{msg}" if msg
exit 1
end
end
module BinlogReaderCommand
class Base
def initialize(argv = ARGV)
@argv = argv
@options = { plugin: [] }
@opt_parser = OptionParser.new do |opt|
opt.version = Fluent::VERSION
opt.separator 'Options:'
opt.on('-p DIR', '--plugin', 'add library directory path') do |v|
@options[:plugin] << v
end
end
end
def call
raise NotImplementedError, 'BUG: command MUST implement this method'
end
private
def usage(msg = nil)
puts @opt_parser.to_s
puts "Error: #{msg}" if msg
exit 1
end
def parse_options!
@opt_parser.parse!(@argv)
unless @options[:plugin].empty?
if dir = @options[:plugin].find { |d| !Dir.exist?(d) }
usage "Directory #{dir} doesn't exist"
else
@options[:plugin].each do |d|
Fluent::Plugin.add_plugin_dir(d)
end
end
end
rescue => e
usage e
end
end
module Formattable
DEFAULT_OPTIONS = {
format: :out_file
}
def initialize(argv = ARGV)
super
@options.merge!(DEFAULT_OPTIONS)
configure_option_parser
end
private
def configure_option_parser
@options[:config_params] = {}
@opt_parser.banner = "Usage: fluent-binlog-reader #{self.class.to_s.split('::').last.downcase} [options] file"
@opt_parser.on('-f TYPE', '--format', 'configure output format') do |v|
@options[:format] = v.to_sym
end
@opt_parser.on('-e KEY=VALUE', 'configure formatter config params') do |v|
key, value = v.split('=')
usage "#{v} is invalid. valid format is like `key=value`" unless value
@options[:config_params].merge!(key => value)
end
end
def lookup_formatter(format, params)
conf = Fluent::Config::Element.new('ROOT', '', params, [])
formatter = Fluent::Plugin.new_formatter(format)
if formatter.respond_to?(:configure)
formatter.configure(conf)
end
formatter
rescue => e
usage e
end
end
class Head < Base
include Formattable
DEFAULT_HEAD_OPTIONS = {
count: 5
}
def initialize(argv = ARGV)
super
@options.merge!(default_options)
parse_options!
end
def call
@formatter = lookup_formatter(@options[:format], @options[:config_params])
File.open(@path, 'rb') do |io|
i = 1
Fluent::MessagePackFactory.unpacker(io).each do |(time, record)|
print @formatter.format(@path, time, record) # path is used for tag
break if @options[:count] && i == @options[:count]
i += 1
end
end
end
private
def default_options
DEFAULT_HEAD_OPTIONS
end
def parse_options!
@opt_parser.on('-n COUNT', 'Set the number of lines to display') do |v|
@options[:count] = v.to_i
usage "illegal line count -- #{@options[:count]}" if @options[:count] < 1
end
super
usage 'Path is required' if @argv.empty?
@path = @argv.first
usage "#{@path} is not found" unless File.exist?(@path)
end
end
class Cat < Head
DEFAULT_CAT_OPTIONS = {
count: nil # Overwrite DEFAULT_HEAD_OPTIONS[:count]
}
def default_options
DEFAULT_CAT_OPTIONS
end
end
class Formats < Base
def initialize(argv = ARGV)
super
parse_options!
end
def call
prefix = Fluent::Plugin::FORMATTER_REGISTRY.dir_search_prefix || 'formatter_'
plugin_dirs = @options[:plugin]
unless plugin_dirs.empty?
plugin_dirs.each do |d|
Dir.glob("#{d}/#{prefix}*.rb").each do |path|
require File.absolute_path(path)
end
end
end
$LOAD_PATH.map do |lp|
Dir.glob("#{lp}/#{prefix}*.rb").each do |path|
require path
end
end
puts Fluent::Plugin::FORMATTER_REGISTRY.map.keys
end
end
end