cloudfoundry/cloud_controller_ng

View on GitHub
lib/cloud_controller/diego/processes_sync.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require 'utils/workpool'

module VCAP::CloudController
  module Diego
    # Reconciles Cloud Controller's runnable Diego processes with the
    # scheduling infos returned by the BBS apps client.
    #
    # One call to #sync:
    # * fetches every scheduling info and indexes it by process guid;
    # * walks the runnable processes in id-ordered batches, marking a process
    #   "to desire" when no scheduling info exists for it and "to update" when
    #   the scheduling info's annotation differs from the process' updated_at;
    # * treats every scheduling info that no process claimed as stale and
    #   stops it;
    # * submits those calls to the work pool and inspects the exceptions the
    #   pool recorded;
    # * bumps freshness only if the pass ran to completion and nothing
    #   unexpected failed.
    class ProcessesSync
      # Number of processes loaded per database query.
      BATCH_SIZE = 500

      # Base class for errors raised by this sync.
      class Error < StandardError
      end

      # Raised by #sync in place of a CloudController::Errors::ApiError.
      class BBSFetchError < Error
      end

      # @param config configuration object; stored and exposed through the
      #   private reader, not otherwise read by the code in this class
      # @param statsd_updater object that receives the number of invalid LRPs
      #   counted during a sync (via #update_synced_invalid_lrps)
      def initialize(config:, statsd_updater: CloudController::DependencyLocator.instance.statsd_updater)
        @config   = config
        @workpool = WorkPool.new(50, store_exceptions: true)
        @statsd_updater = statsd_updater
      end

      # Runs one full reconciliation pass.
      #
      # Freshness is bumped only when the pass ran to completion AND
      # process_workpool_exceptions did not flag an unexpected failure.
      #
      # @raise [BBSFetchError] when a CloudController::Errors::ApiError escapes
      #   the pass (the original message is preserved)
      # @raise [StandardError] any other standard error is logged and re-raised
      def sync
        logger.info('run-process-sync')
        @bump_freshness = true
        # Exceptions outside StandardError (e.g. Interrupt) and non-local exits
        # skip both rescue clauses below but still run `ensure`. Tracking
        # completion separately keeps such an aborted pass from being reported
        # as fresh.
        completed = false
        diego_lrps = bbs_apps_client.fetch_scheduling_infos.index_by { |d| d.desired_lrp_key.process_guid }
        logger.info('fetched-scheduling-infos')
        to_desire = []
        to_update = {}
        batched_processes_for_sync do |batch|
          batch.each do |process|
            process_guid = ProcessGuid.from_process(process)
            # Claim the matching scheduling info; whatever is left in
            # diego_lrps after the walk has no runnable process behind it.
            diego_lrp = diego_lrps.delete(process_guid)

            if diego_lrp.nil?
              to_desire << process.id
            elsif process.updated_at.to_f.to_s != diego_lrp.annotation
              to_update[process.id] = diego_lrp
            end
          end
        end

        update_lrps(to_update)
        desire_lrps(to_desire)
        delete_lrps(diego_lrps)

        workpool.drain

        process_workpool_exceptions(workpool.exceptions)
        completed = true
      rescue CloudController::Errors::ApiError => e
        logger.info('sync-failed', error: e.name, error_message: e.message)
        @bump_freshness = false
        raise BBSFetchError, e.message
      rescue StandardError => e
        logger.info('sync-failed', error: e.class.name, error_message: e.message)
        @bump_freshness = false
        raise
      ensure
        # Drain again so the pool is also drained when the pass aborted before
        # reaching the drain above.
        workpool.drain
        if completed && @bump_freshness
          bbs_apps_client.bump_freshness
          logger.info('finished-process-sync')
        else
          logger.info('sync-failed')
        end
      end

      private

      attr_reader :config, :workpool

      # Classifies the exceptions recorded by the work pool.
      #
      # 'RunnerInvalidRequest' errors are counted and reported to the statsd
      # updater; 'RunnerError's saying the resource already exists or could not
      # be found are logged and ignored. Anything else is logged as an error
      # and prevents the freshness bump.
      #
      # @param exceptions [Enumerable<Exception>] exceptions to classify
      def process_workpool_exceptions(exceptions)
        invalid_lrps = 0
        exceptions.each do |e|
          error_name = e.is_a?(CloudController::Errors::ApiError) ? e.name : e.class.name
          if error_name == 'RunnerInvalidRequest'
            logger.info('synced-invalid-desired-lrps', error: error_name, error_message: e.message)
            invalid_lrps += 1
          elsif error_name == 'RunnerError' && e.message.include?('the requested resource already exists')
            logger.info('ignore-existing-resource', error: error_name, error_message: e.message)
          elsif error_name == 'RunnerError' && e.message.include?('the requested resource could not be found')
            logger.info('ignore-deleted-resource', error: error_name, error_message: e.message)
          else
            logger.error('error-updating-lrp-state', error: error_name, error_message: e.message, error_backtrace: formatted_backtrace_from_error(e))
            @bump_freshness = false
          end
        end
        @statsd_updater.update_synced_invalid_lrps(invalid_lrps)
      end

      # Submits an update_app call for every process whose scheduling info is
      # out of date.
      #
      # @param to_update [Hash] process id => scheduling info to update against
      def update_lrps(to_update)
        batched_processes(to_update.keys) do |batch|
          batch.each do |process|
            workpool.submit(process, to_update[process.id]) do |p, l|
              logger.info('updating-lrp', process_guid: p.guid, app_guid: p.app_guid)
              bbs_apps_client.update_app(p, l)
              logger.info('update-lrp', process_guid: p.guid)
            end
          end
        end
      end

      # Submits a desire_app call for every process that has no scheduling
      # info.
      #
      # @param to_desire [Array] ids of the processes to desire
      def desire_lrps(to_desire)
        batched_processes(to_desire) do |batch|
          batch.each do |process|
            workpool.submit(process) do |p|
              logger.info('desiring-lrp', process_guid: p.guid, app_guid: p.app_guid)
              bbs_apps_client.desire_app(p)
              logger.info('desire-lrp', process_guid: p.guid)
            end
          end
        end
      end

      # Submits a stop_app call for every scheduling info that no runnable
      # process claimed.
      #
      # @param to_delete [Hash] process guid => scheduling info; only the keys
      #   are used
      def delete_lrps(to_delete)
        to_delete.each_key do |process_guid_to_delete|
          workpool.submit(process_guid_to_delete) do |guid|
            logger.info('deleting-lrp', process_guid: guid)
            bbs_apps_client.stop_app(guid)
            logger.info('delete-lrp', process_guid: guid)
          end
        end
      end

      # @param error [Exception]
      # @return [String] the backtrace joined by newlines and followed by
      #   "\n...", or an empty string when the error carries no backtrace
      def formatted_backtrace_from_error(error)
        error.backtrace.present? ? "#{error.backtrace.join("\n")}\n..." : ''
      end

      # Yields the processes returned by processes_for_sync in id order,
      # BATCH_SIZE at a time, resuming each query after the last id seen.
      # Stops after the first batch smaller than BATCH_SIZE.
      def batched_processes_for_sync
        last_id = 0

        loop do
          batch = processes_for_sync(last_id).all
          yield batch
          return if batch.count < BATCH_SIZE

          last_id = batch.last.id
        end
      end

      # Yields the process records for +ids+, at most BATCH_SIZE ids per query.
      #
      # @param ids [Array] process ids to load
      def batched_processes(ids)
        ids.each_slice(BATCH_SIZE) do |id_chunk|
          yield processes(id_chunk).all
        end
      end

      # Dataset of runnable Diego processes with the given ids, with the
      # associations listed in `eager` loaded alongside. When the :diego_docker
      # feature flag is disabled the dataset is narrowed with
      # `non_docker_type`.
      #
      # @param ids [Array] process ids to match
      def processes(ids)
        dataset = ProcessModel.
                  diego.
                  runnable.
                  where(Sequel.lit("#{ProcessModel.table_name}.id IN ?", ids)).
                  eager(:desired_droplet, :space, :service_bindings, { routes: :domain }, { app: :buildpack_lifecycle_data })
        if FeatureFlag.enabled?(:diego_docker)
          dataset.select_all(ProcessModel.table_name)
        else
          # `select_all` is called by `non_docker_type`
          dataset.non_docker_type
        end
      end

      # Dataset of the next BATCH_SIZE runnable Diego processes with an id
      # greater than +last_id+, ordered by id. Only id, guid, version and
      # updated_at are selected.
      #
      # @param last_id [Integer] id after which to resume
      def processes_for_sync(last_id)
        table = ProcessModel.table_name

        ProcessModel.
          diego.
          runnable.
          where(Sequel.lit("#{table}.id > ?", last_id)).
          order(:"#{table}__id").
          limit(BATCH_SIZE).
          select(:"#{table}__id", :"#{table}__guid", :"#{table}__version", :"#{table}__updated_at")
      end

      # Looked up on every call rather than cached on the instance.
      def bbs_apps_client
        CloudController::DependencyLocator.instance.bbs_apps_client
      end

      def logger
        @logger ||= Steno.logger('cc.diego.sync.processes')
      end
    end
  end
end