fluent/fluentd-ui

View on GitHub
app/models/fluentd/agent/common.rb

Summary

Maintainability
B
4 hrs
Test Coverage
# pidfile
#   td-agent: /var/run/td-agent/td-agent.pid
#   - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/systemd/td-agent.service.erb#L18
#   - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/init.d/deb/td-agent#L24
#   - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/init.d/rpm/td-agent#L24
#   - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/td-agent/logrotate.d/td-agent.logrotate#L10
#   fluentd:  nothing (or --daemon PIDFILE)
#
# logfile
#   td-agent: /var/log/td-agent/td-agent.log
#   - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/systemd/td-agent.service.erb#L21
#   - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/init.d/deb/td-agent#L23
#   - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/init.d/rpm/td-agent#L23
#   fluentd: stdout (or --log LOGFILE)
#
# config file
#   td-agent: /etc/td-agent/td-agent.conf
#   - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/systemd/td-agent.service.erb#L14
#   fluentd: /etc/fluent/fluent.conf (created by fluentd -s)

require "strscan"

class Fluentd
  class Agent
    module Common
      attr_reader :extra_options

      def self.included(base)
        base.include(Fluentd::Agent::ProcessOperation)
      end

      # define these methods on each Agent class
      %w(start stop restart version).each do |method|
        define_method(method) do
          raise NotImplementedError, "'#{method}' method is required to be defined"
        end
      end

      def initialize(options = {})
        @extra_options = options
      end

      def pid_file
        extra_options[:pid_file] || self.class.default_options[:pid_file]
      end

      def log_file
        extra_options[:log_file] || self.class.default_options[:log_file]
      end

      def log
        @log ||= FluentdLog.new(log_file)
      end

      def config_file
        extra_options[:config_file] || self.class.default_options[:config_file]
      end

      # -- config
      def config
        File.read(config_file)
      end

      def config_write(content)
        backup_config
        File.open(config_file, "w") do |f|
          f.write content
        end
      end

      def config_append(content)
        backup_config
        File.open(config_file, "a") do |f|
          f.write "\n"
          f.write content
        end
      end

      def config_merge(content)
        if content.start_with?("<label ")
          label = content.slice(/<label\s+(.+?)>/, 1)
          key = "label:#{label}"
          parsed_config = parse_config(config)
          if parsed_config.key?(key)
            offset = parsed_config[key][0][:pos] + parsed_config[key][0][:size]
            label, sections = parse_label_section(content, offset)
            parsed_config[key][0][:sections]["filter"].concat(sections["filter"])
            parsed_config[key][0][:sections]["match"].concat(sections["match"])
            config_write(dump_parsed_config(parsed_config))
          else
            config_append(content)
          end
        else
          config_append(content)
        end
      end

      def configuration
        if File.exists? config_file
          ::Fluentd::Agent::Configuration.new(config_file)
        end
      end

      # -- backup methods
      def config_backup_dir
        dir = File.join(FluentdUI.data_dir, "#{Rails.env}_confg_backups")
        FileUtils.mkdir_p(dir)
        dir
      end

      def backup_files
        Dir.glob(File.join("#{config_backup_dir}", "*.conf"))
      end

      def backup_files_in_old_order
        backup_files.sort
      end

      def backup_files_in_new_order
        backup_files_in_old_order.reverse
      end

      def running_config_backup_dir
        dir = File.join(FluentdUI.data_dir, "#{Rails.env}_running_confg_backup")
        FileUtils.mkdir_p(dir)
        dir
      end

      def running_config_backup_file
        File.join(running_config_backup_dir, "running.conf")
      end

      # -------------- private --------------
      private

      def backup_running_config
        #back up config file only when start success
        return unless yield

        return unless File.exists? config_file

        FileUtils.cp config_file, running_config_backup_file

        true
      end

      def backup_config
        return unless File.exists? config_file

        FileUtils.cp config_file, File.join(config_backup_dir, "#{Time.zone.now.strftime('%Y%m%d_%H%M%S')}.conf")

        remove_over_backup_files
      end

      def remove_over_backup_files
        over_file_count = backup_files.size - ::Settings.max_backup_files_count

        return if over_file_count <= 0

        backup_files_in_old_order.first(over_file_count).each do |file|
          note_file_attached_backup = file.sub(/#{Regexp.escape(File.extname(file))}\z/, ::Fluentd::SettingArchive::Note::FILE_EXTENSION)
          FileUtils.rm(note_file_attached_backup) if File.exist? note_file_attached_backup
          FileUtils.rm(file) if File.exist? file
        end
      end

      def parse_config(content)
        scanner = StringScanner.new(content)
        contents = Hash.new {|h, k| h[k] = [] }
        until scanner.eos? do
          started = scanner.pos
          header = scanner.scan_until(/^<(source|filter|match|label)/)
          section_type = scanner[1]
          break unless header
          case section_type
          when "source", "filter", "match"
            current_source = header + scanner.scan_until(%r{^</(?:source|filter|match)>})
            contents[section_type] << { pos: started, content: current_source.strip }
          when "label"
            label_content = header + scanner.scan_until(%r{^</label>})
            label, sections = parse_label_section(label_content, started)
            contents["label:#{label}"] << { label: label, pos: started, sections: sections, size: label_content.size }
          else
            raise TypeError, "Unknown section: #{started}: #{section_type}"
          end
        end
        contents
      end

      def parse_label_section(content, offset)
        scanner = StringScanner.new(content)
        scanner.scan_until(/^<label\s+?([^\s]+?)>/)
        label = scanner[1]
        sections = Hash.new {|h, k| h[k] = [] }
        loop do
          break if scanner.match?(%r{\s+?</label>})
          pos = scanner.pos
          header = scanner.scan_until(/^\s*<(filter|match)/)
          type = scanner[1]
          source = header + scanner.scan_until(%r{^\s*</(?:filter|match)>})
          sections[type] << { label: label, pos: pos + offset, content: source.sub(/\n+/, "") }
        end
        return label, sections
      end

      def dump_parsed_config(parsed_config)
        content = "".dup
        sources = parsed_config["source"] || []
        filters = parsed_config["filter"] || []
        matches = parsed_config["match"] || []
        labels = parsed_config.select do |key, sections|
          key.start_with?("label:")
        end
        labels = labels.values.flatten
        sorted_sections = (sources + filters + matches + labels).sort_by do |section|
          section[:pos]
        end
        sorted_sections.each do |section|
          if section.key?(:label)
            label = section[:label]
            sub_filters = section.dig(:sections, "filter") || []
            sub_matches = section.dig(:sections, "match") || []
            sorted_sub_filters = sub_filters.sort_by do |sub_section|
              sub_section[:pos]
            end
            sorted_sub_matches =  sub_matches.sort_by do |sub_section|
              sub_section[:pos]
            end
            sub_sections = sorted_sub_filters + sorted_sub_matches
            content << "<label #{label}>\n"
            sub_sections.each do |sub_section|
              content << sub_section[:content] << "\n\n"
            end
            content.chomp!
            content << "</label>\n\n"
          else
            content << section[:content] << "\n\n"
          end
        end
        content.chomp
      end
    end
  end
end