# fluent/fluentd
# lib/fluent/plugin/buffer/file_chunk.rb (View on GitHub)
#
# Summary — Maintainability: A (2 hrs); Test Coverage: (no figure given)
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/buffer/chunk'
require 'fluent/unique_id'
require 'fluent/msgpack_factory'

module Fluent
  module Plugin
    class Buffer
      # Buffer chunk backed by files: record data lives in a chunk file and chunk
      # metadata (unique id, record count, created/modified times and the
      # Metadata fields timekey/tag/variables/seq) lives in "<chunk path>.meta".
      class FileChunk < Chunk
        class FileChunkError < StandardError; end

        # Marker prefixed to metadata files written in the current format:
        # BUFFER_HEADER + 4-byte big-endian payload size + msgpack payload.
        BUFFER_HEADER = "\xc1\x00".force_encoding(Encoding::ASCII_8BIT).freeze

        ### buffer path user specified : /path/to/directory/user_specified_prefix.*.log
        ### buffer chunk path          : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log
        ### buffer chunk metadata path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log.meta

        # NOTE: Old style buffer path of time sliced output plugins had a part of key: prefix.20150414.b513b61...suffix
        #       But this part is not used now for any purpose. (Now metadata is used instead.)

        # state: b/q - 'b'(on stage, compatible with v0.12), 'q'(enqueued)
        # path_prefix: path prefix string, ended with '.'
        # path_suffix: path suffix string, like '.log' (or any other user specified)

        attr_reader :path, :permission

        # @param metadata buffer metadata object handed to Chunk; its timekey/tag/
        #   variables/seq are persisted to and restored from the .meta file
        # @param path [String] for :create, the user-specified path containing '.*.';
        #   for :staged/:queued, the path of an existing chunk file
        # @param mode [Symbol] :create, :staged or :queued
        # @param perm [String, Integer, nil] file permission for created files; a String
        #   is parsed as octal; nil means Fluent::DEFAULT_FILE_PERMISSION
        # @param compress [Symbol] passed through to Chunk
        # @raise [ArgumentError] when +mode+ is not one of the supported values
        def initialize(metadata, path, mode, perm: nil, compress: :text)
          super(metadata, compress: compress)
          perm ||= Fluent::DEFAULT_FILE_PERMISSION
          @permission = perm.is_a?(String) ? perm.to_i(8) : perm
          @bytesize = @size = @adding_bytes = @adding_size = 0
          @meta = nil

          case mode
          when :create then create_new_chunk(path, @permission)
          when :staged then load_existing_staged_chunk(path)
          when :queued then load_existing_enqueued_chunk(path)
          else
            raise ArgumentError, "Invalid file chunk mode: #{mode}"
          end
        end

        # Appends +bulk+ at the current write position without committing it.
        # Note: forces +bulk+'s encoding to ASCII-8BIT in place.
        # @param bulk [String] serialized records
        # @param bulk_size [Integer] number of records contained in +bulk+
        # @return [true]
        def concat(bulk, bulk_size)
          raise "BUG: concatenating to unwritable chunk, now '#{self.state}'" unless self.writable?

          bulk.force_encoding(Encoding::ASCII_8BIT)
          @chunk.write bulk
          @adding_bytes += bulk.bytesize
          @adding_size += bulk_size
          true
        end

        # Makes all data appended since the last commit part of the chunk.
        # Metadata is written first so that a failure leaves counters untouched.
        # @return [true]
        def commit
          write_metadata # this should be at first: of course, this operation may fail

          @commit_position = @chunk.pos
          @size += @adding_size
          @bytesize += @adding_bytes
          @adding_bytes = @adding_size = 0
          @modified_at = Fluent::Clock.real_now
          @modified_at_object = nil

          true
        end

        # Discards data appended since the last commit by truncating the chunk
        # file back to the last commit position.
        # @return [true]
        def rollback
          if @chunk.pos != @commit_position
            @chunk.seek(@commit_position, IO::SEEK_SET)
            @chunk.truncate(@commit_position)
          end
          @adding_bytes = @adding_size = 0
          true
        end

        # @return [Integer] committed plus uncommitted bytes
        def bytesize
          @bytesize + @adding_bytes
        end

        # @return [Integer] committed plus uncommitted record count
        def size
          @size + @adding_size
        end

        # @return [Boolean] true when no bytes have been committed (uncommitted data is ignored)
        def empty?
          @bytesize == 0
        end

        # Moves a staged chunk to the queue: rewrites metadata with committed values,
        # renames "…b<id>…" files to "…q<id>…" and reopens them read-only.
        # On failure it tries to rename files back; the restored files are reopened
        # read-write because the chunk is still staged and must stay writable.
        # Does nothing if the chunk is not staged.
        # @raise [RuntimeError] when renaming fails (message tells whether restoring also failed)
        def enqueued!
          return unless self.staged?

          new_chunk_path = self.class.generate_queued_chunk_path(@path, @unique_id)
          new_meta_path = new_chunk_path + '.meta'

          write_metadata(update: false) # re-write metadata w/ finalized records

          begin
            file_rename(@chunk, @path, new_chunk_path, ->(new_io) { @chunk = new_io })
          rescue => e
            begin
              file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }, mode: 'rb+') if File.exist?(new_chunk_path)
            rescue => re
              # In this point, restore buffer state is hard because previous `file_rename` failed by resource problem.
              # Retry is one possible approach but it may cause livelock under limited resources or high load environment.
              # So we ignore such errors for now and log better message instead.
              # "Too many open files" should be fixed by proper buffer configuration and system setting.
              raise "can't enqueue buffer file and failed to restore. This may causes inconsistent state: path = #{@path}, error = '#{e}', retry error = '#{re}'"
            else
              raise "can't enqueue buffer file: path = #{@path}, error = '#{e}'"
            end
          end

          begin
            file_rename(@meta, @meta_path, new_meta_path, ->(new_io) { @meta = new_io })
          rescue => e
            begin
              file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }, mode: 'rb+') if File.exist?(new_chunk_path)
              file_rename(@meta, new_meta_path, @meta_path, ->(new_io) { @meta = new_io }, mode: 'rb+') if File.exist?(new_meta_path)
            rescue => re
              # See above
              raise "can't enqueue buffer metadata and failed to restore. This may causes inconsistent state: path = #{@meta_path}, error = '#{e}', retry error = '#{re}'"
            else
              raise "can't enqueue buffer metadata: path = #{@meta_path}, error = '#{e}'"
            end
          end

          @path = new_chunk_path
          @meta_path = new_meta_path

          super
        end

        # Closes the underlying files; a chunk whose file is empty is also removed
        # from disk (together with its metadata file, if any).
        def close
          super
          size = @chunk.size
          @chunk.close
          @meta.close if @meta # meta may be missing if chunk is queued at first
          unlink_chunk_files if size == 0
        end

        # Closes the files, resets the counters and deletes the chunk from disk.
        def purge
          super
          @chunk.close
          @meta.close if @meta
          @bytesize = @size = @adding_bytes = @adding_size = 0
          unlink_chunk_files
        end

        # @return [String] the whole chunk file content (leaves the position at EOF)
        def read(**kwargs)
          @chunk.seek(0, IO::SEEK_SET)
          @chunk.read
        end

        # Yields the chunk IO positioned at the beginning and returns the block's value.
        # For a staged chunk the position is moved back to EOF afterwards — also when
        # the block raises or breaks — so later #concat calls keep appending instead of
        # overwriting existing data.
        def open(**kwargs, &block)
          @chunk.seek(0, IO::SEEK_SET)
          yield @chunk
        ensure
          @chunk.seek(0, IO::SEEK_END) if self.staged?
        end

        # Guesses chunk state from the "b<hex>"/"q<hex>" component of a file name.
        # @return [Symbol] :staged, :queued or :unknown
        def self.assume_chunk_state(path)
          if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern
            $1 == 'b' ? :staged : :queued
          else
            # files which matches to glob of buffer file pattern
            # it includes files which are created by out_file
            :unknown
          end
        end

        # Replaces the '.*.' placeholder of the user-specified path with ".b<hex id>.".
        # @raise [RuntimeError] when +path+ lacks '.*.'
        def self.generate_stage_chunk_path(path, unique_id)
          pos = path.index('.*.')
          raise "BUG: buffer chunk path on stage MUST have '.*.'" unless pos

          prefix = path[0...pos]
          suffix = path[(pos+3)..-1]

          chunk_id = Fluent::UniqueId.hex(unique_id)
          state = 'b'
          "#{prefix}.#{state}#{chunk_id}.#{suffix}"
        end

        # Turns a staged chunk path (".b<hex id>.") into its queued form (".q<hex id>.").
        # If the staged marker is not found, appends ".q<hex id>.chunk" instead.
        def self.generate_queued_chunk_path(path, unique_id)
          chunk_id = Fluent::UniqueId.hex(unique_id)
          if path.index(".b#{chunk_id}.")
            path.sub(".b#{chunk_id}.", ".q#{chunk_id}.")
          else # for unexpected cases (ex: users rename files while opened by fluentd)
            path + ".q#{chunk_id}.chunk"
          end
        end

        # Decodes the hex id embedded in a chunk file name into binary.
        # Used only for queued v0.12 buffer path or broken files.
        # @return [String, nil] binary unique id, or nil when the name has no id
        def self.unique_id_from_path(path)
          if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern
            return $2.scan(/../).map{|x| x.to_i(16) }.pack('C*')
          end
          nil
        end

        # Restores chunk state from raw .meta file content. Tries the headered format
        # first, then a bare msgpack map (older format); undecodable data yields
        # defaults (id from the file name, zero records, current time).
        def restore_metadata(bindata)
          data = restore_metadata_with_new_format(bindata)

          unless data
            # old type of restore
            data = Fluent::MessagePackFactory.msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {}
          end

          now = Fluent::Clock.real_now

          @unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id
          @size = data[:s] || 0
          @created_at = data.fetch(:c, now.to_i)
          @modified_at = data.fetch(:m, now.to_i)

          @metadata.timekey = data[:timekey]
          @metadata.tag = data[:tag]
          @metadata.variables = data[:variables]
          @metadata.seq = data[:seq] || 0
        end

        # Restores what can be derived without a .meta file: id from the file name,
        # timestamps from the file's ctime/mtime; record count and Metadata are reset.
        def restore_metadata_partially(chunk)
          @unique_id = self.class.unique_id_from_path(chunk.path) || @unique_id
          @size = 0
          @created_at = chunk.ctime.to_i # birthtime isn't supported on Windows (and Travis?)
          @modified_at = chunk.mtime.to_i

          @metadata.timekey = nil
          @metadata.tag = nil
          @metadata.variables = nil
          @metadata.seq = 0
        end

        # Writes BUFFER_HEADER + 4-byte big-endian length + msgpack payload at offset 0
        # of the .meta file. The file is not truncated; readers rely on the length prefix.
        # @param update [Boolean] true: include uncommitted records and stamp the current
        #   time; false: write the committed count and the stored modification time
        def write_metadata(update: true)
          data = @metadata.to_h.merge({
              id: @unique_id,
              s: (update ? @size + @adding_size : @size),
              c: @created_at,
              m: (update ? Fluent::Clock.real_now : @modified_at),
          })
          bin = Fluent::MessagePackFactory.thread_local_msgpack_packer.pack(data).full_pack
          size = [bin.bytesize].pack('N')
          @meta.seek(0, IO::SEEK_SET)
          @meta.write(BUFFER_HEADER + size + bin)
        end

        # Renames +old_path+ to +new_path+ and reopens +file+ on the new path, keeping
        # its position; the resulting IO is passed to +callback+. On Windows the file
        # is closed before the rename and a new IO object is opened instead.
        # @param mode [String] open mode of the reopened file. 'rb' (default) suits
        #   queued chunks; pass 'rb+' when the file must stay writable.
        def file_rename(file, old_path, new_path, callback = nil, mode: 'rb')
          pos = file.pos
          if Fluent.windows?
            file.close
            File.rename(old_path, new_path)
            file = File.open(new_path, mode, @permission)
          else
            File.rename(old_path, new_path)
            file.reopen(new_path, mode)
          end
          file.set_encoding(Encoding::ASCII_8BIT)
          file.sync = true
          file.binmode
          file.pos = pos
          callback.call(file) if callback
        end

        # Creates a new staged chunk file and its .meta file under +path+'s pattern.
        # @raise [BufferOverflowError] when either file cannot be created/initialized
        def create_new_chunk(path, perm)
          @path = self.class.generate_stage_chunk_path(path, @unique_id)
          @meta_path = @path + '.meta'
          begin
            @chunk = File.open(@path, 'wb+', perm)
            @chunk.set_encoding(Encoding::ASCII_8BIT)
            @chunk.sync = true
            @chunk.binmode
          rescue => e
            # Here assumes "Too many open files" like recoverable error so raising BufferOverflowError.
            # If other cases are possible, we will change error handling with proper classes.
            raise BufferOverflowError, "can't create buffer file for #{path}. Stop creating buffer files: error = #{e}"
          end
          begin
            @meta = File.open(@meta_path, 'wb', perm)
            @meta.set_encoding(Encoding::ASCII_8BIT)
            @meta.sync = true
            @meta.binmode
            write_metadata(update: false)
          rescue => e
            # This case is easier than enqueued!. Just remove the pre-created buffer file
            @chunk.close rescue nil
            File.unlink(@path) rescue nil

            if @meta
              # ensure to unlink when #write_metadata fails
              @meta.close rescue nil
              File.unlink(@meta_path) rescue nil
            end

            # Same as @chunk case. See above
            raise BufferOverflowError, "can't create buffer metadata for #{path}. Stop creating buffer files: error = #{e}"
          end

          @state = :unstaged
          @bytesize = 0
          @commit_position = @chunk.pos # must be 0
          @adding_bytes = 0
          @adding_size = 0
        end

        # Resumes a staged chunk file. With a .meta file the chunk stays staged and
        # writable; without one it is a classic (v0.12) chunk and is loaded read-only
        # as queued.
        # @raise [FileChunkError] when the chunk file is empty or its metadata is broken
        def load_existing_staged_chunk(path)
          @path = path
          @meta_path = @path + '.meta'

          @meta = nil
          # staging buffer chunk without metadata is classic buffer chunk file
          # and it should be enqueued immediately
          if File.exist?(@meta_path)
            raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?

            @chunk = File.open(@path, 'rb+')
            @chunk.set_encoding(Encoding::ASCII_8BIT)
            @chunk.sync = true
            @chunk.seek(0, IO::SEEK_END)
            @chunk.binmode

            begin
              @meta = File.open(@meta_path, 'rb+')
            rescue
              # don't leak the chunk file descriptor when the metadata file can't be opened
              @chunk.close
              raise
            end
            @meta.set_encoding(Encoding::ASCII_8BIT)
            @meta.sync = true
            @meta.binmode
            begin
              restore_metadata(@meta.read)
            rescue => e
              @chunk.close
              @meta.close
              raise FileChunkError, "staged meta file is broken. #{e.message}"
            end
            @meta.seek(0, IO::SEEK_SET)

            @state = :staged
            @bytesize = @chunk.size
            @commit_position = @chunk.pos
            @adding_bytes = 0
            @adding_size = 0
          else
            # classic buffer chunk - read only chunk
            @chunk = File.open(@path, 'rb')
            @chunk.set_encoding(Encoding::ASCII_8BIT)
            @chunk.binmode
            @chunk.seek(0, IO::SEEK_SET)
            @state = :queued
            @bytesize = @chunk.size

            restore_metadata_partially(@chunk)

            @commit_position = @chunk.size
            @unique_id = self.class.unique_id_from_path(@path) || @unique_id
          end
        end

        # Resumes a queued chunk read-only. Metadata is read from the .meta file when
        # readable (the file is not kept open), otherwise derived from the chunk file.
        # @raise [FileChunkError] when the chunk file is empty or its metadata is broken
        def load_existing_enqueued_chunk(path)
          @path = path
          raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?

          @chunk = File.open(@path, 'rb')
          @chunk.set_encoding(Encoding::ASCII_8BIT)
          @chunk.binmode
          @chunk.seek(0, IO::SEEK_SET)
          @bytesize = @chunk.size
          @commit_position = @chunk.size

          @meta_path = @path + '.meta'
          if File.readable?(@meta_path)
            begin
              restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
            rescue => e
              @chunk.close
              raise FileChunkError, "enqueued meta file is broken. #{e.message}"
            end
          else
            restore_metadata_partially(@chunk)
          end
          @state = :queued
        end

        private

        # Deletes the chunk file and then its .meta file. A missing .meta file is not
        # an error: classic staged chunks and some queued chunks have none.
        def unlink_chunk_files
          File.unlink(@path)
          begin
            File.unlink(@meta_path)
          rescue Errno::ENOENT
            # nothing to remove
          end
        end

        # Decodes headered metadata (BUFFER_HEADER + 4-byte big-endian size + msgpack).
        # @return [Hash, nil] decoded data, or nil if the content isn't in this format
        def restore_metadata_with_new_format(chunk)
          if chunk.size <= 6 # size of BUFFER_HEADER (2) + size of data size(4)
            return nil
          end

          if chunk.slice(0, 2) == BUFFER_HEADER
            size = chunk.slice(2, 4).unpack1('N')
            if size
              return Fluent::MessagePackFactory.msgpack_unpacker(symbolize_keys: true).feed(chunk.slice(6, size)).read rescue nil
            end
          end

          nil
        end
      end
    end
  end
end