BallAerospace/COSMOS

View on GitHub
cosmos/lib/cosmos/microservices/timeline_microservice.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder

require 'cosmos/utilities/authentication'
require 'cosmos/microservices/microservice'
require 'cosmos/models/activity_model'
require 'cosmos/models/notification_model'
require 'cosmos/models/timeline_model'
require 'cosmos/topics/timeline_topic'

require 'cosmos/script'

module Cosmos
  # The Timeline worker is a very simple thread pool worker. Once
  # the timeline manager has pushed a job to the schedule one of
  # these workers will run the CMD (command) or SCRIPT (script)
  # or anything that could be expanded in the future.
  class TimelineWorker
    # @param name [String] name of the timeline this worker belongs to
    # @param scope [String] scope passed along to commands, scripts and model calls
    # @param queue [Queue] queue of activities to run; a nil entry tells the worker to exit
    def initialize(name:, scope:, queue:)
      @timeline_name = name
      @scope = scope
      @queue = queue
      @authentication = generate_auth
    end

    # Build the authentication object whose token is sent with script requests.
    # CosmosAuthentication is used unless both COSMOS_API_USER and
    # COSMOS_API_CLIENT are set, in which case Keycloak authentication
    # against COSMOS_KEYCLOAK_URL is used.
    def generate_auth
      if ENV['COSMOS_API_USER'].nil? || ENV['COSMOS_API_CLIENT'].nil?
        CosmosAuthentication.new
      else
        CosmosKeycloakAuthentication.new(ENV['COSMOS_KEYCLOAK_URL'])
      end
    end

    # Pop activities off the queue and run them until a nil entry is popped.
    # A failure while running one activity is logged and does not stop the
    # worker, so the remaining queued activities are still executed.
    def run
      Logger.info "#{@timeline_name} timeline worker running"
      loop do
        activity = @queue.pop
        break if activity.nil?

        begin
          run_activity(activity)
        rescue StandardError => e
          # The activity is deliberately not serialized here: building its
          # JSON may be the very call that raised.
          Logger.error "#{@timeline_name} run_activity failed > #{e.message}"
        end
      end
      Logger.info "#{@timeline_name} timeline worker exiting"
    end

    # Dispatch the activity on its kind (case-insensitive): COMMAND, SCRIPT or
    # EXPIRE. Any other kind is logged as an error and otherwise ignored.
    def run_activity(activity)
      case activity.kind.upcase
      when 'COMMAND'
        run_command(activity)
      when 'SCRIPT'
        run_script(activity)
      when 'EXPIRE'
        clear_expired(activity)
      else
        Logger.error "Unknown kind passed to microservice #{@timeline_name}: #{activity.as_json}"
      end
    end

    # Send the command stored in activity.data['command'] without the
    # hazardous check and commit the activity as completed, or as failed
    # (with the error message) when sending raises.
    def run_command(activity)
      Logger.info "#{@timeline_name} run_command > #{activity.as_json}"
      begin
        cmd_no_hazardous_check(activity.data['command'], scope: @scope)
        activity.commit(status: 'completed', fulfillment: true)
      rescue StandardError => e
        activity.commit(status: 'failed', message: e.message)
        Logger.error "#{@timeline_name} run_cmd failed > #{activity.as_json}, #{e.message}"
      end
    end

    # POST a run request for activity.data['script'] to the script API on
    # port 2902 of COSMOS_SCRIPT_HOSTNAME (default 'cosmos-script-runner-api').
    # Any response code other than '200' is treated as a failure. The activity
    # is committed as completed or failed accordingly.
    def run_script(activity)
      Logger.info "#{@timeline_name} run_script > #{activity.as_json}"
      begin
        request = Net::HTTP::Post.new(
          "/script-api/scripts/#{activity.data['script']}/run?scope=#{@scope}",
          'Content-Type' => 'application/json',
          'Authorization' => @authentication.token
        )
        request.body = JSON.generate({
          'scope' => @scope,
          'environment' => activity.data['environment'],
          'timeline' => @timeline_name,
          'id' => activity.start
        })
        hostname = ENV['COSMOS_SCRIPT_HOSTNAME'] || 'cosmos-script-runner-api'
        response = Net::HTTP.new(hostname, 2902).request(request)
        # Net::HTTP exposes the status code as a String
        raise "failed to call #{hostname}, for script: #{activity.data['script']}, response code: #{response.code}" if response.code != '200'

        activity.commit(status: 'completed', message: "#{activity.data['script']} => #{response.body}", fulfillment: true)
      rescue StandardError => e
        activity.commit(status: 'failed', message: e.message)
        Logger.error "#{@timeline_name} run_script failed > #{activity.as_json}, #{e.message}"
      end
    end

    # Destroy the stored activities of this timeline between activity.start
    # and activity.stop, then record a 'completed' event on the activity.
    # Failures are logged only.
    def clear_expired(activity)
      begin
        ActivityModel.range_destroy(name: @timeline_name, scope: @scope, min: activity.start, max: activity.stop)
        activity.add_event(status: 'completed')
      rescue StandardError => e
        Logger.error "#{@timeline_name} clear_expired failed > #{activity.as_json} #{e.message}"
      end
    end
  end

  # Shared between the monitor thread and the manager thread to
  # share the planned activities. This should remain a thread
  # safe implementation.
  class Schedule
    # @param name [String] name of the timeline the schedule belongs to
    def initialize(name)
      @name = name
      @activities = []
      @activities_mutex = Mutex.new
      # Fixed-size ring buffer holding the start times most recently
      # reported as queued; @index is the next slot to overwrite.
      @size = 20
      @queue = Array.new(@size)
      @index = 0
    end

    # Report whether the given start time has not been seen yet and, if so,
    # remember it in the ring buffer (overwriting the oldest slot).
    # Note this method does not take the activities mutex.
    #
    # @param start [Integer] start time of an activity
    # @return [Boolean] true the first time a start is seen, false while it
    #   is still held in the ring buffer
    def not_queued?(start)
      return false if @queue.include?(start)

      @queue[@index] = start
      @index = (@index + 1) % @size
      true
    end

    # @return [Array] a shallow copy of the scheduled activities
    def activities
      @activities_mutex.synchronize { @activities.dup }
    end

    # Replace the scheduled activities with a copy of the given list.
    def update(input_activities)
      @activities_mutex.synchronize { @activities = input_activities.dup }
    end

    # Append the activity unless one with the same start is already scheduled.
    def add_activity(input_activity)
      @activities_mutex.synchronize do
        known = @activities.any? { |existing| existing.start == input_activity.start }
        @activities << input_activity unless known
      end
    end

    # Drop every scheduled activity sharing the given activity's start.
    def remove_activity(input_activity)
      @activities_mutex.synchronize do
        @activities.delete_if { |existing| existing.start == input_activity.start }
      end
    end
  end

  # The timeline manager starts a thread pool and looks at the
  # schedule and if an "activity" should be run. TimelineManager
  # adds the "activity" to the thread pool and the thread will
  # execute the "activity".
  class TimelineManager
    # @param name [String] name of the timeline being managed
    # @param scope [String] scope of the timeline
    # @param schedule [Schedule] schedule holding the planned activities
    def initialize(name:, scope:, schedule:)
      @timeline_name = name
      @scope = scope
      @schedule = schedule
      @worker_count = 3
      @queue = Queue.new
      @thread_pool = generate_thread_pool
      @cancel_thread = false
      # Time (seconds since epoch) at which the next expire activity is due;
      # zero makes the first pass of #run trigger one immediately.
      @expire = 0
    end

    # Start @worker_count threads, each running a TimelineWorker that
    # consumes the shared queue.
    #
    # @return [Array<Thread>] the started worker threads
    def generate_thread_pool
      Array.new(@worker_count) do
        worker = TimelineWorker.new(name: @timeline_name, scope: @scope, queue: @queue)
        Thread.new { worker.run }
      end
    end

    # Once a second, queue every scheduled activity whose start time has been
    # reached and which has not been queued before. Whenever the expire time
    # is reached, also queue an expire activity and request a schedule
    # refresh. Runs until #shutdown is called.
    def run
      Logger.info "#{@timeline_name} timeline manager running"
      loop do
        start = Time.now.to_i
        @schedule.activities.each do |activity|
          start_difference = activity.start - start
          if start_difference <= 0 && @schedule.not_queued?(activity.start)
            Logger.debug "#{@timeline_name} #{@scope} current start: #{start}, vs #{activity.start}, #{start_difference}"
            activity.add_event(status: 'queued')
            @queue << activity
          end
        end
        if start >= @expire
          add_expire_activity
          request_update(start: start)
        end
        # Checked on both sides of the sleep so a shutdown is noticed promptly
        break if @cancel_thread

        sleep(1)
        break if @cancel_thread
      end
      Logger.info "#{@timeline_name} timeline manager exiting"
    end

    # Queue an EXPIRE activity used to remove activities that are about
    # seven days old, and schedule the next one 3000 seconds from now.
    #
    # NOTE(review): the window runs from 7 * 86_400 s (7 days) ago to
    # 7 * 82_800 s (7 * 23 hours) ago, so anything older than 7 days is not
    # covered by it - confirm this window is what is intended.
    #
    # @return [ActivityModel] the queued expire activity
    def add_expire_activity
      now = Time.now.to_i
      @expire = now + 3_000
      activity = ActivityModel.new(
        name: @timeline_name,
        scope: @scope,
        start: (now - 86_400 * 7),
        stop: (now - 82_800 * 7),
        kind: 'EXPIRE',
        data: {}
      )
      @queue << activity
      activity
    end

    # Write a 'refresh' notification for this timeline to the timeline topic.
    # It is sent each time the expire time is reached in #run (every 3000
    # seconds). A failure to write is logged and otherwise ignored.
    #
    # @param start [Integer] time (seconds since epoch) placed in the notification
    def request_update(start:)
      notification = {
        'data' => JSON.generate({ 'time' => start }),
        'kind' => 'refresh',
        'type' => 'timeline',
        'timeline' => @timeline_name
      }
      begin
        TimelineTopic.write_activity(notification, scope: @scope)
      rescue StandardError => e
        Logger.error "#{@timeline_name} manager failed to request update: #{e.message}"
      end
    end

    # Ask the manager loop to stop and push one nil per worker so every
    # worker thread leaves its run loop.
    def shutdown
      @cancel_thread = true
      @worker_count.times { @queue << nil }
    end
  end

  # The timeline microservice starts a manager, then gets the activities
  # from the sorted set in redis and updates the schedule for the
  # manager. It then waits for an update on the timeline stream, which
  # triggers another update of the schedule.
  class TimelineMicroservice < Microservice
    TIMELINE_METRIC_NAME = 'timeline_activities_duration_seconds'.freeze

    # @param name [String] microservice name; the timeline name is taken
    #   from its third '__'-separated segment
    def initialize(name)
      super(name)
      @timeline_name = name.split('__')[2]
      @schedule = Schedule.new(@timeline_name)
      @manager = TimelineManager.new(name: @timeline_name, scope: scope, schedule: @schedule)
      @manager_thread = nil
      @read_topic = true
    end

    # Start the manager thread, then repeatedly reload the activities into the
    # schedule, record how long the reload took as a metric sample, and block
    # until a topic message asks for another refresh.
    # (@name, @scope, @metric, @topics and @cancel_thread are not set in this
    # class; presumably they come from Microservice - confirm.)
    def run
      Logger.info "#{@name} timeline running"
      @manager_thread = Thread.new { @manager.run }
      loop do
        start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        current_activities = ActivityModel.activities(name: @timeline_name, scope: @scope)
        @schedule.update(current_activities)
        diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
        metric_labels = { 'timeline' => @timeline_name, 'thread' => 'microservice' }
        @metric.add_sample(name: TIMELINE_METRIC_NAME, value: diff, labels: metric_labels)
        break if @cancel_thread

        block_for_updates
        break if @cancel_thread
      end
      Logger.info "#{@name} timeline exiting"
    end

    # Map of message 'type' => 'kind' => name of the method that handles it.
    #
    # @return [Hash{String => Hash{String => Symbol}}]
    def topic_lookup_functions
      {
        'timeline' => {
          'created' => :timeline_nop,
          'refresh' => :schedule_refresh,
          'updated' => :timeline_nop,
          'deleted' => :timeline_nop
        },
        'activity' => {
          'event' => :timeline_nop,
          'created' => :create_activity_from_event,
          'updated' => :schedule_refresh,
          'deleted' => :remove_activity_from_event
        }
      }
    end

    # Read the topics and dispatch every message addressed to this timeline
    # to its handler, until a handler (or #shutdown) clears @read_topic.
    # Messages with a type/kind that has no handler are logged and skipped.
    def block_for_updates
      @read_topic = true
      while @read_topic
        begin
          TimelineTopic.read_topics(@topics) do |_topic, _msg_id, msg_hash, _redis|
            if msg_hash['timeline'] == @timeline_name
              # dig returns nil for an unknown type or kind instead of raising
              handler = topic_lookup_functions.dig(msg_hash['type'], msg_hash['kind'])
              if handler
                public_send(handler, JSON.parse(msg_hash['data']))
              else
                Logger.error "#{@timeline_name} no handler for message type: #{msg_hash['type']}, kind: #{msg_hash['kind']}"
              end
            end
          end
        rescue StandardError => e
          Logger.error "#{@timeline_name} failed to read topics #{@topics}\n#{e.formatted}"
        end
      end
    end

    # Handler for messages that need no action; only logs the data.
    def timeline_nop(data)
      Logger.debug "#{@name} timeline web socket event: #{data}"
    end

    # Handler that ends #block_for_updates so #run reloads the schedule.
    def schedule_refresh(data)
      Logger.debug "#{@name} timeline web socket schedule refresh: #{data}"
      @read_topic = false
    end

    # Add the activity to the schedule. We don't need to hold the job in memory
    # if it is longer than an hour away. A refresh task will update that.
    # Only activities starting between 2 and 3600 seconds from now are added.
    def create_activity_from_event(data)
      diff = data['start'] - Time.now.to_i
      return unless (2..3600).include? diff

      activity = ActivityModel.from_json(data, name: @timeline_name, scope: @scope)
      @schedule.add_activity(activity)
    end

    # Remove the activity from the schedule. We don't need to remove the activity
    # if it is longer than an hour away. It will be removed from the data.
    # Only activities starting between 2 and 3600 seconds from now are removed.
    def remove_activity_from_event(data)
      diff = data['start'] - Time.now.to_i
      return unless (2..3600).include? diff

      activity = ActivityModel.from_json(data, name: @timeline_name, scope: @scope)
      @schedule.remove_activity(activity)
    end

    # Stop reading topics, shut the manager down, then run the
    # superclass shutdown.
    def shutdown
      @read_topic = false
      @manager.shutdown
      super
    end
  end
end

# Start the microservice only when this file is the program being executed,
# not when it is loaded by another file.
if $PROGRAM_NAME == __FILE__
  Cosmos::TimelineMicroservice.run
end