fluent/fluentd

View on GitHub
lib/fluent/plugin/sd_file.rb

Summary

Maintainability
A
0 mins
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'cool.io'

require 'fluent/plugin_helper'
require 'fluent/plugin/service_discovery'

module Fluent
  module Plugin
    class FileServiceDiscovery < ServiceDiscovery
      include PluginHelper::Mixin

      Plugin.register_sd('file', self)

      DEFAULT_FILE_TYPE = :yaml
      DEFAUT_WEIGHT = 60
      DEFAULT_SD_FILE_PATH = ENV['DEFAULT_SD_FILE_PATH'] || '/etc/fluent/sd.yaml'

      helpers :event_loop

      config_param :path, :string, default: DEFAULT_SD_FILE_PATH
      config_param :conf_encoding, :string, default: 'utf-8'

      def initialize
        super

        @file_type = nil
      end

      def configure(conf)
        super

        unless File.exist?(@path)
          raise Fluent::ConfigError, "sd_file: path=#{@path} not found"
        end

        @file_type = File.basename(@path).split('.', 2).last.to_sym
        unless %i[yaml yml json].include?(@file_type)
          @file_type = DEFAULT_FILE_TYPE
        end

        @services = fetch_server_info
      end

      def start(queue)
        watcher = StatWatcher.new(@path, @log) do |_prev, _cur|
          refresh_file(queue)
        end
        event_loop_attach(watcher)

        super()
      end

      private

      def parser
        @parser ||=
          case @file_type
          when :yaml, :yml
            require 'yaml'
            -> (v) { YAML.safe_load(v).map }
          when :json
            require 'json'
            -> (v) { JSON.parse(v) }
          end
      end

      def refresh_file(queue)
        s =
          begin
            fetch_server_info
          rescue => e
            @log.error("sd_file: #{e}")
            return
          end

        if s.nil?
          # if any error occurs, skip this turn
          return
        end

        diff = []
        join = s - @services
        # Need service_in first to guarantee that server exist at least one all time.
        join.each do |j|
          diff << ServiceDiscovery.service_in_msg(j)
        end

        drain = @services - s
        drain.each do |d|
          diff << ServiceDiscovery.service_out_msg(d)
        end

        @services = s

        diff.each do |a|
          queue.push(a)
        end
      end

      def fetch_server_info
        config_data =
          begin
            File.open(@path, "r:#{@conf_encoding}:utf-8", &:read)
          rescue => e
            raise Fluent::ConfigError, "sd_file: path=#{@path} couldn't open #{e}"
          end

        parser.call(config_data).map do |s|
          Service.new(
            :file,
            s.fetch('host'),
            s.fetch('port'),
            s['name'],
            s.fetch('weight', DEFAUT_WEIGHT),
            s['standby'],
            s['username'],
            s['password'],
            s['shared_key'],
          )
        end
      rescue KeyError => e
        raise Fluent::ConfigError, "#{e}. Service must have `host` and `port`"
      end

      class StatWatcher < Coolio::StatWatcher
        def initialize(path, log, &callback)
          @path = path
          @log = log
          @callback = callback
          super(@path)
        end

        def on_change(prev_stat, cur_stat)
          @callback.call(prev_stat, cur_stat)
        rescue => e
          @log.error(e)
        end
      end
    end
  end
end