crowbar/crowbar-openstack

View on GitHub
chef/cookbooks/neutron/files/default/neutron-l3-ha-service.rb

Summary

Maintainability
A
3 hrs
Test Coverage
#!/usr/bin/env ruby
#
# Copyright 2017, SUSE Linux GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'yaml'
require 'open3'
require 'logger'

# Holder for the single Logger shared by every component in this script.
#
# Until a log file is configured through +file=+, messages are written to
# STDERR at DEBUG level.
class HAToolLog
  class << self
    # Returns the shared Logger, building the STDERR fallback on first use.
    def log
      @logger ||= Logger.new(STDERR).tap do |stderr_logger|
        stderr_logger.level = Logger::DEBUG
      end
    end

    # Redirects logging to +file_path+ (opened with the "daily" rotation
    # setting). A nil path leaves the current logger untouched.
    def file=(file_path)
      return if file_path.nil?

      @logger = Logger.new(file_path, "daily")
    end
  end
end

# Entry point of the service. Loads the options file named by the first
# command-line argument, then checks the L3 agents forever, sleeping between
# rounds.
#
# The exit status of the check command drives each round:
#   2     -> the error counter is reset, routers are migrated (a TERM signal
#            is deferred until the migration finished) and the migration
#            outcome is fed into the error counter
#   0     -> the error counter is reset
#   other -> the error counter is bumped, which raises MaximumErrorsReached
#            once the configured limit is exceeded
def main
  terminator = Terminator.new
  terminator.register_term_signal_handler
  service_options = ServiceOptions.load(ARGV[0])
  ha_functions = HAFunctions.new(service_options)
  error_counter = ErrorCounter.new(service_options.max_errors_tolerated)
  HAToolLog.file = service_options.log_file

  loop do
    case ha_functions.check_l3_agents.exit_status
    when 2
      error_counter.reset!
      migration_result = terminator.dont_exit_until_block_finished do
        ha_functions.migrate_routers_away_from_dead_agents
      end
      register_errors_to(error_counter, migration_result)
    when 0
      error_counter.reset!
    else
      error_counter.bump!
    end

    HAToolLog.log.info("Sleeping until next run")
    sleep service_options.seconds_to_sleep_between_checks
  end
end


# Feeds the outcome of a finished command into +error_counter+: an exit status
# of 0 resets the counter, any other value (including nil) bumps it.
def register_errors_to(error_counter, result)
  return error_counter.reset! if result.exit_status == 0

  error_counter.bump!
end


# Coordinates process termination on SIGTERM.
#
# By default a TERM signal exits the process right away. While a block runs
# inside +dont_exit_until_block_finished+ the signal is only recorded, and the
# exit happens once the block has returned.
class Terminator
  def initialize
    @immediate_exit_enabled = true
    @exit_requested = false
  end

  # Installs the TERM handler: exit immediately when allowed, otherwise just
  # remember that an exit was requested.
  def register_term_signal_handler
    Signal.trap("TERM") do
      exit 0 if @immediate_exit_enabled
      @exit_requested = true
    end
  end

  # Runs the given block with immediate exit disabled and returns the block's
  # value. If a TERM signal arrived while the block was running, logs that
  # fact and exits with status 0 instead of returning.
  #
  # Immediate exit is re-enabled in an +ensure+ clause so that an exception
  # raised by the block cannot leave the object permanently deaf to TERM.
  def dont_exit_until_block_finished
    @immediate_exit_enabled = false
    result = begin
      yield
    ensure
      @immediate_exit_enabled = true
    end

    if @exit_requested
      HAToolLog.log.info("TERM signal received while waiting for block to finish, exiting now")
      exit 0
    end
    result
  end
end


# High-level operations of the service, each executed as a supervised run of
# the external HA tool.
class HAFunctions
  # service_options must provide +hatool+, +status_timeout+ and
  # +router_migration_timeout+.
  def initialize(service_options)
    @service_options = service_options
    @hatool = HATool.new(service_options.hatool)
    # TODO(mlakat): Check if we can skip this step
    # maybe we should just assume that the tool supports --retry?
  end

  # Runs the tool's status command with the status timeouts and returns the
  # supervisor's result.
  def check_l3_agents
    HAToolLog.log.info("checking for dead agents")
    run_supervised(@hatool.status_command, @service_options.status_timeout)
  end

  # Runs the tool's migration command with the router migration timeouts and
  # returns the supervisor's result.
  def migrate_routers_away_from_dead_agents
    HAToolLog.log.info("migrating routers away from dead agents")
    run_supervised(@hatool.migration_command, @service_options.router_migration_timeout)
  end

  # Starts +command+ (an argv array) as a subprocess carrying the configured
  # tool environment and lets a Supervisor enforce +timeout+ on it.
  def run_supervised(command, timeout)
    child = Subprocess.new(*command)
    child.env.merge!(@service_options.hatool.env)
    Supervisor.new(child, timeout).run_subprocess
  end
end


# Raised by Subprocess#wait when the child did not finish within the timeout.
TimedOut = Class.new(StandardError)


# Raised by ErrorCounter#bump! once the tolerated number of errors is exceeded.
MaximumErrorsReached = Class.new(StandardError)


# Raised by Subprocess#wait when it is called again after the child completed.
AlreadyCompleted = Class.new(StandardError)


# NOTE(review): nothing in this file raises this error; presumably kept for
# callers or an earlier "--help" probe of the tool -- confirm before removing.
RunningHelpFailed = Class.new(StandardError)


# Drains an IO stream on a background thread, logging every line as it
# arrives and keeping the lines so the full text can be retrieved afterwards.
class LoggingStreamReader
  def initialize
    @lines = []
    @thread = nil
  end

  # Starts the background thread that reads +stream+ line by line until EOF.
  # Each line is logged at info level, prefixed with +log_msg+.
  def read(stream, log_msg)
    @thread = Thread.new do
      stream.each do |line|
        @lines << line
        # The logger terminates every entry itself, so drop the line's own
        # newline to avoid an empty line after each message.
        HAToolLog.log.info("#{log_msg} #{line.chomp}")
      end
    end
  end

  # Returns everything read so far as one string. The collected lines still
  # carry their line terminators, so they are concatenated as-is; joining
  # them with "\n" would double every newline.
  def content
    @lines.join
  end

  # Waits up to one second for the reader thread to finish.
  # Raises StandardError when the thread is still running after that.
  def join
    return unless @thread.join(1).nil?

    raise StandardError, "stream reader thread did not finish within 1 second"
  end
end


# A child process started through Open3 whose stdout and stderr are drained
# and logged by background readers.
class Subprocess
  # Mutable hash of extra environment variables for the child; fill it before
  # calling +start+.
  attr_reader :env

  # args is the argv of the command to run (program followed by arguments).
  def initialize(*args)
    @args = args
    @completed = false
    @env = {}
    @stdout_reader = LoggingStreamReader.new
    @stderr_reader = LoggingStreamReader.new
  end

  # Returns +env+ with every value converted to a String, as required for the
  # environment hash handed to the spawned process.
  def environment
    @env.each_with_object({}) { |(key, val), stringified| stringified[key] = val.to_s }
  end

  # Spawns the child, starts the stdout/stderr readers and closes the child's
  # stdin right away.
  def start
    stdin, @stdout, @stderr, @wait_thr = Open3.popen3(environment, *@args)
    @pid = @wait_thr.pid
    @stdout_reader.read @stdout, "#{self} stdout:"
    @stderr_reader.read @stderr, "#{self} stderr:"
    stdin.close
  end

  # Waits up to +timeout+ seconds for the child to finish and returns a
  # RunResult holding the captured stdout, stderr and exit status.
  #
  # Raises AlreadyCompleted when a result was already returned, and TimedOut
  # when the child is still running after +timeout+ seconds. It may be called
  # again after TimedOut.
  def wait(timeout)
    raise AlreadyCompleted if @completed
    raise TimedOut if @wait_thr.join(timeout).nil?

    @completed = true
    @stdout_reader.join
    @stderr_reader.join
    # The non-block form of popen3 never closes its pipes; release them here
    # so a long-running caller does not depend on GC to reclaim descriptors.
    close_pipes
    output = @stdout_reader.content
    error = @stderr_reader.content
    # NOTE: exitstatus is nil when the child was terminated by a signal.
    exitstatus = @wait_thr.value.exitstatus
    RunResult.new(output, error, exitstatus)
  end

  # Sends +signal+ to the child; a child that no longer exists is ignored.
  def send_signal(signal)
    Process.kill(signal, @pid)
  rescue Errno::ESRCH
    # Process already killed
  end

  def to_s
    status = @pid.nil? ? "not-started-yet" : @pid.to_s
    status = "already-exited" if @completed
    "Subprocess(#{@args.join " "})[#{status}]"
  end

  private

  # Closes the parent's ends of the child's stdout and stderr pipes.
  def close_pipes
    [@stdout, @stderr].each { |pipe| pipe.close unless pipe.closed? }
  end
end


# Settings of the service, normally read from a YAML file through +load+.
class ServiceOptions
  attr_reader :status_timeout, :router_migration_timeout, :hatool,
              :seconds_to_sleep_between_checks, :max_errors_tolerated, :log_file

  # params must contain all of :status_timeout, :router_migration_timeout,
  # :hatool_options, :sleep_time, :max_errors_tolerated and :log_file;
  # a missing key raises KeyError.
  def initialize(params = {})
    required_keys = [
      :status_timeout, :router_migration_timeout, :hatool_options,
      :sleep_time, :max_errors_tolerated, :log_file
    ]
    @status_timeout, @router_migration_timeout, @hatool,
      @seconds_to_sleep_between_checks, @max_errors_tolerated, @log_file =
      required_keys.map { |key| params.fetch(key) }
  end

  # Builds the options from the YAML document stored at +path+.
  #
  # Reads the "timeouts" ("status" and "router_migration"), "hatool",
  # "seconds_to_sleep_between_checks", "max_errors_tolerated" and "log_file"
  # entries; the two numeric settings are converted with +to_i+.
  def self.load(path)
    data = YAML.load(File.read(path))
    timeouts = data["timeouts"]

    ServiceOptions.new(
      status_timeout: TimeoutOptions.from_hash(timeouts["status"]),
      router_migration_timeout: TimeoutOptions.from_hash(timeouts["router_migration"]),
      hatool_options: HAToolOptions.from_hash(data["hatool"]),
      sleep_time: data["seconds_to_sleep_between_checks"].to_i,
      max_errors_tolerated: data["max_errors_tolerated"].to_i,
      log_file: data["log_file"]
    )
  end
end


# Counts consecutive errors and raises once more than the tolerated number
# has accumulated.
class ErrorCounter
  attr_reader :errors

  def initialize(max_errors_tolerated)
    @errors = 0
    @max_errors_tolerated = max_errors_tolerated
  end

  # Sets the count back to zero (and logs that it did).
  def reset!
    HAToolLog.log.info("error counter: re-set to 0")
    @errors = 0
  end

  # Adds one error. Raises MaximumErrorsReached when the count is now greater
  # than the tolerated maximum; otherwise returns nil.
  def bump!
    @errors += 1
    HAToolLog.log.info("error counter: bumped to #{@errors}")
    return if @errors <= @max_errors_tolerated

    HAToolLog.log.error("error counter: exceeded limit #{@max_errors_tolerated}")
    raise MaximumErrorsReached
  end
end


# Runs a subprocess to completion, escalating from waiting, to TERM, to KILL
# when it overruns its timeouts.
class Supervisor
  # timeout_options must respond to +terminate+ and +kill+ (seconds).
  def initialize(subprocess, timeout_options)
    @subprocess = subprocess
    @timeout_options = timeout_options
  end

  # Starts the subprocess and returns its result once it has finished.
  # A TimedOut raised by the final one-second wait after KILL is not rescued.
  def run_subprocess
    HAToolLog.log.info("supervisor: starting #{@subprocess}")
    @subprocess.start
    HAToolLog.log.info("supervisor: monitoring #{@subprocess}")

    outcome = wait_then_terminate

    HAToolLog.log.info("supervisor: done running #{@subprocess} exited with: #{outcome.exit_status}")
    outcome
  end

  private

  # Waits for the "terminate" timeout; on expiry sends TERM and moves on to
  # the kill stage.
  def wait_then_terminate
    @subprocess.wait(@timeout_options.terminate)
  rescue TimedOut
    HAToolLog.log.info("supervisor: #{@subprocess} did not terminate, sending TERM signal")
    @subprocess.send_signal("TERM")
    wait_then_kill
  end

  # Waits for the "kill" timeout; on expiry sends KILL and waits one more
  # second for the process to go away.
  def wait_then_kill
    @subprocess.wait(@timeout_options.kill)
  rescue TimedOut
    HAToolLog.log.info("supervisor: #{@subprocess} did not terminate, sending KILL signal")
    @subprocess.send_signal("KILL")
    @subprocess.wait(1)
  end
end

# Builds the argv arrays used to invoke the external HA tool.
class HATool
  # Flags appended to the migration command (["--retry"]).
  attr_reader :extra_flags

  # options must respond to +program+ and +insecure+.
  def initialize(options)
    @options = options
    @extra_flags = ["--retry"]
  end

  # Returns ["--insecure"] when the options ask for it, otherwise [].
  def insecure_flag
    return ["--insecure"] if @options.insecure

    []
  end

  # argv for the quiet L3 agent check.
  def status_command
    [@options.program, "--l3-agent-check", "--quiet", *insecure_flag]
  end

  # argv for the immediate L3 agent migration, including the extra flags.
  def migration_command
    [@options.program, "--l3-agent-migrate", "--now", *@extra_flags, *insecure_flag]
  end
end

# Read-only record of a finished subprocess run: captured stdout (+output+),
# captured stderr (+error+) and the exit status.
class RunResult
  attr_reader :output, :error, :exit_status

  def initialize(output, error, exit_status)
    @output, @error, @exit_status = output, error, exit_status
  end
end

# Pair of timeouts, both stored as integers via +to_i+: +terminate+ and +kill+.
class TimeoutOptions
  attr_reader :terminate, :kill

  def initialize(terminate_timeout, kill_timeout)
    @terminate, @kill = [terminate_timeout, kill_timeout].map(&:to_i)
  end

  # Builds the pair from the "terminate" and "kill" entries of +hash+.
  def self.from_hash(hash)
    terminate_timeout = hash["terminate"]
    kill_timeout = hash["kill"]
    TimeoutOptions.new(terminate_timeout, kill_timeout)
  end
end

# Settings describing how to invoke the external HA tool.
class HAToolOptions
  # program:  the executable to run
  # env:      hash of environment variables for the tool (never nil)
  # insecure: whether the "--insecure" flag should be passed
  attr_reader :program, :env, :insecure

  # A nil +env+ is replaced by an empty hash so that consumers can merge it
  # into a process environment without a nil check.
  def initialize(program, env, insecure)
    @program = program
    @env = env || {}
    @insecure = insecure
  end

  # Returns true only when +value+'s string form is exactly "true".
  def self.to_bool(value)
    value.to_s == "true"
  end

  # Builds the options from the "program", "env" and "insecure" entries of
  # +hash+; a missing "env" entry yields an empty environment.
  def self.from_hash(hash)
    HAToolOptions.new(hash["program"], hash["env"], to_bool(hash["insecure"]))
  end
end

# Run the service only when this file is the program being executed.
main if $PROGRAM_NAME == __FILE__