fluent/fluentd

View on GitHub
lib/fluent/plugin/buffer/chunk.rb

Summary

Maintainability
B
4 hrs
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/buffer'
require 'fluent/plugin/compressable'
require 'fluent/unique_id'
require 'fluent/event'
require 'fluent/ext_monitor_require'

require 'tempfile'
require 'zlib'

module Fluent
  module Plugin
    class Buffer # fluent/plugin/buffer is already loaded
      class Chunk
        include MonitorMixin
        include UniqueId::Mixin

        # A chunk has 2 parts:
        # * metadata: contains metadata which should be restored after resume (if possible)
        #             v: {key=>value,key=>value,...} (optional)
        #             t: tag as string (optional)
        #             k: time slice key (optional)
        #
        #             id: unique_id of chunk (*)
        #             s: size (number of events in chunk) (*)
        #             c: created_at as unix time (*)
        #             m: modified_at as unix time (*)
        #              (*): fields automatically injected by chunk itself
        # * data: binary data, combined records represented as String, maybe compressed

        # NOTE: keys of metadata are named with a single letter
        #       to decrease the bytesize of metadata I/O

        # TODO: CompressedPackedMessage of forward protocol?

        # Builds an empty chunk in the :unstaged state.
        #
        # metadata:: metadata object for this chunk; stored as given
        # compress:: compression mode (default :text). When it is :gzip the
        #            instance is extended with Decompressable, which overrides
        #            append/open/read/write_to to deal with gzip payloads.
        def initialize(metadata, compress: :text)
          # no-arg super so that included mixins get their own setup
          super()

          @metadata = metadata
          @unique_id = generate_unique_id

          # lifecycle state, one of: unstaged/staged/queued/closed
          @state = :unstaged
          @size = 0

          # raw clock values; #created_at / #modified_at expose them as Time
          @created_at = Fluent::Clock.real_now
          @modified_at = Fluent::Clock.real_now

          extend(Decompressable) if compress == :gzip
        end

        # Read-only accessors for values assigned in #initialize.
        attr_reader :unique_id
        attr_reader :metadata
        attr_reader :state

        # Returns the creation timestamp exactly as stored in @created_at,
        # without converting it to a Time (see #created_at for the Time form).
        def raw_create_at
          @created_at
        end

        # Returns the modification timestamp exactly as stored in @modified_at,
        # without converting it to a Time (see #modified_at for the Time form).
        def raw_modified_at
          @modified_at
        end

        # for compatibility
        # Returns the creation time as a Time built from @created_at.
        # The Time is built on first use and cached in @created_at_object,
        # so later calls return the very same object.
        def created_at
          return @created_at_object if @created_at_object

          @created_at_object = Time.at(@created_at)
        end

        # for compatibility
        # Returns the modification time as a Time built from @modified_at.
        # The Time is cached in @modified_at_object; the cache is only rebuilt
        # when that variable is nil/false, so code that changes @modified_at
        # has to clear @modified_at_object as well.
        def modified_at
          return @modified_at_object if @modified_at_object

          @modified_at_object = Time.at(@modified_at)
        end

        # data is an array of formatted record strings
        # Joins all records into a single binary (ASCII-8BIT) string and hands
        # it to #concat together with the number of records.
        #
        # Raises ArgumentError when called with `compress: :gzip`; this plain
        # implementation does not compress.
        def append(data, **kwargs)
          if kwargs[:compress] == :gzip
            raise ArgumentError, '`compress: gzip` can be used for Compressable module'
          end

          # String#b copies each record as binary, so records with different
          # encodings can be joined without compatibility errors
          payload = data.each_with_object(''.b) { |record, buffer| buffer << record.b }
          concat(payload, data.size)
        end

        # for event streams which are packed or zipped (and which we do not want to unpack/uncompress)
        # Abstract: takes an already serialized +bulk+ and its number of
        # +records+. This base implementation always raises
        # NotImplementedError; child classes have to provide the real one.
        def concat(bulk, records)
          message = "Implement this method in child class"
          raise NotImplementedError, message
        end

        # Abstract hook: this base implementation always raises
        # NotImplementedError; child classes have to provide the real one.
        def commit
          message = "Implement this method in child class"
          raise NotImplementedError, message
        end

        # Abstract hook: this base implementation always raises
        # NotImplementedError; child classes have to provide the real one.
        def rollback
          message = "Implement this method in child class"
          raise NotImplementedError, message
        end

        # Abstract: this base implementation always raises
        # NotImplementedError; child classes have to provide the real one.
        # NOTE(review): presumably the size of the chunk data in bytes - confirm
        # against the child classes.
        def bytesize
          message = "Implement this method in child class"
          raise NotImplementedError, message
        end

        # Abstract: number of events held by the chunk. Child classes have to
        # implement it; this base implementation only raises
        # NotImplementedError.
        def size
          raise NotImplementedError, "Implement this method in child class"
        end

        # Same as #size. Written as a delegating method instead of
        # `alias :length :size`: an alias is bound to the method body that
        # exists when the alias is made, so it would keep raising
        # NotImplementedError in child classes that override only #size.
        def length
          size
        end

        # True when #size reports zero.
        def empty?
          size.zero?
        end

        # True while the chunk is in the :staged or :unstaged state.
        def writable?
          %i[staged unstaged].include?(@state)
        end

        # True when the chunk state is :unstaged.
        def unstaged?
          # Symbols are unique objects, so identity comparison is equivalent to ==
          @state.equal?(:unstaged)
        end

        # True when the chunk state is :staged.
        def staged?
          # Symbols are unique objects, so identity comparison is equivalent to ==
          @state.equal?(:staged)
        end

        # True when the chunk state is :queued.
        def queued?
          # Symbols are unique objects, so identity comparison is equivalent to ==
          @state.equal?(:queued)
        end

        # True when the chunk state is :closed.
        def closed?
          # Symbols are unique objects, so identity comparison is equivalent to ==
          @state.equal?(:closed)
        end

        # Switches the chunk state to :staged and returns the chunk itself.
        def staged!
          tap { @state = :staged }
        end

        # Switches the chunk state to :unstaged and returns the chunk itself.
        def unstaged!
          tap { @state = :unstaged }
        end

        # Switches the chunk state to :queued and returns the chunk itself.
        def enqueued!
          tap { @state = :queued }
        end

        # Switches the chunk state to :closed and returns the chunk itself.
        # In this base class nothing else happens.
        def close
          tap { @state = :closed }
        end

        # Switches the chunk state to :closed and returns the chunk itself.
        # In this base class it does exactly the same as #close.
        def purge
          tap { @state = :closed }
        end

        # Abstract reader.
        #
        # Raises ArgumentError for `compressed: :gzip` (only handled once
        # Decompressable is mixed in); for any other call it raises
        # NotImplementedError because child classes have to implement it.
        def read(**kwargs)
          if kwargs[:compressed] == :gzip
            raise ArgumentError, '`compressed: gzip` can be used for Compressable module'
          else
            raise NotImplementedError, "Implement this method in child class"
          end
        end

        # Abstract opener.
        #
        # Raises ArgumentError for `compressed: :gzip` (only handled once
        # Decompressable is mixed in); for any other call it raises
        # NotImplementedError because child classes have to implement it.
        def open(**kwargs, &block)
          if kwargs[:compressed] == :gzip
            raise ArgumentError, '`compressed: gzip` can be used for Compressable module'
          else
            raise NotImplementedError, "Implement this method in child class"
          end
        end

        # Copies the chunk content into +io+ by streaming from the IO that
        # #open yields. Returns whatever #open returns.
        #
        # Raises ArgumentError for `compressed: :gzip`, which this plain
        # implementation does not handle.
        def write_to(io, **kwargs)
          if kwargs[:compressed] == :gzip
            raise ArgumentError, '`compressed: gzip` can be used for Compressable module'
          end

          open { |chunk_io| IO.copy_stream(chunk_io, io) }
        end

        module Decompressable
          include Fluent::Plugin::Compressable

          # Appends records to the chunk.
          #
          # With `compress: :gzip` all records are written into one gzip
          # stream and the compressed bytes are passed to #concat together
          # with the record count. Any other call is delegated to the
          # overridden #append.
          def append(data, **kwargs)
            if kwargs[:compress] == :gzip
              # Use a binary buffer: gzip output is raw bytes, and the
              # uncompressed path also hands #concat an ASCII-8BIT string.
              # A bare StringIO.new would tag the bytes with the default
              # external encoding instead.
              io = StringIO.new(''.b)
              Zlib::GzipWriter.wrap(io) do |gz|
                data.each do |d|
                  gz.write d
                end
              end
              # GzipWriter.wrap closed io, but its string stays readable
              concat(io.string, data.size)
            else
              super
            end
          end

          # Opens the chunk and yields an IO positioned at offset 0.
          #
          # With `compressed: :gzip` the call is passed straight to the
          # overridden #open, so the block receives the stored (compressed)
          # data. Otherwise the stored data is decompressed first: into a
          # StringIO when the underlying IO is a StringIO, else into a
          # Tempfile. The Tempfile is closed and unlinked once the block
          # returns or raises, so the yielded IO must not be used after the
          # block.
          #
          # Returns the value of the block.
          def open(**kwargs, &block)
            if kwargs[:compressed] == :gzip
              super
            else
              super(**kwargs) do |chunk_io|
                on_disk = !chunk_io.is_a?(StringIO)
                output_io = on_disk ? Tempfile.new('decompressed-data') : StringIO.new
                begin
                  output_io.binmode if on_disk
                  decompress(input_io: chunk_io, output_io: output_io)
                  output_io.seek(0, IO::SEEK_SET)
                  yield output_io
                ensure
                  # release the file descriptor and the file right away
                  # instead of waiting for the Tempfile finalizer
                  output_io.close! if on_disk
                end
              end
            end
          end

          # Reads the chunk content.
          #
          # With `compressed: :gzip` the result of the overridden #read is
          # returned unchanged; otherwise that result is passed through
          # #decompress first.
          def read(**kwargs)
            return super if kwargs[:compressed] == :gzip

            decompress(super)
          end

          # Writes the chunk content into +io+.
          #
          # The chunk is always opened with `compressed: :gzip`. When the
          # caller also asked for `compressed: :gzip` the bytes are copied
          # as they are; otherwise they are decompressed into +io+.
          def write_to(io, **kwargs)
            keep_compressed = kwargs[:compressed] == :gzip

            open(compressed: :gzip) do |chunk_io|
              next IO.copy_stream(chunk_io, io) if keep_compressed

              decompress(input_io: chunk_io, output_io: io)
            end
          end
        end
      end
    end
  end
end