cloudfoundry/cloud_controller_ng

View on GitHub
app/jobs/runtime/orphaned_blobs_cleanup.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require 'cloud_controller/dependency_locator'
require 'cloud_controller/clock/clock'
require 'repositories/orphaned_blob_event_repository'

module VCAP::CloudController
  module Jobs
    module Runtime
      class OrphanedBlobsCleanup < VCAP::CloudController::Jobs::CCJob
        DIRTY_THRESHOLD = 3
        NUMBER_OF_BLOBS_TO_MARK = 10_000
        IGNORED_DIRECTORY_PREFIXES = [
          CloudController::DependencyLocator::BUILDPACK_CACHE_DIR,
          CloudController::DependencyLocator::RESOURCE_POOL_DIR
        ].freeze

        def perform
          unless config.get(:perform_blob_cleanup)
            logger.info('Skipping OrphanedBlobsCleanup as the `perform_blob_cleanup` manifest property is false')
            return
          end

          day_of_week = Time.now.wday
          cleanup(day_of_week)
        end

        def cleanup(day_of_week)
          logger.info("Started orphaned blobs cleanup job for day of week: #{day_of_week}")

          update_existing_orphaned_blobs

          number_of_marked_blobs = 0

          unique_blobstores.each do |blobstore_config|
            blobstore_type = blobstore_config[:type].to_s
            directory_key  = blobstore_config[:directory_key]
            blobstore      = CloudController::DependencyLocator.instance.public_send(blobstore_type)

            daily_directory_subset(day_of_week).each do |prefix|
              blobstore.files_for(prefix).each do |blob|
                next if blob_in_use?(blob.key) || OrphanedBlob.find(blob_key: blob.key, blobstore_type: blobstore_type)

                logger.info("Creating orphaned blob: #{blob.key} from directory_key: #{directory_key}")
                OrphanedBlob.create(blob_key: blob.key, dirty_count: 1, blobstore_type: blobstore_type)
                number_of_marked_blobs += 1

                if number_of_marked_blobs >= NUMBER_OF_BLOBS_TO_MARK
                  logger.info("Finished orphaned blobs cleanup job early after marking #{number_of_marked_blobs} blobs")
                  return 'finished-early'
                end
              end
            end
          end
        rescue CloudController::Blobstore::BlobstoreError => e
          logger.error("Failed orphaned blobs cleanup job with BlobstoreError: #{e.message}")
          raise
        ensure
          delete_orphaned_blobs
          logger.info('Finished orphaned blobs cleanup job')
        end

        def max_attempts
          1
        end

        def job_name_in_configuration
          :orphaned_blobs_cleanup
        end

        private

        def unique_blobstores
          return @unique_blobstores if @unique_blobstores.present?

          full_list = [
            {
              type: :droplet_blobstore,
              directory_key: config.get(:droplets, :droplet_directory_key),
              root_dir: CloudController::DependencyLocator.instance.public_send(:droplet_blobstore).root_dir
            },
            {
              type: :package_blobstore,
              directory_key: config.get(:packages, :app_package_directory_key),
              root_dir: CloudController::DependencyLocator.instance.public_send(:package_blobstore).root_dir
            },
            {
              type: :buildpack_blobstore,
              directory_key: config.get(:buildpacks, :buildpack_directory_key),
              root_dir: CloudController::DependencyLocator.instance.public_send(:buildpack_blobstore).root_dir
            },
            {
              type: :legacy_global_app_bits_cache,
              directory_key: config.get(:resource_pool, :resource_directory_key),
              root_dir: CloudController::DependencyLocator.instance.public_send(:legacy_global_app_bits_cache).root_dir
            }
          ]

          unique_blobstores = []
          full_list.each do |blobstore_config|
            unique_blobstores << blobstore_config unless unique_blobstores.any? do |b|
              b[:directory_key] == blobstore_config[:directory_key] && b[:root_dir] == blobstore_config[:root_dir]
            end
          end

          @unique_blobstores = unique_blobstores
        end

        def update_existing_orphaned_blobs
          dataset = OrphanedBlob.order(Sequel.desc(:dirty_count)).limit(NUMBER_OF_BLOBS_TO_MARK)

          dataset.each do |orphaned_blob|
            if blob_in_use?(orphaned_blob.blob_key)
              unorphan_blob(orphaned_blob)
              next
            end

            logger.info("Incrementing dirty count for blob: #{orphaned_blob.blob_key}")
            orphaned_blob.update(dirty_count: Sequel.+(:dirty_count, 1))
          end
        end

        def unorphan_blob(orphaned_blob)
          logger.info("Un-orphaning previously orphaned blob: #{orphaned_blob.blob_key}")
          orphaned_blob.delete
        end

        def daily_directory_subset(day_of_week)
          # Our blobstore directories are namespaced using hex-values (e.g. 00/6c, ff/56, etc.)
          directory_subsets = [0x00..0x24, 0x25..0x48, 0x49..0x6c, 0x6d..0x90, 0x91..0xb4, 0xb5..0xd8, 0xd9..0xff].freeze

          directories_to_iterate = directory_subsets[day_of_week]
          directories_to_iterate.map { |decimal| decimal.to_s(16).rjust(2, '0') }
        end

        def blob_in_use?(blob_key)
          path_parts = blob_key.split(File::Separator)

          return true if blob_key.start_with?(*IGNORED_DIRECTORY_PREFIXES)

          if path_parts.length == 4
            potential_droplet_guid = path_parts[-2]
            basename               = path_parts[-1]

            DropletModel.find(guid: potential_droplet_guid, droplet_hash: basename).present?
          elsif path_parts.length == 3
            basename = path_parts[-1]

            PackageModel.find(guid: basename).present? ||
              Buildpack.find(key: basename).present?
          else
            true
          end
        end

        def delete_orphaned_blobs
          logger.info('Attempting to delete orphaned blobs')

          dataset = OrphanedBlob.where { dirty_count >= DIRTY_THRESHOLD }.
                    order(Sequel.desc(:dirty_count)).
                    limit(NUMBER_OF_BLOBS_TO_MARK)

          dataset.each do |orphaned_blob|
            unpartitioned_blob_key = CloudController::Blobstore::BlobKeyGenerator.key_from_full_path(orphaned_blob.blob_key)
            blobstore_type         = orphaned_blob.blobstore_type.to_sym
            directory_key          = directory_key_for_type(blobstore_type)

            logger.info("Enqueuing deletion of orphaned blob #{orphaned_blob.blob_key} inside directory_key #{directory_key}")
            blobstore_delete_job = BlobstoreDelete.new(unpartitioned_blob_key, blobstore_type)
            Jobs::Enqueuer.new(blobstore_delete_job, queue: Jobs::Queues.generic, priority: VCAP::CloudController::Clock::LOW_PRIORITY).enqueue

            VCAP::CloudController::Repositories::OrphanedBlobEventRepository.record_delete(directory_key, orphaned_blob.blob_key)
            orphaned_blob.delete
          end
        end

        def directory_key_for_type(type)
          blobstore_config = unique_blobstores.find { |b| b[:type].to_s == type.to_s }
          raise "Could not find blobstore config matching blobstore type '#{type}': #{unique_blobstores.inspect}" if blobstore_config.nil?

          blobstore_config[:directory_key]
        end

        def logger
          @logger ||= Steno.logger('cc.background.orphaned-blobs-cleanup')
        end

        def config
          @config ||= Config.config
        end
      end
    end
  end
end