# lib/fluent/event.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/msgpack_factory'
require 'fluent/plugin/compressable'
module Fluent
# Abstract base class for event streams: enumerable collections of
# (time, record) pairs. #dup, #size, #slice and #each raise
# NotImplementedError here and have to be provided by subclasses; the
# serialization helpers below are written in terms of #each.
class EventStream
  include Enumerable
  include Fluent::Plugin::Compressable

  # dup does deep copy for event stream
  def dup
    raise NotImplementedError, "DO NOT USE THIS CLASS directly."
  end

  # Number of events in the stream. Must be implemented by subclasses.
  def size
    raise NotImplementedError, "DO NOT USE THIS CLASS directly."
  end

  # Same as #size.
  #
  # This is a real method rather than `alias :length :size` on purpose:
  # an alias captures the #size implementation of this class (the one that
  # raises), so #length would keep raising NotImplementedError even on
  # subclasses that override #size.
  def length
    size
  end

  # True when the stream holds no events.
  def empty?
    size == 0
  end

  # for tests
  #
  # Two streams are equal when both are EventStreams and their msgpack
  # serializations are equal.
  def ==(other)
    other.is_a?(EventStream) && self.to_msgpack_stream == other.to_msgpack_stream
  end

  # Defaults to false; the concrete streams in this file override it to
  # return true.
  def repeatable?
    false
  end

  # Returns a sub-stream of `num` events starting at `index`.
  # Must be implemented by subclasses.
  def slice(index, num)
    raise NotImplementedError, "DO NOT USE THIS CLASS directly."
  end

  # Yields every event as (time, record). Must be implemented by subclasses.
  def each(unpacker: nil, &block)
    raise NotImplementedError, "DO NOT USE THIS CLASS directly."
  end

  # Writes every event as a [time, record] array into `packer` (or a new
  # packer obtained from Fluent::MessagePackFactory) and returns the result
  # of its #full_pack.
  #
  # time_int: when true, delegates to #to_msgpack_stream_forced_integer so
  #           that times are written as `time.to_i`.
  def to_msgpack_stream(time_int: false, packer: nil)
    return to_msgpack_stream_forced_integer(packer: packer) if time_int

    out = packer || Fluent::MessagePackFactory.msgpack_packer
    each do |time, record|
      out.write([time, record])
    end
    out.full_pack
  end

  # Returns #to_msgpack_stream passed through `compress`
  # (presumably provided by Fluent::Plugin::Compressable, included above).
  def to_compressed_msgpack_stream(time_int: false, packer: nil)
    packed = to_msgpack_stream(time_int: time_int, packer: packer)
    compress(packed)
  end

  # Like #to_msgpack_stream, but every time is converted with #to_i before
  # it is written.
  def to_msgpack_stream_forced_integer(packer: nil)
    out = packer || Fluent::MessagePackFactory.msgpack_packer
    each do |time, record|
      out.write([time.to_i, record])
    end
    out.full_pack
  end
end
# Event stream that holds exactly one (time, record) pair.
class OneEventStream < EventStream
  def initialize(time, record)
    @time = time
    @record = record
  end

  # Returns a new stream with a duplicated record; the time is passed as is.
  def dup
    copied_record = @record.dup
    OneEventStream.new(@time, copied_record)
  end

  # A one-event stream is never empty.
  def empty?
    false
  end

  # Always 1.
  def size
    1
  end

  def repeatable?
    true
  end

  # Returns an empty ArrayEventStream when the requested range cannot
  # contain the single event (index > 0 or num == 0), a copy of self
  # otherwise.
  def slice(index, num)
    return ArrayEventStream.new([]) if index > 0 || num == 0

    dup
  end

  # Calls the block once with (time, record). The unpacker argument is
  # accepted for interface compatibility and not used here. Returns nil.
  def each(unpacker: nil, &block)
    block.call(@time, @record)
    nil
  end
end
# EventStream from entries: Array of [time, record]
#
# Use this class for many events data with a tag
# and its representation is [ [time, record], [time, record], .. ]
class ArrayEventStream < EventStream
  # entries: Array of [time, record] pairs. The array is stored as given
  # (not copied).
  def initialize(entries)
    @entries = entries
  end

  # Returns a new stream whose records are duplicated; times are reused.
  def dup
    copied = @entries.map { |time, record| [time, record.dup] }
    ArrayEventStream.new(copied)
  end

  def size
    @entries.size
  end

  def repeatable?
    true
  end

  def empty?
    @entries.empty?
  end

  # Returns a new ArrayEventStream with up to `num` entries starting at
  # `index`.
  #
  # Array#slice returns nil when the start is out of range or the length is
  # negative; fall back to an empty array so the result is always a usable
  # (empty) stream, as OneEventStream#slice already does.
  def slice(index, num)
    ArrayEventStream.new(@entries.slice(index, num) || [])
  end

  # Yields every entry to the block as (time, record). The unpacker
  # argument is accepted for interface compatibility and not used here.
  # Returns nil.
  def each(unpacker: nil, &block)
    @entries.each(&block)
    nil
  end
end
# EventStream from entries: numbers of pairs of time and record.
#
# This class can handle many events more efficiently than ArrayEventStream
# because this class generate less objects than ArrayEventStream.
#
# Use this class as below, in loop of data-enumeration:
#  1. initialize blank stream:
#     streams[tag] ||= MultiEventStream.new
#  2. add events
#     streams[tag].add(time, record)
class MultiEventStream < EventStream
  # time_array and record_array are parallel arrays: the event at position i
  # is (time_array[i], record_array[i]). Both are stored as given.
  def initialize(time_array = [], record_array = [])
    @time_array = time_array
    @record_array = record_array
  end

  # Returns a new stream with a copied time array and duplicated records.
  def dup
    MultiEventStream.new(@time_array.dup, @record_array.map(&:dup))
  end

  def size
    @time_array.size
  end

  # Appends one event to the stream.
  def add(time, record)
    @time_array << time
    @record_array << record
  end

  def repeatable?
    true
  end

  def empty?
    @time_array.empty?
  end

  # Returns a new MultiEventStream with up to `num` events starting at
  # `index`.
  #
  # Array#slice returns nil when the start is out of range or the length is
  # negative; fall back to empty arrays so the result is always a usable
  # (empty) stream, as OneEventStream#slice already does.
  def slice(index, num)
    times = @time_array.slice(index, num) || []
    records = @record_array.slice(index, num) || []
    MultiEventStream.new(times, records)
  end

  # Calls the block with (time, record) for every event, in order. The
  # unpacker argument is accepted for interface compatibility and not used
  # here. Returns nil.
  def each(unpacker: nil, &block)
    time_array = @time_array
    record_array = @record_array
    # The number of iterations is fixed before the loop starts, so events
    # added while iterating are not visited in the same pass.
    time_array.length.times do |i|
      block.call(time_array[i], record_array[i])
    end
    nil
  end
end
# Event stream backed by msgpack-serialized data. Events are unpacked lazily
# (see #ensure_unpacked!) and the unpacked times/records are cached.
class MessagePackEventStream < EventStream
  # https://github.com/msgpack/msgpack-ruby/issues/119

  # Keep cached_unpacker argument for existing plugins
  #
  # data:             packed event data
  # cached_unpacker:  ignored
  # size:             number of events, 0 when unknown (see #size)
  # unpacked_times / unpacked_records: already unpacked events, if available
  def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
    @data = data
    @size = size
    @unpacked_times = unpacked_times
    @unpacked_records = unpacked_records
  end

  def empty?
    @data.empty?
  end

  # Returns a new stream of the same class with copied data. When events
  # have already been unpacked, the records are duplicated and handed to
  # the copy; the times array is passed along as is.
  def dup
    if @unpacked_times
      self.class.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup))
    else
      self.class.new(@data.dup, nil, @size)
    end
  end

  def size
    # A @size of 0 cannot be trusted: it is also the default when no size
    # was given. If the number of events is really zero, unpacking events
    # takes very short time.
    ensure_unpacked! if @size == 0
    @size
  end

  def repeatable?
    true
  end

  # Unpacks @data into @unpacked_times / @unpacked_records unless both are
  # already present. Uses the given unpacker, or a new one obtained from
  # Fluent::MessagePackFactory.
  def ensure_unpacked!(unpacker: nil)
    return if @unpacked_times && @unpacked_records

    @unpacked_times = []
    @unpacked_records = []
    (unpacker || Fluent::MessagePackFactory.msgpack_unpacker).feed_each(@data) do |time, record|
      @unpacked_times << time
      @unpacked_records << record
    end
    # @size should be updated always right after unpack.
    # The real size of unpacked objects are correct, rather than given size.
    @size = @unpacked_times.size
  end

  # This method returns MultiEventStream, because there is no reason
  # to preserve the binary serialized by msgpack.
  #
  # Array#slice returns nil when the start is out of range or the length is
  # negative; fall back to empty arrays so the result is always a usable
  # (empty) stream, as OneEventStream#slice already does.
  def slice(index, num)
    ensure_unpacked!
    times = @unpacked_times.slice(index, num) || []
    records = @unpacked_records.slice(index, num) || []
    MultiEventStream.new(times, records)
  end

  # Unpacks the data if needed, then calls the block with (time, record)
  # for every event. Returns nil.
  def each(unpacker: nil, &block)
    ensure_unpacked!(unpacker: unpacker)
    @unpacked_times.each_with_index do |time, i|
      block.call(time, @unpacked_records[i])
    end
    nil
  end

  def to_msgpack_stream(time_int: false, packer: nil)
    # time_int is always ignored because @data is always packed binary in this class
    @data
  end
end
# MessagePackEventStream whose input data is compressed. The data is
# decompressed lazily, the first time its content is needed, and the
# original compressed bytes are kept for #to_compressed_msgpack_stream.
class CompressedMessagePackEventStream < MessagePackEventStream
  # Takes the same arguments as MessagePackEventStream#initialize; `data`
  # is the compressed form of the packed events.
  def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
    super
    @decompressed_data = nil
    @compressed_data = data
  end

  # Returns a new stream of the same class built from a copy of the
  # compressed data.
  #
  # The inherited #dup copies @data, which holds *decompressed* bytes once
  # #ensure_decompressed! has run; the copy would then treat them as
  # compressed input. Always starting from @compressed_data keeps the copy
  # valid regardless of whether this stream has been decompressed yet.
  def dup
    if @unpacked_times
      self.class.new(@compressed_data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup))
    else
      self.class.new(@compressed_data.dup, nil, @size)
    end
  end

  def empty?
    ensure_decompressed!
    super
  end

  def ensure_unpacked!(unpacker: nil)
    ensure_decompressed!
    super
  end

  def each(unpacker: nil, &block)
    ensure_decompressed!
    super
  end

  # Returns the decompressed packed data.
  def to_msgpack_stream(time_int: false, packer: nil)
    ensure_decompressed!
    super
  end

  # Returns the compressed data exactly as it was given to the constructor.
  def to_compressed_msgpack_stream(time_int: false, packer: nil)
    # time_int is always ignored because @data is always packed binary in this class
    @compressed_data
  end

  private

  # Replaces @data with its decompressed form, once. `decompress` is
  # presumably provided by Fluent::Plugin::Compressable via EventStream.
  def ensure_decompressed!
    return if @decompressed_data

    @data = @decompressed_data = decompress(@data)
  end
end
# Mixin that adds event-stream style reading to an object responding to
# #open (yielding an IO-like object) and #read.
#
#   chunk.extend(ChunkMessagePackEventStreamer)
#    => chunk.each{|time, record| ... }
module ChunkMessagePackEventStreamer
  # Opens the receiver and hands every unpacked object to the block.
  # Returns nil.
  #
  # Raises NotImplementedError when an unpacker is passed.
  def each(unpacker: nil, &block)
    # Note: If need to use `unpacker`, then implement it,
    # e.g., `unpacker.feed_each(io.read, &block)` (Not tested)
    if unpacker
      raise NotImplementedError, "'unpacker' argument is not implemented."
    end

    open do |io|
      io_unpacker = Fluent::MessagePackFactory.msgpack_unpacker(io)
      io_unpacker.each(&block)
    end
    nil
  end
  alias_method :msgpack_each, :each

  # Returns the result of #read.
  def to_msgpack_stream(time_int: false, packer: nil)
    # time_int is always ignored because data is already packed and written in chunk
    read
  end
end
end