takus/fluent-plugin-ec2-metadata

View on GitHub
lib/fluent/plugin/ec2_metadata.rb

Summary

Maintainability
A
2 hrs
Test Coverage
module Fluent
  module EC2Metadata

    def initialize
      super
      require 'net/http'
      require 'aws-sdk-ec2'
      require 'oj'
    end

    def configure(conf)
      super

      # <record></record> directive
      @map = {}
      conf.elements.select { |element| element.name == 'record' }.each { |element|
        element.each_pair { |k, v|
          element.has_key?(k) # to suppress unread configuration warning
          @map[k] = v
        }
      }

      @placeholder_expander = PlaceholderExpander.new(log)

      # get metadata first and then setup a refresh thread
      @ec2_metadata = get_metadata_and_tags
      @refresh_thread = Thread.new {
        while true
          sleep @metadata_refresh_seconds
          @ec2_metadata = get_metadata_and_tags
        end
      }
    end

    private

    def get_metadata_and_tags
      metadata = {}
      set_metadata(metadata)
      set_tag(metadata)
      metadata
    end

    def set_metadata(ec2_metadata)
      instance_identity = Oj.load(get_dynamic_data("instance-identity/document"))
      ec2_metadata['account_id'] = instance_identity["accountId"]
      ec2_metadata['image_id'] = instance_identity["imageId"]

      ec2_metadata['instance_id'] = get_metadata('instance-id')
      ec2_metadata['instance_type'] = get_metadata('instance-type')
      ec2_metadata['availability_zone'] = get_metadata('placement/availability-zone')
      ec2_metadata['region'] = ec2_metadata['availability_zone'].chop
      ec2_metadata['private_ip'] = get_metadata('local-ipv4')
      ec2_metadata['mac'] = get_metadata('mac')
      begin
        ec2_metadata['vpc_id'] = get_metadata("network/interfaces/macs/#{ec2_metadata['mac']}/vpc-id")
      rescue
        ec2_metadata['vpc_id'] = nil
        log.info "ec2-metadata: 'vpc_id' is undefined #{ec2_metadata['instance_id']} is not in VPC}"
      end
      begin
        ec2_metadata['subnet_id'] = get_metadata("network/interfaces/macs/#{ec2_metadata['mac']}/subnet-id")
      rescue
        ec2_metadata['subnet_id'] = nil
        log.info "ec2-metadata: 'subnet_id' is undefined because #{ec2_metadata['instance_id']} is not in VPC}"
      end
      ec2_metadata
    end

    def get_dynamic_data(f)
      Net::HTTP.start('169.254.169.254') do |http|
        res = http.get("/latest/dynamic/#{f}", get_header())
        raise Fluent::ConfigError, "ec2-dynamic-data: failed to get #{f}" unless res.is_a?(Net::HTTPSuccess)
        res.body
      end
    end

    def get_metadata(f)
      Net::HTTP.start('169.254.169.254') do |http|
        res = http.get("/latest/meta-data/#{f}", get_header())
        raise Fluent::ConfigError, "ec2-metadata: failed to get #{f}" unless res.is_a?(Net::HTTPSuccess)
        res.body
      end
    end

    def get_header()
      if @imdsv2
        Net::HTTP.start('169.254.169.254') do |http|
          res = http.put("/latest/api/token", '', { 'X-aws-ec2-metadata-token-ttl-seconds' => '300' })
          raise Fluent::ConfigError, "ec2-metadata: failed to get token" unless res.is_a?(Net::HTTPSuccess)
          { 'X-aws-ec2-metadata-token' => res.body }
        end
      else
        {}
      end
    end

    def set_tag(ec2_metadata)
      if @map.values.any? { |v| v.match(/^\${tagset_/) } || @output_tag =~ /\${tagset_/

        if @aws_key_id and @aws_sec_key
          ec2 = Aws::EC2::Client.new(
            region: ec2_metadata['region'],
            access_key_id: @aws_key_id,
            secret_access_key: @aws_sec_key,
          )
        else
          ec2 = Aws::EC2::Client.new(
            region: ec2_metadata['region'],
          )
        end

        response = ec2.describe_instances({ :instance_ids => [ec2_metadata['instance_id']] })
        instance = response.reservations[0].instances[0]
        raise Fluent::ConfigError, "ec2-metadata: failed to get instance data #{response.pretty_inspect}" if instance.nil?

        instance.tags.each { |tag|
          ec2_metadata["tagset_#{tag.key.downcase}"] = tag.value
        }
      end
    end

    def modify_record(record, tag, tag_parts)
      placeholders = @placeholder_expander.prepare_placeholders(record, tag, tag_parts, @ec2_metadata)
      new_record = record.dup
      @map.each_pair { |k, v| new_record[k] = @placeholder_expander.expand(v, placeholders) }
      new_record
    end

    def modify(output_tag, record, tag, tag_parts)
      placeholders = @placeholder_expander.prepare_placeholders(record, tag, tag_parts, @ec2_metadata)

      new_tag = @placeholder_expander.expand(output_tag, placeholders)

      new_record = record.dup
      @map.each_pair { |k, v| new_record[k] = @placeholder_expander.expand(v, placeholders) }

      [new_tag, new_record]
    end

    class PlaceholderExpander
      def initialize(log)
        @log = log
      end

      # referenced https://github.com/fluent/fluent-plugin-rewrite-tag-filter
      # referenced https://github.com/sonots/fluent-plugin-record-reformer
      attr_reader :placeholders

      def prepare_placeholders(_record, tag, tag_parts, ec2_metadata)
        placeholders = {
          '${tag}' => tag,
        }

        size = tag_parts.size
        tag_parts.each_with_index { |t, idx|
          placeholders.store("${tag_parts[#{idx}]}", t)
          placeholders.store("${tag_parts[#{idx-size}]}", t) # support tag_parts[-1]
        }

        ec2_metadata.each { |k, v|
          placeholders.store("${#{k}}", v)
        }

        placeholders
      end

      def expand(str, placeholders)
        str.gsub(/(\${[a-z_:\-]+(\[-?[0-9]+\])?}|__[A-Z_]+__)/) {
          @log.warn "ec2-metadata: unknown placeholder `#{$1}` found in a tag `#{placeholders['${tag}']}`" unless placeholders.include?($1)
          placeholders[$1]
        }
      end
    end
  end
end