fluent/fluentd
lib/fluent/event.rb
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/msgpack_factory'
require 'fluent/plugin/compressable'

module Fluent
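  # Abstract base class for a stream of events, where each event is a pair of
  # time and record. Subclasses must implement #dup, #size, #slice and #each;
  # the serialization helpers below (#to_msgpack_stream and friends) are built on #each.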
  class EventStream
    include Enumerable
    include Fluent::Plugin::Compressable

    # dup performs a deep copy of the event stream
    def dup
      raise NotImplementedError, "DO NOT USE THIS CLASS directly."
    end

    def size
      raise NotImplementedError, "DO NOT USE THIS CLASS directly."
    end
    alias :length :size

    def empty?
      size == 0
    end

    # for tests
    def ==(other)
      other.is_a?(EventStream) && self.to_msgpack_stream == other.to_msgpack_stream
    end

    def repeatable?
      false
    end

    def slice(index, num)
      raise NotImplementedError, "DO NOT USE THIS CLASS directly."
    end

    def each(unpacker: nil, &block)
      raise NotImplementedError, "DO NOT USE THIS CLASS directly."
    end

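    # Serialize the stream as a flat msgpack byte sequence of [time, record]
    # pairs (each event is packed back to back, with no outer array).
    # When time_int is true, each time is coerced to an Integer via #to_i.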
    def to_msgpack_stream(time_int: false, packer: nil)
      return to_msgpack_stream_forced_integer(packer: packer) if time_int
      out = packer || Fluent::MessagePackFactory.msgpack_packer
      each {|time,record|
        out.write([time,record])
      }
      out.full_pack
    end

    def to_compressed_msgpack_stream(time_int: false, packer: nil)
      packed = to_msgpack_stream(time_int: time_int, packer: packer)
      compress(packed)
    end

    def to_msgpack_stream_forced_integer(packer: nil)
      out = packer || Fluent::MessagePackFactory.msgpack_packer
      each {|time,record|
        out.write([time.to_i,record])
      }
      out.full_pack
    end
  end

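  # EventStream holding exactly one event, used when a single
  # [time, record] pair is emitted (e.g. via router.emit).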
  class OneEventStream < EventStream
    def initialize(time, record)
      @time = time
      @record = record
    end

    def dup
      OneEventStream.new(@time, @record.dup)
    end

    def empty?
      false
    end

    def size
      1
    end

    def repeatable?
      true
    end

    def slice(index, num)
      if index > 0 || num == 0
        ArrayEventStream.new([])
      else
        self.dup
      end
    end

    def each(unpacker: nil, &block)
      block.call(@time, @record)
      nil
    end
  end

  # EventStream built from entries: an Array of [time, record] pairs.
  #
  # Use this class for many events sharing a single tag;
  # its representation is [ [time, record], [time, record], .. ]
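  #
  # A minimal sketch (the record values are illustrative; Fluent::EventTime
  # is provided by fluent/time.rb):
  #
  #   entries = [
  #     [Fluent::EventTime.now, {"message" => "a"}],
  #     [Fluent::EventTime.now, {"message" => "b"}],
  #   ]
  #   es = Fluent::ArrayEventStream.new(entries)
  #   es.each { |time, record| ... }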
  class ArrayEventStream < EventStream
    def initialize(entries)
      @entries = entries
    end

    def dup
      entries = @entries.map{ |time, record| [time, record.dup] }
      ArrayEventStream.new(entries)
    end

    def size
      @entries.size
    end

    def repeatable?
      true
    end

    def empty?
      @entries.empty?
    end

    def slice(index, num)
      ArrayEventStream.new(@entries.slice(index, num))
    end

    def each(unpacker: nil, &block)
      @entries.each(&block)
      nil
    end
  end

  # EventStream built from parallel arrays of times and records.
  #
  # This class can handle many events more efficiently than ArrayEventStream
  # because it creates fewer objects than ArrayEventStream.
  #
  # Use this class as below, in a data-enumeration loop (a fuller sketch follows):
  #  1. initialize a blank stream:
  #     streams[tag] ||= MultiEventStream.new
  #  2. add events:
  #     streams[tag].add(time, record)
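  #
  # A fuller sketch (illustrative only; the tag and record values are made up
  # and Fluent::EventTime comes from fluent/time.rb):
  #
  #   streams = {}
  #   [["app.log", {"msg" => "x"}], ["app.log", {"msg" => "y"}]].each do |tag, record|
  #     streams[tag] ||= Fluent::MultiEventStream.new
  #     streams[tag].add(Fluent::EventTime.now, record)
  #   end
  #   streams["app.log"].size  #=> 2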
  class MultiEventStream < EventStream
    def initialize(time_array = [], record_array = [])
      @time_array = time_array
      @record_array = record_array
    end

    def dup
      MultiEventStream.new(@time_array.dup, @record_array.map(&:dup))
    end

    def size
      @time_array.size
    end

    def add(time, record)
      @time_array << time
      @record_array << record
    end

    def repeatable?
      true
    end

    def empty?
      @time_array.empty?
    end

    def slice(index, num)
      MultiEventStream.new(@time_array.slice(index, num), @record_array.slice(index, num))
    end

    def each(unpacker: nil, &block)
      time_array = @time_array
      record_array = @record_array
      for i in 0..time_array.length-1
        block.call(time_array[i], record_array[i])
      end
      nil
    end
  end

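  # EventStream wrapping a raw msgpack-packed binary of [time, record] pairs
  # (e.g. a payload received over the forward protocol). Unpacking is deferred
  # until #each, #size or #slice actually needs the events.
  #
  # A minimal sketch (the packed payload is built here only for illustration;
  # t1/t2 and r1/r2 stand for times and records):
  #
  #   packed = Fluent::MultiEventStream.new([t1, t2], [r1, r2]).to_msgpack_stream
  #   es = Fluent::MessagePackEventStream.new(packed)
  #   es.each { |time, record| ... }  # unpacks lazily on first access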
  class MessagePackEventStream < EventStream
    # https://github.com/msgpack/msgpack-ruby/issues/119

    # Keep the cached_unpacker argument for backward compatibility with existing plugins
    def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
      @data = data
      @size = size
      @unpacked_times = unpacked_times
      @unpacked_records = unpacked_records
    end

    def empty?
      @data.empty?
    end

    def dup
      if @unpacked_times
        self.class.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup))
      else
        self.class.new(@data.dup, nil, @size)
      end
    end

    def size
      # @size cannot be trusted when it is 0 (the default given at initialization).
      # If the number of events is really zero, unpacking them takes very little time anyway.
      ensure_unpacked! if @size == 0
      @size
    end

    def repeatable?
      true
    end

    def ensure_unpacked!(unpacker: nil)
      return if @unpacked_times && @unpacked_records
      @unpacked_times = []
      @unpacked_records = []
      (unpacker || Fluent::MessagePackFactory.msgpack_unpacker).feed_each(@data) do |time, record|
        @unpacked_times << time
        @unpacked_records << record
      end
      # @size should always be updated right after unpacking.
      # The real number of unpacked objects is correct, rather than the size given at initialization.
      @size = @unpacked_times.size
    end

    # This method returns a MultiEventStream, because there is no reason
    # to preserve the binary serialized by msgpack for a slice.
    def slice(index, num)
      ensure_unpacked!
      MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num))
    end

    def each(unpacker: nil, &block)
      ensure_unpacked!(unpacker: unpacker)
      @unpacked_times.each_with_index do |time, i|
        block.call(time, @unpacked_records[i])
      end
      nil
    end

    def to_msgpack_stream(time_int: false, packer: nil)
      # time_int is always ignored because @data is always packed binary in this class
      @data
    end
  end

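  # MessagePackEventStream whose binary arrives compressed (via
  # Fluent::Plugin::Compressable); it is decompressed lazily on first access.
  #
  # A minimal sketch (illustrative only; `es0` stands for any existing event stream):
  #
  #   compressed = es0.to_compressed_msgpack_stream
  #   es = Fluent::CompressedMessagePackEventStream.new(compressed)
  #   es.each { |time, record| ... }  # decompresses, then unpacks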
  class CompressedMessagePackEventStream < MessagePackEventStream
    def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
      super
      @decompressed_data = nil
      @compressed_data = data
    end

    def empty?
      ensure_decompressed!
      super
    end

    def ensure_unpacked!(unpacker: nil)
      ensure_decompressed!
      super
    end

    def each(unpacker: nil, &block)
      ensure_decompressed!
      super
    end

    def to_msgpack_stream(time_int: false, packer: nil)
      ensure_decompressed!
      super
    end

    def to_compressed_msgpack_stream(time_int: false, packer: nil)
      # time_int is always ignored because @compressed_data is already compressed packed binary in this class
      @compressed_data
    end

    private

    def ensure_decompressed!
      return if @decompressed_data
      @data = @decompressed_data = decompress(@data)
    end
  end

  module ChunkMessagePackEventStreamer
    # chunk.extend(ChunkMessagePackEventStreamer)
    #  => chunk.each{|time, record| ... }
    def each(unpacker: nil, &block)
      # Note: if `unpacker` needs to be supported, implement it here,
      # e.g., `unpacker.feed_each(io.read, &block)` (not tested)
      raise NotImplementedError, "'unpacker' argument is not implemented." if unpacker

      open do |io|
        Fluent::MessagePackFactory.msgpack_unpacker(io).each(&block)
      end
      nil
    end
    alias :msgpack_each :each

    def to_msgpack_stream(time_int: false, packer: nil)
      # time_int is always ignored because data is already packed and written in chunk
      read
    end
  end
end