lunardial/fluent-plugin-parser_cef

View on GitHub
lib/fluent/plugin/parser_cef.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# -*- coding: utf-8

require 'fluent/log'
require 'fluent/plugin/parser'
require 'time'
require 'yaml'

module Fluent
  module Plugin
    class CommonEventFormatParser < Parser
      Fluent::Plugin.register_parser("cef", self)
      config_param :log_format, :string, :default => "syslog"
      config_param :log_utc_offset, :string, :default => nil
      config_param :syslog_timestamp_format, :string, :default => '\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2}'
      config_param :cef_version, :integer, :default => 0
      config_param :parse_strict_mode, :bool, :default => true
      config_param :cef_keyfilename, :string, :default => 'config/cef_version_0_keys.yaml'
      config_param :output_raw_field, :bool, :default => false

      def configure(conf)
        super
        @key_value_format_regexp = /([^\s=]+)=(.*?)(?:(?=[^\s=]+=)|\z)/
        @valid_format_regexp = create_valid_format_regexp
        @utc_offset = get_utc_offset(@log_utc_offset)
        begin
          if @parse_strict_mode
            if @cef_keyfilename =~ /^\//
              yaml_fieldinfo = YAML.load_file(@cef_keyfilename)
            else
              yaml_fieldinfo = YAML.load_file("#{File.dirname(File.expand_path(__FILE__))}/#{@cef_keyfilename}")
            end
            @keys_array = []
            yaml_fieldinfo.each {|_key, value| @keys_array.concat(value) }
            $log.info "running with strict mode, #{@keys_array.length} keys are valid."
          else
            $log.info "running without strict mode"
          end
        rescue => e
          @parse_strict_mode = false
          $log.warn "running without strict mode because of the following error"
          $log.warn "#{e.message}"
        end
      end

      def parse(text)
        if text.nil? || text.empty?
          yield nil, nil
          return
        end
        text.force_encoding("utf-8")
        replaced_text = text.scrub('?')
        record = {}
        record_overview = @valid_format_regexp.match(replaced_text)
        if record_overview.nil?
          yield Engine.now, { "raw" => replaced_text }
          return
        end
        time = get_unixtime_with_utc_offset(record_overview["syslog_timestamp"], @utc_offset)
        begin
          record_overview.names.each {|key| record[key] = record_overview[key] }
          text_cef_extension = record_overview["cef_extension"]
          record.delete("cef_extension")
        rescue
          yield Engine.now, { "raw" => replaced_text }
          return
        end
        unless text_cef_extension.nil?
          record_cef_extension = parse_cef_extension(text_cef_extension)
          record.merge!(record_cef_extension)
        end
        record["raw"] = replaced_text if @output_raw_field
        yield time, record
        return
      end

      private

      def get_utc_offset(text)
        utc_offset = nil
        begin
          utc_offset = Time.new.localtime(text).strftime("%:z")
          $log.info "utc_offset: #{utc_offset}"
        rescue => e
          utc_offset = Time.new.localtime.strftime("%:z")
          $log.info "#{e.message}, use localtime"
          $log.info "utc_offset: #{utc_offset}"
        end
        return utc_offset
      end

      def create_valid_format_regexp()
        case @log_format
        when "syslog"
          syslog_header = /
              (?<syslog_timestamp>#{@syslog_timestamp_format})\s
              (?<syslog_hostname>\S+)\s
              (?<syslog_tag>\S*)\s*
            /x
          cef_header = /
            CEF:(?<cef_version>#{@cef_version})\|
            (?<cef_device_vendor>[^|]*)\|
            (?<cef_device_product>[^|]*)\|
            (?<cef_device_version>[^|]*)\|
            (?<cef_device_event_class_id>[^|]*)\|
            (?<cef_name>[^|]*)\|
            (?<cef_severity>[^|]*)
          /x
          valid_format_regexp = /
              \A
                #{syslog_header}
                (?:\u{feff})?
                #{cef_header}\|
                (?<cef_extension>.*)
              \z
            /x
        else
          raise Fluent::ConfigError, "#{@log_format} is unknown format"
        end
        return Regexp.new(valid_format_regexp)
      end

      def get_unixtime_with_utc_offset(timestamp, utc_offset)
        unixtime = nil
        begin
          if timestamp =~ /[-+]\d{2}:?\d{2}\z/
            unixtime = Time.parse(timestamp).to_i
          else
            unixtime = Time.parse("#{timestamp} #{utc_offset}").to_i
          end
        rescue
          unixtime = Engine.now
        end
        return unixtime
      end

      def parse_cef_extension(text)
        if @parse_strict_mode == true
          return parse_cef_extension_with_strict_mode(text)
        else
          return parse_cef_extension_without_strict_mode(text)
        end
      end

      def parse_cef_extension_with_strict_mode(text)
        record = {}
        begin
          last_valid_key_name = nil
          text.scan(@key_value_format_regexp) do |key, value|
            if @keys_array.include?(key)
              record[key] = value
              record[last_valid_key_name].rstrip! unless last_valid_key_name.nil?
              last_valid_key_name = key
            else
              record[last_valid_key_name].concat("#{key}=#{value}")
            end
          end
        rescue
          return {}
        end
        return record
      end

      def parse_cef_extension_without_strict_mode(text)
        record = {}
        begin
          text.scan(@key_value_format_regexp) {|key, value| record[key] = value.rstrip }
        rescue
          return {}
        end
        return record
      end
    end
  end
end