fluent/fluentd

View on GitHub
lib/fluent/time.rb

Summary

Maintainability
D
2 days
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'time'
require 'msgpack'
require 'strptime'
require 'fluent/timezone'
require 'fluent/configurable'
require 'fluent/config/error'

module Fluent
  class EventTime
    TYPE = 0
    FORMATTER = Strftime.new('%Y-%m-%d %H:%M:%S.%N %z')

    def initialize(sec, nsec = 0)
      @sec = sec
      @nsec = nsec
    end

    def ==(other)
      if other.is_a?(Fluent::EventTime)
        @sec == other.sec
      else
        @sec == other
      end
    end

    def sec
      @sec
    end

    def nsec
      @nsec
    end

    def to_int
      @sec
    end
    alias :to_i :to_int

    def to_f
      @sec + @nsec / 1_000_000_000.0
    end

    # for Time.at
    def to_r
      Rational(@sec * 1_000_000_000 + @nsec, 1_000_000_000)
    end

    # for > and others
    def coerce(other)
      [other, @sec]
    end

    def to_s
      @sec.to_s
    end

    begin
      # ruby 2.5 or later
      Time.at(0, 0, :nanosecond)

      def to_time
        Time.at(@sec, @nsec, :nanosecond)
      end
    rescue
      def to_time
        Time.at(Rational(@sec * 1_000_000_000 + @nsec, 1_000_000_000))
      end
    end

    def to_json(*args)
      @sec.to_s
    end

    def to_msgpack(io = nil)
      @sec.to_msgpack(io)
    end

    def to_msgpack_ext
      [@sec, @nsec].pack('NN')
    end

    def self.from_msgpack_ext(data)
      new(*data.unpack('NN'))
    end

    def self.from_time(time)
      Fluent::EventTime.new(time.to_i, time.nsec)
    end

    def self.eq?(a, b)
      if a.is_a?(Fluent::EventTime) && b.is_a?(Fluent::EventTime)
        a.sec == b.sec && a.nsec == b.nsec
      else
        a == b
      end
    end

    def self.now
      # This method is called many time. so call Process.clock_gettime directly instead of Fluent::Clock.real_now
      now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
      Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000)
    end

    def self.parse(*args)
      from_time(Time.parse(*args))
    end

    ## TODO: For performance, implement +, -, and so on
    def method_missing(name, *args, &block)
      @sec.send(name, *args, &block)
    end

    def inspect
      FORMATTER.exec(Time.at(self))
    end
  end

  module TimeMixin
    TIME_TYPES = ['string', 'unixtime', 'float', 'mixed']

    TIME_PARAMETERS = [
      [:time_format, :string, {default: nil}],
      [:localtime, :bool, {default: true}],  # UTC if :localtime is false and :timezone is nil
      [:utc,       :bool, {default: false}], # to turn :localtime false
      [:timezone, :string, {default: nil}],
      [:time_format_fallbacks, :array, {default: []}], # try time_format, then try fallbacks
    ]
    TIME_FULL_PARAMETERS = [
      # To avoid to define :time_type twice (in plugin_helper/inject)
      [:time_type, :enum, {default: :string, list: TIME_TYPES.map(&:to_sym)}],
    ] + TIME_PARAMETERS

    module TimeParameters
      include Fluent::Configurable
      TIME_FULL_PARAMETERS.each do |name, type, opts|
        config_param(name, type, **opts)
      end

      def configure(conf)
        if conf.has_key?('localtime') || conf.has_key?('utc')
          if conf.has_key?('localtime')
            conf['localtime'] = Fluent::Config.bool_value(conf['localtime'])
          elsif conf.has_key?('utc')
            conf['localtime'] = !(Fluent::Config.bool_value(conf['utc']))
            # Specifying "localtime false" means using UTC in TimeFormatter
            # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC).
            # There are difference between "Z" and "+0000" in timezone formatting.
            # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way
          end
        end

        super

        if conf.has_key?('localtime') && conf.has_key?('utc') && !(@localtime ^ @utc)
          raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them"
        end

        if conf.has_key?('time_type') and @time_type == :mixed
          if @time_format.nil? and @time_format_fallbacks.empty?
            raise Fluent::ConfigError, "time_type is :mixed but time_format and time_format_fallbacks is empty."
          end
        end

        Fluent::Timezone.validate!(@timezone) if @timezone
      end
    end

    module Parser
      def self.included(mod)
        mod.include TimeParameters
      end

      def time_parser_create(type: @time_type, format: @time_format, timezone: @timezone, force_localtime: false)
        return MixedTimeParser.new(type, format, @localtime, timezone, @utc, force_localtime, @time_format_fallbacks) if type == :mixed
        return NumericTimeParser.new(type) if type != :string
        return TimeParser.new(format, true, nil) if force_localtime

        localtime = @localtime && (timezone.nil? && !@utc)
        TimeParser.new(format, localtime, timezone)
      end
    end

    module Formatter
      def self.included(mod)
        mod.include TimeParameters
      end

      def time_formatter_create(type: @time_type, format: @time_format, timezone: @timezone, force_localtime: false)
        return NumericTimeFormatter.new(type) if type != :string
        return TimeFormatter.new(format, true, nil) if force_localtime

        localtime = @localtime && (timezone.nil? && !@utc)
        TimeFormatter.new(format, localtime, timezone)
      end
    end
  end

  class TimeParser
    class TimeParseError < StandardError; end

    def initialize(format = nil, localtime = true, timezone = nil)
      if format.nil? && (timezone || !localtime)
        raise Fluent::ConfigError, "specifying timezone requires time format"
      end

      @cache1_key = nil
      @cache1_time = nil
      @cache2_key = nil
      @cache2_time = nil

      format_with_timezone = format && (format.include?("%z") || format.include?("%Z"))

      utc_offset = case
                   when format_with_timezone then
                     nil
                   when timezone then
                     Fluent::Timezone.utc_offset(timezone)
                   when localtime then
                     nil
                   else
                     0 # utc
                   end

      strptime = format && (Strptime.new(format) rescue nil)

      @parse = case
               when format_with_timezone && strptime then ->(v){ Fluent::EventTime.from_time(strptime.exec(v)) }
               when format_with_timezone             then ->(v){ Fluent::EventTime.from_time(Time.strptime(v, format)) }
               when format == '%iso8601'             then ->(v){ Fluent::EventTime.from_time(Time.iso8601(v)) }
               when strptime then
                 if utc_offset.nil?
                   ->(v){ t = strptime.exec(v); Fluent::EventTime.new(t.to_i, t.nsec) }
                 elsif utc_offset.respond_to?(:call)
                   ->(v) { t = strptime.exec(v); Fluent::EventTime.new(t.to_i + t.utc_offset - utc_offset.call(t), t.nsec) }
                 else
                   ->(v) { t = strptime.exec(v); Fluent::EventTime.new(t.to_i + t.utc_offset - utc_offset, t.nsec) }
                 end
               when format then
                 if utc_offset.nil?
                   ->(v){ t = Time.strptime(v, format); Fluent::EventTime.new(t.to_i, t.nsec) }
                 elsif utc_offset.respond_to?(:call)
                   ->(v){ t = Time.strptime(v, format); Fluent::EventTime.new(t.to_i + t.utc_offset - utc_offset.call(t), t.nsec) }
                 else
                   ->(v){ t = Time.strptime(v, format); Fluent::EventTime.new(t.to_i + t.utc_offset - utc_offset, t.nsec) }
                 end
               else ->(v){ Fluent::EventTime.parse(v) }
               end
    end

    # TODO: new cache mechanism using format string
    def parse(value)
      unless value.is_a?(String)
        raise TimeParseError, "value must be string: #{value}"
      end

      if @cache1_key == value
        return @cache1_time
      elsif @cache2_key == value
        return @cache2_time
      else
        begin
          time = @parse.call(value)
        rescue => e
          raise TimeParseError, "invalid time format: value = #{value}, error_class = #{e.class.name}, error = #{e.message}"
        end
        @cache1_key = @cache2_key
        @cache1_time = @cache2_time
        @cache2_key = value
        @cache2_time = time
        return time
      end
    end
    alias :call :parse
  end

  class NumericTimeParser < TimeParser # to include TimeParseError
    def initialize(type, localtime = nil, timezone = nil)
      @cache1_key = @cache1_time = @cache2_key = @cache2_time = nil

      if type == :unixtime
        define_singleton_method(:parse, method(:parse_unixtime))
        define_singleton_method(:call, method(:parse_unixtime))
      else # :float
        define_singleton_method(:parse, method(:parse_float))
        define_singleton_method(:call, method(:parse_float))
      end
    end

    def parse_unixtime(value)
      unless value.is_a?(String) || value.is_a?(Numeric)
        raise TimeParseError, "value must be a string or a number: #{value}(#{value.class})"
      end

      if @cache1_key == value
        return @cache1_time
      elsif @cache2_key == value
        return @cache2_time
      end

      begin
        time = Fluent::EventTime.new(value.to_i)
      rescue => e
        raise TimeParseError, "invalid time format: value = #{value}, error_class = #{e.class.name}, error = #{e.message}"
      end
      @cache1_key = @cache2_key
      @cache1_time = @cache2_time
      @cache2_key = value
      @cache2_time = time
      time
    end

    # rough benchmark result to compare handmade parser vs Fluent::EventTime.from_time(Time.at(value.to_r))
    # full: with 9-digits of nsec after dot
    # msec: with 3-digits of msec after dot
    # 10_000_000 times loop on MacBookAir
    ## parse_by_myself(full): 12.162475 sec
    ## parse_by_myself(msec): 15.050435 sec
    ## parse_by_to_r  (full): 28.722362 sec
    ## parse_by_to_r  (msec): 28.232856 sec
    def parse_float(value)
      unless value.is_a?(String) || value.is_a?(Numeric)
        raise TimeParseError, "value must be a string or a number: #{value}(#{value.class})"
      end

      if @cache1_key == value
        return @cache1_time
      elsif @cache2_key == value
        return @cache2_time
      end

      begin
        sec_s, nsec_s, _ = value.to_s.split('.', 3) # throw away second-dot and later
        nsec_s = nsec_s && nsec_s[0..9] || '0'
        nsec_s += '0' * (9 - nsec_s.size) if nsec_s.size < 9
        time = Fluent::EventTime.new(sec_s.to_i, nsec_s.to_i)
      rescue => e
        raise TimeParseError, "invalid time format: value = #{value}, error_class = #{e.class.name}, error = #{e.message}"
      end
      @cache1_key = @cache2_key
      @cache1_time = @cache2_time
      @cache2_key = value
      @cache2_time = time
      time
    end
  end

  class TimeFormatter
    def initialize(format = nil, localtime = true, timezone = nil)
      @tc1 = 0
      @tc1_str = nil
      @tc2 = 0
      @tc2_str = nil

      strftime = format && (Strftime.new(format) rescue nil)
      if format && format =~ /(^|[^%])(%%)*%L|(^|[^%])(%%)*%\d*N/
        define_singleton_method(:format, method(:format_with_subsec))
        define_singleton_method(:call, method(:format_with_subsec))
      else
        define_singleton_method(:format, method(:format_without_subsec))
        define_singleton_method(:call, method(:format_without_subsec))
      end

      formatter = Fluent::Timezone.formatter(timezone, strftime ? strftime : format)
      @format_nocache = case
                        when formatter             then formatter
                        when strftime && localtime then ->(time){ strftime.exec(Time.at(time)) }
                        when format && localtime   then ->(time){ Time.at(time).strftime(format) }
                        when strftime              then ->(time){ strftime.exec(Time.at(time).utc) }
                        when format                then ->(time){ Time.at(time).utc.strftime(format) }
                        when localtime             then ->(time){ Time.at(time).iso8601 }
                        else                            ->(time){ Time.at(time).utc.iso8601 }
                        end
    end

    def format_without_subsec(time)
      if @tc1 == time
        return @tc1_str
      elsif @tc2 == time
        return @tc2_str
      else
        str = format_nocache(time)
        if @tc1 < @tc2
          @tc1 = time
          @tc1_str = str
        else
          @tc2 = time
          @tc2_str = str
        end
        return str
      end
    end

    def format_with_subsec(time)
      if Fluent::EventTime.eq?(@tc1, time)
        return @tc1_str
      elsif Fluent::EventTime.eq?(@tc2, time)
        return @tc2_str
      else
        str = format_nocache(time)
        if @tc1 < @tc2
          @tc1 = time
          @tc1_str = str
        else
          @tc2 = time
          @tc2_str = str
        end
        return str
      end
    end

    ## Dynamically defined in #initialize
    # def format(time)
    # end

    def format_nocache(time)
      @format_nocache.call(time)
    end
  end

  class NumericTimeFormatter < TimeFormatter
    def initialize(type, localtime = nil, timezone = nil)
      @cache1_key = @cache1_time = @cache2_key = @cache2_time = nil

      if type == :unixtime
        define_singleton_method(:format, method(:format_unixtime))
        define_singleton_method(:call, method(:format_unixtime))
      else # :float
        define_singleton_method(:format, method(:format_float))
        define_singleton_method(:call, method(:format_float))
      end
    end

    def format_unixtime(time)
      time.to_i.to_s
    end

    def format_float(time)
      if time.is_a?(Fluent::EventTime) || time.is_a?(Time)
        # 10.015 secs for 10_000_000 times call on MacBookAir
        nsec_s = time.nsec.to_s
        nsec_s = '0' * (9 - nsec_s.size) if nsec_s.size < 9
        "#{time.sec}.#{nsec_s}"
      else # integer (or float?)
        time.to_f.to_s
      end
    end
  end

  # MixedTimeParser is available when time_type is set to :mixed
  #
  # Use Case 1: primary format is specified explicitly in time_format
  #  time_type mixed
  #  time_format %iso8601
  #  time_format_fallbacks unixtime
  # Use Case 2: time_format is omitted
  #  time_type mixed
  #  time_format_fallbacks %iso8601, unixtime
  #
  class MixedTimeParser < TimeParser # to include TimeParseError
    def initialize(type, format = nil, localtime = nil, timezone = nil, utc = nil, force_localtime = nil, fallbacks = [])
      @parsers = []
      fallbacks.unshift(format).each do |fallback|
        next unless fallback
        case fallback
        when 'unixtime', 'float'
          @parsers << NumericTimeParser.new(fallback, localtime, timezone)
        else
          if force_localtime
            @parsers << TimeParser.new(fallback, true, nil)
          else
            localtime = localtime && (timezone.nil? && !utc)
            @parsers << TimeParser.new(fallback, localtime, timezone)
          end
        end
      end
    end

    def parse(value)
      @parsers.each do |parser|
        begin
          Float(value) if parser.class == Fluent::NumericTimeParser
        rescue
          next
        end
        begin
          return parser.parse(value)
        rescue
          # skip TimeParseError
        end
      end
      fallback_class = @parsers.collect do |parser| parser.class end.join(",")
      raise TimeParseError, "invalid time format: value = #{value}, even though fallbacks: #{fallback_class}"
    end
  end

end