lib/fluent/plugin/out_s3.rb
require 'fluent/plugin/output'
require 'fluent/log-ext'
require 'fluent/timezone'
require 'aws-sdk-s3'
require 'zlib'
require 'time'
require 'tempfile'
require 'securerandom'
module Fluent::Plugin
class S3Output < Output
Fluent::Plugin.register_output('s3', self)
helpers :compat_parameters, :formatter, :inject
def initialize
super
@compressor = nil
@uuid_flush_enabled = false
end
desc "Path prefix of the files on S3"
config_param :path, :string, default: ""
desc "The Server-side encryption algorithm used when storing this object in S3 (AES256, aws:kms)"
config_param :use_server_side_encryption, :string, default: nil
desc "Use aws-sdk-ruby bundled cert"
config_param :use_bundled_cert, :bool, default: false
desc "AWS access key id"
config_param :aws_key_id, :string, default: nil, secret: true
desc "AWS secret key."
config_param :aws_sec_key, :string, default: nil, secret: true
config_section :assume_role_credentials, multi: false do
desc "The Amazon Resource Name (ARN) of the role to assume"
config_param :role_arn, :string, secret: true
desc "An identifier for the assumed role session"
config_param :role_session_name, :string
desc "An IAM policy in JSON format"
config_param :policy, :string, default: nil
desc "The duration, in seconds, of the role session (900-3600)"
config_param :duration_seconds, :integer, default: nil
desc "A unique identifier that is used by third parties when assuming roles in their customers' accounts."
config_param :external_id, :string, default: nil, secret: true
desc "The region of the STS endpoint to use."
config_param :sts_region, :string, default: nil
desc "A http proxy url for requests to aws sts service"
config_param :sts_http_proxy, :string, default: nil, secret: true
desc "A url for a regional sts api endpoint, the default is global"
config_param :sts_endpoint_url, :string, default: nil
end
# See the following link for additional params that could be added:
# https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/STS/Client.html#assume_role_with_web_identity-instance_method
config_section :web_identity_credentials, multi: false do
desc "The Amazon Resource Name (ARN) of the role to assume"
config_param :role_arn, :string # required
desc "An identifier for the assumed role session"
config_param :role_session_name, :string #required
desc "The absolute path to the file on disk containing the OIDC token"
config_param :web_identity_token_file, :string #required
desc "An IAM policy in JSON format"
config_param :policy, :string, default: nil
desc "The duration, in seconds, of the role session (900-43200)"
config_param :duration_seconds, :integer, default: nil
desc "The region of the STS endpoint to use."
config_param :sts_region, :string, default: nil
end
config_section :instance_profile_credentials, multi: false do
desc "Number of times to retry when retrieving credentials"
config_param :retries, :integer, default: nil
desc "IP address (default:169.254.169.254)"
config_param :ip_address, :string, default: nil
desc "Port number (default:80)"
config_param :port, :integer, default: nil
desc "Number of seconds to wait for the connection to open"
config_param :http_open_timeout, :float, default: nil
desc "Number of seconds to wait for one block to be read"
config_param :http_read_timeout, :float, default: nil
# config_param :delay, :integer or :proc, :default => nil
# config_param :http_degub_output, :io, :default => nil
end
config_section :shared_credentials, multi: false do
desc "Path to the shared file. (default: $HOME/.aws/credentials)"
config_param :path, :string, default: nil
desc "Profile name. Default to 'default' or ENV['AWS_PROFILE']"
config_param :profile_name, :string, default: nil
end
desc "The number of attempts to load instance profile credentials from the EC2 metadata service using IAM role"
config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead"
desc "S3 bucket name"
config_param :s3_bucket, :string
desc "S3 region name"
config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
desc "Use 's3_region' instead"
config_param :s3_endpoint, :string, default: nil
desc "If true, S3 Transfer Acceleration will be enabled for uploads. IMPORTANT: You must first enable this feature on your destination S3 bucket"
config_param :enable_transfer_acceleration, :bool, default: false
desc "If true, use Amazon S3 Dual-Stack Endpoints. Will make it possible to use either IPv4 or IPv6 when connecting to S3."
config_param :enable_dual_stack, :bool, default: false
desc "If false, the certificate of endpoint will not be verified"
config_param :ssl_verify_peer, :bool, :default => true
desc "Full path to the SSL certificate authority bundle file that should be used when verifying peer certificates. If unspecified, defaults to the system CA if available."
config_param :ssl_ca_bundle, :string, :default => nil
desc "Full path of the directory that contains the unbundled SSL certificate authority files for verifying peer certificates. If you do not pass ssl_ca_bundle or ssl_ca_directory the the system default will be used if available."
config_param :ssl_ca_directory, :string, :default => nil
desc "The format of S3 object keys"
config_param :s3_object_key_format, :string, default: "%{path}%{time_slice}_%{index}.%{file_extension}"
desc "If true, the bucket name is always left in the request URI and never moved to the host as a sub-domain"
config_param :force_path_style, :bool, default: false, deprecated: "S3 will drop path style API in 2020: See https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/"
desc "Archive format on S3"
config_param :store_as, :string, default: "gzip"
desc "Create S3 bucket if it does not exists"
config_param :auto_create_bucket, :bool, default: true
desc "Check AWS key on start"
config_param :check_apikey_on_start, :bool, default: true
desc "URI of proxy environment"
config_param :proxy_uri, :string, default: nil
desc "Use S3 reduced redundancy storage for 33% cheaper pricing. Deprecated. Use storage_class instead"
config_param :reduced_redundancy, :bool, default: false, deprecated: "Use storage_class parameter instead."
desc "The type of storage to use for the object(STANDARD,REDUCED_REDUNDANCY,STANDARD_IA)"
config_param :storage_class, :string, default: "STANDARD"
desc "Permission for the object in S3"
config_param :acl, :string, default: nil
desc "Allows grantee READ, READ_ACP, and WRITE_ACP permissions on the object"
config_param :grant_full_control, :string, default: nil
desc "Allows grantee to read the object data and its metadata"
config_param :grant_read, :string, default: nil
desc "Allows grantee to read the object ACL"
config_param :grant_read_acp, :string, default: nil
desc "Allows grantee to write the ACL for the applicable object"
config_param :grant_write_acp, :string, default: nil
desc "The length of `%{hex_random}` placeholder(4-16)"
config_param :hex_random_length, :integer, default: 4
desc "`sprintf` format for `%{index}`"
config_param :index_format, :string, default: "%d"
desc "Overwrite already existing path"
config_param :overwrite, :bool, default: false
desc "Check bucket if exists or not"
config_param :check_bucket, :bool, default: true
desc "Check object before creation"
config_param :check_object, :bool, default: true
desc "Specifies the AWS KMS key ID to use for object encryption"
config_param :ssekms_key_id, :string, default: nil, secret: true
desc "Specifies the algorithm to use to when encrypting the object"
config_param :sse_customer_algorithm, :string, default: nil
desc "Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data"
config_param :sse_customer_key, :string, default: nil, secret: true
desc "Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321"
config_param :sse_customer_key_md5, :string, default: nil, secret: true
desc "AWS SDK uses MD5 for API request/response by default"
config_param :compute_checksums, :bool, default: nil # use nil to follow SDK default configuration
desc "Signature version for API Request (s3,v4)"
config_param :signature_version, :string, default: nil # use nil to follow SDK default configuration
desc "Given a threshold to treat events as delay, output warning logs if delayed events were put into s3"
config_param :warn_for_delay, :time, default: nil
desc "Arbitrary S3 tag-set for the object"
config_param :tagging, :string, default: nil
desc "Arbitrary S3 metadata headers to set for the object"
config_param :s3_metadata, :hash, default: nil
config_section :bucket_lifecycle_rule, param_name: :bucket_lifecycle_rules, multi: true do
desc "A unique ID for this rule"
config_param :id, :string
desc "Objects whose keys begin with this prefix will be affected by the rule. If not specified all objects of the bucket will be affected"
config_param :prefix, :string, default: ''
desc "The number of days before the object will expire"
config_param :expiration_days, :integer
end
DEFAULT_FORMAT_TYPE = "out_file"
config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
end
config_section :buffer do
config_set_default :chunk_keys, ['time']
config_set_default :timekey, (60 * 60 * 24)
end
attr_reader :bucket
MAX_HEX_RANDOM_LENGTH = 16
def reject_s3_endpoint?
@s3_endpoint && !@s3_endpoint.end_with?('vpce.amazonaws.com') &&
@s3_endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @s3_endpoint.include?(e) }
end
def configure(conf)
compat_parameters_convert(conf, :buffer, :formatter, :inject)
super
Aws.use_bundled_cert! if @use_bundled_cert
if reject_s3_endpoint?
raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
end
begin
buffer_type = @buffer_config[:@type]
@compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
rescue => e
log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}"
@compressor = TextCompressor.new
end
@compressor.configure(conf)
@formatter = formatter_create
if @hex_random_length > MAX_HEX_RANDOM_LENGTH
raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
end
unless @index_format =~ /^%(0\d*)?[dxX]$/
raise Fluent::ConfigError, "index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types"
end
if @reduced_redundancy
log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
@storage_class = "REDUCED_REDUNDANCY"
end
@s3_object_key_format = process_s3_object_key_format
if !@check_object
if conf.has_key?('s3_object_key_format')
log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten."
else
log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{time_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format"
@s3_object_key_format = "%{path}/%{time_slice}_%{hms_slice}.%{file_extension}"
end
end
check_s3_path_safety(conf)
# For backward compatibility
# TODO: Remove time_slice_format when end of support compat_parameters
@configured_time_slice_format = conf['time_slice_format']
@values_for_s3_object_chunk = {}
@time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
def multi_workers_ready?
true
end
def start
options = setup_credentials
options[:region] = @s3_region if @s3_region
options[:endpoint] = @s3_endpoint if @s3_endpoint
options[:use_accelerate_endpoint] = @enable_transfer_acceleration
options[:use_dualstack_endpoint] = @enable_dual_stack
options[:http_proxy] = @proxy_uri if @proxy_uri
options[:force_path_style] = @force_path_style
options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
options[:signature_version] = @signature_version unless @signature_version.nil?
options[:ssl_verify_peer] = @ssl_verify_peer
options[:ssl_ca_bundle] = @ssl_ca_bundle if @ssl_ca_bundle
options[:ssl_ca_directory] = @ssl_ca_directory if @ssl_ca_directory
log.on_trace do
options[:http_wire_trace] = true
options[:logger] = log
end
s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(client: s3_client)
@bucket = @s3.bucket(@s3_bucket)
check_apikeys if @check_apikey_on_start
ensure_bucket if @check_bucket
ensure_bucket_lifecycle
super
end
def format(tag, time, record)
r = inject_values_to_record(tag, time, record)
@formatter.format(tag, time, r)
end
def write(chunk)
i = 0
metadata = chunk.metadata
previous_path = nil
time_slice = if metadata.timekey.nil?
''.freeze
else
@time_slice_with_tz.call(metadata.timekey)
end
if @check_object
begin
@values_for_s3_object_chunk[chunk.unique_id] ||= {
"%{hex_random}" => hex_random(chunk),
}
values_for_s3_object_key_pre = {
"%{path}" => @path,
"%{file_extension}" => @compressor.ext,
}
values_for_s3_object_key_post = {
"%{time_slice}" => time_slice,
"%{index}" => sprintf(@index_format,i),
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])
values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
values_for_s3_object_key_pre.fetch(matched_key, matched_key)
end
s3path = extract_placeholders(s3path, chunk)
s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
if (i > 0) && (s3path == previous_path)
if @overwrite
log.warn "#{s3path} already exists, but will overwrite"
break
else
raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
end
end
i += 1
previous_path = s3path
end while @bucket.object(s3path).exists?
else
if @localtime
hms_slicer = Time.now.strftime("%H%M%S")
else
hms_slicer = Time.now.utc.strftime("%H%M%S")
end
@values_for_s3_object_chunk[chunk.unique_id] ||= {
"%{hex_random}" => hex_random(chunk),
}
values_for_s3_object_key_pre = {
"%{path}" => @path,
"%{file_extension}" => @compressor.ext,
}
values_for_s3_object_key_post = {
"%{date_slice}" => time_slice, # For backward compatibility
"%{time_slice}" => time_slice,
"%{hms_slice}" => hms_slicer,
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])
values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
values_for_s3_object_key_pre.fetch(matched_key, matched_key)
end
s3path = extract_placeholders(s3path, chunk)
s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
end
tmp = Tempfile.new("s3-")
tmp.binmode
begin
@compressor.compress(chunk, tmp)
tmp.rewind
log.debug "out_s3: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to s3://#{@s3_bucket}/#{s3path}"
put_options = {
body: tmp,
content_type: @compressor.content_type,
storage_class: @storage_class,
}
put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
put_options[:acl] = @acl if @acl
put_options[:grant_full_control] = @grant_full_control if @grant_full_control
put_options[:grant_read] = @grant_read if @grant_read
put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp
put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp
put_options[:tagging] = @tagging if @tagging
if @s3_metadata
put_options[:metadata] = {}
@s3_metadata.each do |k, v|
put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
end
end
@bucket.object(s3path).put(put_options)
@values_for_s3_object_chunk.delete(chunk.unique_id)
if @warn_for_delay
if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
end
end
ensure
tmp.close(true) rescue nil
end
end
private
def hex_random(chunk)
unique_hex = Fluent::UniqueId.hex(chunk.unique_id)
unique_hex.reverse! # unique_hex is like (time_sec, time_usec, rand) => reversing gives more randomness
unique_hex[0...@hex_random_length]
end
def uuid_random
SecureRandom.uuid
end
# This is stolen from Fluentd
def timekey_to_timeformat(timekey)
case timekey
when nil then ''
when 0...60 then '%Y%m%d%H%M%S' # 60 exclusive
when 60...3600 then '%Y%m%d%H%M'
when 3600...86400 then '%Y%m%d%H'
else '%Y%m%d'
end
end
def ensure_bucket
if !@bucket.exists?
if @auto_create_bucket
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
@s3.create_bucket(bucket: @s3_bucket)
else
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
end
end
end
def ensure_bucket_lifecycle
unless @bucket_lifecycle_rules.empty?
old_rules = get_bucket_lifecycle_rules
new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule|
{ id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
end
unless old_rules == new_rules
log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
@bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
end
end
end
def get_bucket_lifecycle_rules
begin
@bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
{ id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
end
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
[]
end
end
def process_s3_object_key_format
%W(%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}).each { |ph|
if @s3_object_key_format.include?(ph)
raise Fluent::ConfigError, %!#{ph} placeholder in s3_object_key_format is removed!
end
}
if @s3_object_key_format.include?('%{uuid_flush}')
@uuid_flush_enabled = true
end
@s3_object_key_format.gsub('%{hostname}') { |expr|
log.warn "%{hostname} will be removed in the future. Use \"\#{Socket.gethostname}\" instead"
Socket.gethostname
}
end
def check_s3_path_safety(conf)
unless conf.has_key?('s3_object_key_format')
log.warn "The default value of s3_object_key_format will use ${chunk_id} instead of %{index} to avoid object conflict in v2"
end
is_working_on_parallel = @buffer_config.flush_thread_count > 1 || system_config.workers > 1
if is_working_on_parallel && ['${chunk_id}', '%{uuid_flush}', '%{hex_random}'].none? { |key| @s3_object_key_format.include?(key) }
log.warn "No ${chunk_id}, %{uuid_flush} or %{hex_random} in s3_object_key_format with multiple flush threads or multiple workers. Recommend to set ${chunk_id}, %{uuid_flush} or %{hex_random} to avoid data lost by object conflict"
end
end
def check_apikeys
@bucket.objects(prefix: @path, :max_keys => 1).first
rescue Aws::S3::Errors::NoSuchBucket
# ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
end
def setup_credentials
options = {}
credentials_options = {}
case
when @assume_role_credentials
c = @assume_role_credentials
iam_user_credentials = @aws_key_id && @aws_sec_key ? Aws::Credentials.new(@aws_key_id, @aws_sec_key) : nil
region = c.sts_region || @s3_region
credentials_options[:role_arn] = c.role_arn
credentials_options[:role_session_name] = c.role_session_name
credentials_options[:policy] = c.policy if c.policy
credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
credentials_options[:external_id] = c.external_id if c.external_id
credentials_options[:sts_endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url
credentials_options[:sts_http_proxy] = c.sts_http_proxy if c.sts_http_proxy
if c.sts_http_proxy && c.sts_endpoint_url
credentials_options[:client] = if iam_user_credentials
Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url, credentials: iam_user_credentials)
else
Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url)
end
elsif c.sts_http_proxy
credentials_options[:client] = if iam_user_credentials
Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy, credentials: iam_user_credentials)
else
Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy)
end
elsif c.sts_endpoint_url
credentials_options[:client] = if iam_user_credentials
Aws::STS::Client.new(region: region, endpoint: c.sts_endpoint_url, credentials: iam_user_credentials)
else
Aws::STS::Client.new(region: region, endpoint: c.sts_endpoint_url)
end
else
credentials_options[:client] = if iam_user_credentials
Aws::STS::Client.new(region: region, credentials: iam_user_credentials)
else
Aws::STS::Client.new(region: region)
end
end
options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
when @aws_key_id && @aws_sec_key
options[:access_key_id] = @aws_key_id
options[:secret_access_key] = @aws_sec_key
when @web_identity_credentials
c = @web_identity_credentials
credentials_options[:role_arn] = c.role_arn
credentials_options[:role_session_name] = c.role_session_name
credentials_options[:web_identity_token_file] = c.web_identity_token_file
credentials_options[:policy] = c.policy if c.policy
credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
if c.sts_region
credentials_options[:client] = Aws::STS::Client.new(:region => c.sts_region)
elsif @s3_region
credentials_options[:client] = Aws::STS::Client.new(:region => @s3_region)
end
options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options)
when @instance_profile_credentials
c = @instance_profile_credentials
credentials_options[:retries] = c.retries if c.retries
credentials_options[:ip_address] = c.ip_address if c.ip_address
credentials_options[:port] = c.port if c.port
credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout
credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout
if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
options[:credentials] = Aws::ECSCredentials.new(credentials_options)
else
options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
end
when @shared_credentials
c = @shared_credentials
credentials_options[:path] = c.path if c.path
credentials_options[:profile_name] = c.profile_name if c.profile_name
options[:credentials] = Aws::SharedCredentials.new(credentials_options)
when @aws_iam_retries
log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
credentials_options[:retries] = @aws_iam_retries
if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
options[:credentials] = Aws::ECSCredentials.new(credentials_options)
else
options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
end
else
# Use default credentials
# See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
end
options
end
class Compressor
include Fluent::Configurable
def initialize(opts = {})
super()
@buffer_type = opts[:buffer_type]
@log = opts[:log]
end
attr_reader :buffer_type, :log
def configure(conf)
super
end
def ext
end
def content_type
end
def compress(chunk, tmp)
end
private
def check_command(command, algo = nil)
require 'open3'
algo = command if algo.nil?
begin
Open3.capture3("#{command} -V")
rescue Errno::ENOENT
raise Fluent::ConfigError, "'#{command}' utility must be in PATH for #{algo} compression"
end
end
end
class GzipCompressor < Compressor
def ext
'gz'.freeze
end
def content_type
'application/x-gzip'.freeze
end
def compress(chunk, tmp)
w = Zlib::GzipWriter.new(tmp)
chunk.write_to(w)
w.finish
ensure
w.finish rescue nil
end
end
class TextCompressor < Compressor
def ext
'txt'.freeze
end
def content_type
'text/plain'.freeze
end
def compress(chunk, tmp)
chunk.write_to(tmp)
end
end
class JsonCompressor < TextCompressor
def ext
'json'.freeze
end
def content_type
'application/json'.freeze
end
end
COMPRESSOR_REGISTRY = Fluent::Registry.new(:s3_compressor_type, 'fluent/plugin/s3_compressor_')
{
'gzip' => GzipCompressor,
'json' => JsonCompressor,
'text' => TextCompressor
}.each { |name, compressor|
COMPRESSOR_REGISTRY.register(name, compressor)
}
def self.register_compressor(name, compressor)
COMPRESSOR_REGISTRY.register(name, compressor)
end
end
end