Katello/katello

View on GitHub
app/services/katello/pulp3/repository.rb

Summary

Maintainability
F
3 days
Test Coverage
# rubocop:disable Metrics/ClassLength
require "pulpcore_client"
module Katello
  module Pulp3
    class Repository
      # Mixins pulled into every Pulp 3 repository service.
      include Katello::Util::HttpProxy
      include Katello::Pulp3::ServiceCommon
      # The repository record and the smart proxy handed to #initialize.
      attr_accessor :repo
      attr_accessor :smart_proxy
      # `root` is forwarded to the repository, `pulp3_api` to the smart proxy.
      delegate :root, to: :repo
      delegate :pulp3_api, to: :smart_proxy

      # Number of unit hrefs sent per create_version call in #copy_units_by_href.
      COPY_UNIT_PAGE_SIZE = 10_000

      # Stores the repository and the smart proxy this service works with.
      #
      # @param repo the repository record exposed through #repo
      # @param smart_proxy the smart proxy exposed through #smart_proxy
      def initialize(repo, smart_proxy)
        @smart_proxy = smart_proxy
        @repo = repo
      end

      # Tells whether +href+ contains a repository version segment
      # ("/versions/<number>/").
      #
      # At least one digit is required, so "/versions//" is rejected, and the
      # result is a strict boolean rather than a MatchData object.
      #
      # @param href [String, nil] the href to inspect
      # @return [Boolean] false for nil or for an href without a version segment
      def self.version_href?(href)
        %r{/versions/\d+/}.match?(href)
      end

      # Tells whether +href+ contains a "/publications/" path segment.
      #
      # @param href [String] the href to inspect
      # @return [Boolean]
      def self.publication_href?(href)
        href.include?("/publications/")
      end

      # Not implemented here; always raises.
      #
      # @raise [NotImplementedError]
      def partial_repo_path
        raise NotImplementedError
      end

      # Returns this service itself when the smart proxy is the Pulp primary,
      # otherwise a RepositoryMirror wrapping this service.
      def with_mirror_adapter
        smart_proxy.pulp_primary? ? self : RepositoryMirror.new(self)
      end

      # Looks up the enabled repository type for +repository_type_label+ and
      # returns its Pulp 3 API object for +smart_proxy+.
      #
      # @param smart_proxy the smart proxy passed on to the type's pulp3_api
      # @param repository_type_label the key into the enabled repository types
      # @raise [RuntimeError] when no enabled repository type has that label
      def self.api(smart_proxy, repository_type_label)
        repo_type = RepositoryTypeManager.enabled_repository_types[repository_type_label]
        # Translate the message template first and interpolate afterwards;
        # interpolating inside _() makes the gettext lookup miss every time.
        fail(_("%s content type is not enabled.") % repository_type_label) unless repo_type
        repo_type.pulp3_api(smart_proxy)
      end

      # Builds a new Katello::Pulp3::Api::Core for the current smart proxy.
      def core_api
        Katello::Pulp3::Api::Core.new(smart_proxy)
      end

      # Returns the API object for this repository's content type, resolved via
      # the class-level .api lookup and memoized in @api.
      def api
        @api ||= begin
          type_label = repo.content_type
          self.class.api(smart_proxy, type_label)
        end
      end

      # True when the repository has a publication href recorded.
      #
      # @return [Boolean]
      def published?
        href = repo.publication_href
        !href.nil?
      end

      # Calls the repair endpoint of the repository versions API for the given
      # version, sending a freshly built repair object.
      #
      # @param repository_version_href the href of the version to repair
      # @return whatever the repository versions API returns for the call
      def repair(repository_version_href)
        repair_request = api.repair_class.new
        api.repository_versions_api.repair(repository_version_href, repair_request)
      end

      # Value sent as :skip_types by #sync_url_params; nil here, so the key is
      # omitted.
      def skip_types
        nil
      end

      # The content service class used with this repository.
      #
      # @return [Class] Katello::Pulp3::Content
      def content_service
        Katello::Pulp3::Content
      end

      # Creates the remote through the inherited implementation and stores the
      # resulting pulp_href on the repository (raising if the update fails).
      def create_remote
        created_remote = super
        repo.update!(remote_href: created_remote.pulp_href)
      end

      # Brings the remote in line with the current remote options.
      #
      # * URL blank and a remote is recorded: clears the href on the repository
      #   and deletes the remote, returning the result of #delete_remote.
      # * URL blank and no remote recorded: does nothing and returns nil.
      # * URL present and a remote is recorded: returns #remote_partial_update.
      # * URL present and no remote recorded: creates one and returns nil.
      def update_remote
        existing_href = repo.remote_href

        if remote_options[:url].blank?
          return unless existing_href

          repo.update(remote_href: nil)
          return delete_remote(href: existing_href)
        end

        return remote_partial_update if existing_href

        create_remote
        nil # return nil, as create isn't async
      end

      # Updates the existing remote in place when its kind still matches the
      # configured URL, otherwise replaces it.
      #
      # The URL counts as 'uln' when it starts with "uln"; the remote counts as
      # 'uln' when its href starts with "/pulp/api/v3/remotes/rpm/uln/".
      def remote_partial_update
        # Captured up front: create_remote below runs before the old remote is deleted.
        current_href = repo.remote_href
        wanted_kind = remote_options[:url]&.start_with?('uln') ? 'uln' : 'default'
        current_kind = current_href.start_with?('/pulp/api/v3/remotes/rpm/uln/') ? 'uln' : 'default'

        if wanted_kind == current_kind
          return api.get_remotes_api(href: current_href).partial_update(current_href, remote_options)
        end

        # We need to recreate a remote of the correct type!
        create_remote
        delete_remote(href: current_href)
      end

      # Deletes a remote, ignoring a 404 from the API.
      #
      # The href comes from options[:href], falling back to the repository's
      # remote_href. The options hash is only read, never modified, so callers
      # may pass a shared or frozen hash.
      #
      # @param options [Hash] may carry :href to name the remote to delete
      # @return the result of the delete call, or nil when no href is known
      def delete_remote(options = {})
        href = options[:href] || repo.remote_href
        return unless href

        ignore_404_exception { api.get_remotes_api(href: href).delete(href) }
      end

      # Instantiates the Pulp 3 service class registered for the content type
      # of the repository's root.
      #
      # @param repo the repository to wrap
      # @param smart_proxy the smart proxy to use
      def self.instance_for_type(repo, smart_proxy)
        repo_type = Katello::RepositoryTypeManager.enabled_repository_types[repo.root.content_type]
        repo_type.pulp3_service_class.new(repo, smart_proxy)
      end

      # Always false in this class.
      #
      # @return [Boolean]
      def should_purge_empty_contents?
        false
      end

      # Builds a backend object name of the form "<root label>-<repo id><n>",
      # where <n> is a random integer below 9999.
      #
      # @return [String]
      def generate_backend_object_name
        label = root.label
        suffix = "#{repo.id}#{rand(9999)}"
        "#{label}-#{suffix}"
      end

      # Finds the RepositoryReference matching this repository's root id and
      # content view id; nil when there is none.
      def repository_reference
        RepositoryReference.find_by(root_repository_id: repo.root_id, content_view_id: repo.content_view.id)
      end

      # Finds the DistributionReference recorded for this repository's id; nil
      # when there is none.
      def distribution_reference
        DistributionReference.find_by(repository_id: repo.id)
      end

      # Delegates to RepositoryMirror#create_entities for this service.
      def create_mirror_entities
        mirror = RepositoryMirror.new(self)
        mirror.create_entities
      end

      # Delegates to RepositoryMirror#refresh_entities for this service.
      def refresh_mirror_entities
        mirror = RepositoryMirror.new(self)
        mirror.refresh_entities
      end

      # Always updates the remote, and updates the distribution only when
      # #distribution_needs_update? says so.
      #
      # @return [Array] the non-nil results of those calls
      def refresh_if_needed
        pending = [update_remote]
        pending << update_distribution if distribution_needs_update?
        pending.compact
      end

      # Reads a remote from the remotes API selected for +href+.
      #
      # @param href the remote href; defaults to the repository's remote_href
      def get_remote(href = repo.remote_href)
        remotes_api = api.get_remotes_api(href: href)
        remotes_api.read(href)
      end

      # Fetches a distribution through the API.
      #
      # @param href the distribution href; defaults to the href of the saved
      #   distribution reference
      def get_distribution(href = distribution_reference.href)
        api.get_distribution(href)
      end

      # Decides whether the distribution has to be updated.
      #
      # Without a saved distribution reference, the answer is true exactly when
      # the repository has an environment. With one, the non-nil expected
      # options (minus :name) are compared against the same keys of the
      # distribution fetched from the API.
      #
      # @return [Boolean]
      def distribution_needs_update?
        return (repo.environment ? true : false) unless distribution_reference

        expected = secure_distribution_options(relative_path).except(:name).compact
        actual = get_distribution&.to_hash || {}
        actual.slice(*expected.keys) != expected
      end

      # Returns the given options without the :name and :client_key entries.
      #
      # @param computed_options [Hash] defaults to #remote_options
      # @return [Hash]
      def compute_remote_options(computed_options = remote_options)
        computed_options.except(:name, :client_key)
      end

      # Creates the repository in Pulp and records its href, unless a
      # repository reference already exists and +force+ is false.
      #
      # Existing RepositoryReference rows for the same root repository and
      # content view are destroyed before the new one is created.
      #
      # @param force [Boolean] create even when a reference already exists
      # @return the API response, or nil when nothing was created
      def create(force = false)
        return if !force && repository_reference

        created = api.repositories_api.create(create_options)
        scope = { root_repository_id: repo.root_id, content_view_id: repo.content_view.id }
        RepositoryReference.where(scope).destroy_all
        RepositoryReference.where(scope.merge(repository_href: created.pulp_href)).create!
        created
      end

      # Sends #create_options to the repositories API update call for the
      # referenced repository href (nil when there is no reference).
      def update
        repository_href = repository_reference.try(:repository_href)
        api.repositories_api.update(repository_href, create_options)
      end

      # Lists repositories through the API.
      #
      # @param options passed straight to the repositories API list call
      # @return the +results+ of the list response
      def list(options)
        response = api.repositories_api.list(options)
        response.results
      end

      # Reads the repository from the API using the referenced repository href
      # (nil when there is no reference).
      def read
        repository_href = repository_reference.try(:repository_href)
        api.repositories_api.read(repository_href)
      end

      # Destroys the given repository reference and deletes the repository it
      # pointed to, ignoring a 404 from the API.
      #
      # @param repo_reference defaults to #repository_reference; may be nil
      # @return the result of the delete call, or nil when there was no href
      def delete_repository(repo_reference = repository_reference)
        repository_href = repo_reference.try(:repository_href)
        repo_reference.try(:destroy)
        return unless repository_href

        ignore_404_exception { api.repositories_api.delete(repository_href) }
      end

      # Starts a sync of the referenced repository.
      #
      # @param options [Hash] forwarded to #sync_url_params
      # @return [Array] one element: the result of the sync call
      def sync(options = {})
        sync_request = api.repository_sync_url_class.new(sync_url_params(options))
        sync_result = api.repositories_api.sync(repository_reference.repository_href, sync_request)
        [sync_result]
      end

      # Builds the parameters for a sync request.
      #
      # :mirror is true only for the "content" mirroring policy. :skip_types is
      # added when #skip_types is set and the policy is not "complete".
      #
      # @param _sync_options unused here
      # @return [Hash]
      def sync_url_params(_sync_options)
        policy = repo.root.mirroring_policy
        params = {
          remote: repo.remote_href,
          mirror: policy == Katello::RootRepository::MIRRORING_POLICY_CONTENT
        }
        if skip_types && policy != Katello::RootRepository::MIRRORING_POLICY_COMPLETE
          params[:skip_types] = skip_types
        end
        params
      end

      # Creates a publication for the repository's current version href.
      #
      # @return the result of the publications API create call
      def create_publication
        publication_request = api.publication_class.new(publication_options(repo.version_href))
        api.publications_api.create(publication_request)
      end

      # Deletes the repository's publication, ignoring a 404 from the API.
      #
      # @return nil when the repository has no publication href
      def delete_publication
        return unless repo.publication_href

        ignore_404_exception { api.publications_api.delete(repo.publication_href) }
      end

      # Options used when building a publication request.
      #
      # @param repository_version the version href to publish
      # @return [Hash] with the single key :repository_version
      def publication_options(repository_version)
        { repository_version: repository_version }
      end

      # The repository's relative path without a leading slash.
      #
      # Anchored with \A so only a slash at the very start of the string is
      # removed; ^ would also match after an embedded newline.
      #
      # @return [String]
      def relative_path
        repo.relative_path.sub(%r{\A/}, '')
      end

      # Makes sure a distribution exists for this repository and that the saved
      # distribution reference points at it.
      #
      # * A distribution already exists: a missing or stale saved reference is
      #   replaced, then #update_distribution is returned.
      # * None exists: one is created and the creation result is returned.
      # * Creation fails with a base_path conflict: the distribution is looked
      #   up again, its reference saved, and #update_distribution returned.
      #
      # @raise the original ApiError for non-conflict failures, and also when a
      #   conflict was reported but no distribution is found at the base path
      def refresh_distributions
        # Docker repositories are looked up by container name, everything else
        # by the repository's relative path. The same path is reused for the
        # conflict lookup below so both lookups agree.
        lookup_path = repo.docker? ? repo.container_repository_name : repo.relative_path
        dist = lookup_distributions(base_path: lookup_path).first
        dist_ref = distribution_reference

        if dist
          # If the saved distribution reference is missing or wrong, use the existing distribution
          if dist_ref.nil? || dist.pulp_href != dist_ref.href
            dist_ref&.destroy
            save_distribution_references([dist.pulp_href])
          end
          return update_distribution
        end

        # Since we got this far, we need to create a new distribution
        # Note: the distribution reference can't be saved yet because distribution creation is async
        begin
          create_distribution(relative_path)
        rescue api.client_module::ApiError => e
          base_path_conflict = e.message.include?("\"base_path\":[\"This field must be unique.\"]") ||
                               e.message.include?("\"base_path\":[\"Overlaps with existing distribution\"")
          raise e unless base_path_conflict

          # Now it seems there is a distribution. Fetch it and save the reference.
          dist = lookup_distributions(base_path: lookup_path).first
          # Nothing at this exact base path (e.g. the conflict is an overlap with
          # another path): surface the API error instead of failing on nil.
          raise e unless dist

          save_distribution_references([dist.pulp_href])
          update_distribution
        end
      end

      # Creates a distribution for +path+.
      #
      # Unless the repository type sets pulp3_skip_publication, the publication
      # named in the distribution data is checked first via
      # #fail_missing_publication.
      #
      # @param path the path handed to #secure_distribution_options
      # @return the result of the distributions API create call
      def create_distribution(path)
        distribution_data = api.distribution_class.new(secure_distribution_options(path))
        skip_publication = ::Katello::RepositoryTypeManager.find(repo.content_type).pulp3_skip_publication
        fail_missing_publication(distribution_data.publication) unless skip_publication
        api.distributions_api.create(distribution_data)
      end

      # Lists distributions matching +args+.
      #
      # @param args passed to the distributions API list call
      # @return the +results+ of the list response
      def lookup_distributions(args)
        response = api.distributions_api.list(args)
        response.results
      end

      # Reads a distribution, with 404 responses handled by ignore_404_exception.
      #
      # @param href defaults to the href of the saved distribution reference
      def read_distribution(href = distribution_reference.href)
        ignore_404_exception do
          api.distributions_api.read(href)
        end
      end

      # Pushes the current distribution options to the saved distribution.
      #
      # Does nothing (returns nil) without a saved distribution reference.
      # Otherwise it checks the publication unless the repository type sets
      # pulp3_skip_publication, stores the content guard href on the reference,
      # and returns the result of the partial update call.
      def update_distribution
        return unless distribution_reference

        options = secure_distribution_options(relative_path).except(:name)
        skip_publication = ::Katello::RepositoryTypeManager.find(repo.content_type).pulp3_skip_publication
        fail_missing_publication(options[:publication]) unless skip_publication
        distribution_reference.update(content_guard_href: options[:content_guard])
        api.distributions_api.partial_update(distribution_reference.href, options)
      end

      # Adds the given units in batches of COPY_UNIT_PAGE_SIZE, one
      # #create_version call per batch.
      #
      # @param unit_hrefs [Array] hrefs of the units to add
      # @return [Array] the result of each #create_version call, in order
      def copy_units_by_href(unit_hrefs)
        unit_hrefs.each_slice(COPY_UNIT_PAGE_SIZE).map do |batch|
          create_version(add_content_units: batch)
        end
      end

      # Copies the content of +source_repository+ into this repository.
      #
      # With options[:remove_all], all units are removed first. Then, in order
      # of preference:
      # * options[:mirror] and the API class offers add_remove_content_class:
      #   modify this repository with the source version as base_version;
      # * the API offers copy_api: submit a copy request from the source version
      #   to this repository;
      # * otherwise: return #copy_content_for_source.
      #
      # NOTE(review): in the last case the result of #copy_content_for_source is
      # returned as-is, so a task queued for :remove_all is not part of the
      # return value - confirm this is intended.
      #
      # @param source_repository the repository whose version_href is copied
      # @param options [Hash] recognises :remove_all and :mirror
      def copy_all(source_repository, options = {})
        tasks = []
        if options[:remove_all]
          tasks << api.repositories_api.modify(repository_reference.repository_href, remove_content_units: ['*'])
        end

        if options[:mirror] && api.class.respond_to?(:add_remove_content_class)
          mirror_request = api.class.add_remove_content_class.new(base_version: source_repository.version_href)
          return tasks << api.repositories_api.modify(repository_reference.repository_href, mirror_request)
        end

        return copy_content_for_source(source_repository) unless api.respond_to?(:copy_api)

        copy_request = api.class.copy_class.new
        copy_request.config = [
          {
            source_repo_version: source_repository.version_href,
            dest_repo: repository_reference.repository_href
          }
        ]
        tasks << api.copy_api.copy_content(copy_request)
      end

      # Creates a new version based on the current version of +from_repository+.
      #
      # @param from_repository the repository whose version_href is the base
      def copy_version(from_repository)
        create_version(base_version: from_repository.version_href)
      end

      # True when the repository's version href ends in "/versions/0/".
      #
      # @return [Boolean]
      def version_zero?
        repo.version_href.end_with?('/versions/0/')
      end

      # Deletes the repository's current version unless it is version zero,
      # ignoring a 404 from the API.
      #
      # When the API refuses because the version is still being used to
      # distribute content, the repository's publication is deleted first and
      # the version delete is retried. If no publication href is known, the
      # failure is logged and nil is returned.
      #
      # Any other API error is logged at error level and nil is returned. The
      # method stays best-effort and does not raise, but the failure is no
      # longer dropped without a trace.
      #
      # @return the result of the delete call, or nil when nothing was deleted
      def delete_version
        ignore_404_exception { api.repository_versions_api.delete(repo.version_href) } unless version_zero?
      rescue api.api_exception_class => e
        unless e.message.include?("are currently being used to distribute content")
          Rails.logger.error "Exception when calling repository_versions_api->delete: #{e}"
          return nil
        end

        Rails.logger.warn "Exception when calling repository_versions_api->delete: #{e}"
        publication_href = repo.publication_href
        unless publication_href
          Rails.logger.error "Could not delete version: #{repo.version_href} because conflicting publication could not be looked up"
          return nil
        end

        Rails.logger.warn "Trying to delete publication #{publication_href} for repository #{repo.id}"
        ignore_404_exception { api.publications_api.delete(publication_href) }
        ignore_404_exception { api.repository_versions_api.delete(repo.version_href) }
      end

      # Calls the repositories API modify endpoint on the referenced repository.
      #
      # @param options [Hash] the modify payload
      def create_version(options = {})
        repository_href = repository_reference.repository_href
        api.repositories_api.modify(repository_href, options)
      end

      # Records a DistributionReference for each of the given distribution hrefs.
      #
      # For each href, the distribution's base_path and content_guard are read
      # from the API. A saved reference whose distribution can no longer be read
      # is destroyed. A new reference is created only if none remains.
      #
      # @param hrefs [Array] distribution hrefs
      def save_distribution_references(hrefs)
        hrefs.each do |href|
          remote_distribution = api.get_distribution(href)
          base_path = remote_distribution&.base_path
          guard_href = remote_distribution&.content_guard

          saved_reference = distribution_reference
          saved_reference.destroy if saved_reference && !read_distribution(saved_reference.href)

          # Looked up again on purpose: the reference may just have been destroyed.
          next if distribution_reference

          # Ensure that duplicates won't be created in the case of a race condition
          DistributionReference.where(path: base_path, href: href, repository_id: repo.id,
                                      content_guard_href: guard_href).first_or_create!
        end
      end

      # Deletes the distribution named by the saved reference (ignoring a 404
      # from the API) and then destroys the reference.
      #
      # @return nil when there is no saved distribution reference
      def delete_distributions
        dist_ref = distribution_reference
        return unless dist_ref

        ignore_404_exception { api.delete_distribution(dist_ref.href) }
        dist_ref.destroy!
      end

      # Deletes the first distribution found at this repository's relative path
      # and destroys every DistributionReference stored for that path.
      #
      # @return the result of the delete call, or nil when none was found
      def delete_distributions_by_path
        path = relative_path
        first_match = lookup_distributions(base_path: path).first

        task = first_match ? api.delete_distribution(first_match.pulp_href) : nil
        Katello::Pulp3::DistributionReference.where(path: path).destroy_all
        task
      end

      # Assembles the remote options shared across content types: TLS
      # validation, name, URL, proxy settings, timeouts and rate limit from
      # Setting, optional download concurrency, upstream credentials (empty
      # strings become nil) and the SSL options from #ssl_remote_options.
      #
      # @return [Hash]
      def common_remote_options
        proxy = root.http_proxy
        options = {
          tls_validation: root.verify_ssl_on_sync,
          name: generate_backend_object_name,
          url: root.url,
          proxy_url: proxy&.url,
          proxy_username: proxy&.username,
          proxy_password: proxy&.password,
          total_timeout: Setting[:sync_total_timeout],
          connect_timeout: Setting[:sync_connect_timeout_v2],
          sock_connect_timeout: Setting[:sync_sock_connect_timeout],
          sock_read_timeout: Setting[:sync_sock_read_timeout],
          rate_limit: Setting[:download_rate_limit]
        }

        concurrency = root.download_concurrency
        options[:download_concurrency] = concurrency unless concurrency.blank?

        # Empty credentials are sent as nil rather than as empty strings.
        empty_to_nil = ->(value) { value == '' ? nil : value }
        options[:username] = empty_to_nil.call(root.upstream_username)
        options[:password] = empty_to_nil.call(root.upstream_password)

        options.merge!(ssl_remote_options)
      end

      # Remote options for a mirror.
      #
      # For content types listed under :download_policy in
      # CONTENT_ATTRIBUTE_RESTRICTIONS, :policy is the smart proxy's download
      # policy, or the root repository's when the proxy is set to inherit.
      # Other content types get an empty hash.
      #
      # @return [Hash]
      def mirror_remote_options
        policy_types = Katello::RootRepository::CONTENT_ATTRIBUTE_RESTRICTIONS[:download_policy]
        return {} unless policy_types.include?(repo.content_type)

        policy = smart_proxy.download_policy
        policy = repo.root.download_policy if policy == SmartProxy::DOWNLOAD_INHERIT
        { policy: policy }
      end

      # Options for creating or updating the repository: a generated name plus
      # #specific_create_options, whose entries win on key clashes.
      #
      # @return [Hash]
      def create_options
        base_options = { name: generate_backend_object_name }
        base_options.merge(specific_create_options)
      end

      # Extra options merged into #create_options; none in this class.
      #
      # @return [Hash] an empty hash
      def specific_create_options
        {}
      end

      # Distribution options for +path+ plus a :content_guard entry: nil when
      # the root repository is unprotected, otherwise the pulp_href of the first
      # ContentGuard. Entries from #distribution_options win on key clashes.
      #
      # @param path the path handed to #distribution_options
      # @return [Hash]
      def secure_distribution_options(path)
        guard_href = root.unprotected ? nil : ::Katello::Pulp3::ContentGuard.first.pulp_href
        { content_guard: guard_href }.merge!(distribution_options(path))
      end

      # SSL-related remote options, chosen by the kind of root repository:
      # * Red Hat via the Red Hat CDN: product certificate/key and the feed CA;
      # * Red Hat via a custom CDN: the CDN configuration's CA only;
      # * Red Hat via network sync: the CDN configuration's cert, key and CA;
      # * custom repository: the repository's own client cert, key and CA.
      # Any other case yields an empty hash.
      #
      # The HTTP proxy's CA certificate, when present, is appended to :ca_cert
      # (see #append_proxy_cacert).
      #
      # @return [Hash]
      def ssl_remote_options
        options = {}
        if root.redhat? && root.cdn_configuration.redhat_cdn?
          options = {
            client_cert: root.product.certificate,
            client_key: root.product.key,
            ca_cert: Katello::Repository.feed_ca_cert(root.url)
          }
        elsif root.redhat? && root.cdn_configuration.custom_cdn?
          options = {
            ca_cert: root.cdn_configuration.ssl_ca
          }
        elsif root.redhat? && root.cdn_configuration.network_sync?
          options = {
            client_cert: root.cdn_configuration.ssl_cert,
            client_key: root.cdn_configuration.ssl_key,
            ca_cert: root.cdn_configuration.ssl_ca
          }
        elsif root.custom?
          options = {
            client_cert: root.ssl_client_cert&.content,
            client_key: root.ssl_client_key&.content,
            ca_cert: root.ssl_ca_cert&.content
          }
        end
        append_proxy_cacert(options)
      end

      # Appends the HTTP proxy's CA certificate to options[:ca_cert].
      #
      # The hashes built above use the key :ca_cert, so that is the key
      # inspected here; checking :cacert never matched. Nothing is appended when
      # the proxy has no CA certificate or when :ca_cert is missing or blank, so
      # a remote without its own CA bundle is left without one.
      #
      # @param options [Hash] remote options, modified in place
      # @return [Hash] the same hash
      def append_proxy_cacert(options)
        proxy_cacert = root.http_proxy&.cacert
        if proxy_cacert.present? && options[:ca_cert].present?
          options[:ca_cert] += "\n#{proxy_cacert}"
        end
        options
      end

      # Reads a repository version from the API.
      #
      # @param href the version href; nil short-circuits to nil
      # @return the version, or nil when +href+ is nil or the API raised its
      #   exception class (the error is logged)
      def lookup_version(href)
        return unless href

        api.repository_versions_api.read(href)
      rescue api.api_exception_class => e
        Rails.logger.error "Exception when calling repository_versions_api->read: #{e}"
        nil
      end

      # Reads a publication from the API.
      #
      # @param href the publication href; nil short-circuits to nil
      # @return the publication, or nil when +href+ is nil or the API raised its
      #   exception class (the error is logged)
      def lookup_publication(href)
        return unless href

        api.publications_api.read(href)
      rescue api.api_exception_class => e
        Rails.logger.error "Exception when calling publications_api->read: #{e}"
        nil
      end

      # Removes the given content units from the referenced repository.
      #
      # Repositories whose root content type is "docker" use the API's remove
      # call; all others use modify with :remove_content_units.
      #
      # @param content_units objects responding to pulp_id
      def remove_content(content_units)
        docker = repo.root.content_type == "docker"
        repositories = api.repositories_api
        repository_href = repository_reference.repository_href
        unit_ids = content_units.map(&:pulp_id)

        if docker
          repositories.remove(repository_href, content_units: unit_ids)
        else
          repositories.modify(repository_href, remove_content_units: unit_ids)
        end
      end

      # Imports commits from an artifact into the referenced repository using a
      # PulpOstreeClient::OstreeRepoImport request.
      #
      # @param artifact_href the artifact to import from
      # @param options [Hash] reads :ostree_repository_name and :ostree_ref
      def repository_import_content(artifact_href, options = {})
        import_request = PulpOstreeClient::OstreeRepoImport.new.tap do |request|
          request.artifact = artifact_href
          request.repository_name = options[:ostree_repository_name]
          request.ref = options[:ostree_ref]
        end
        api.repositories_api.import_commits(repository_reference.repository_href, import_request)
      end

      # Adds one or more content units to the referenced repository.
      #
      # @param content_unit_href a single href or an Array of hrefs
      # @param remove_all_units [Boolean] remove every unit before adding
      # @return the result of the add call
      # @raise [Katello::Errors::Pulp3Error] when the API reports that the
      #   requested units could not be found; other API errors are re-raised
      def add_content(content_unit_href, remove_all_units = false)
        units = content_unit_href.is_a?(Array) ? content_unit_href : [content_unit_href]
        if remove_all_units
          api.repositories_api.modify(repository_reference.repository_href, remove_content_units: ['*'])
        end
        api.repositories_api.modify(repository_reference.repository_href, add_content_units: units)
      rescue api.client_module::ApiError => e
        raise e unless e.message.include? 'Could not find the following content units'

        raise ::Katello::Errors::Pulp3Error, "Content units that do not exist in Pulp were requested to be copied."\
           " Please run `foreman-rake katello:delete_orphaned_content` to fix the following repository:"\
          " #{repository_reference.root_repository.name}. Original error: #{e.message}"
      end

      # Adds one or more content units to the repository at +repository_href+.
      #
      # @param repository_href the Pulp href of the target repository
      # @param content_unit_href a single href or an Array of hrefs
      # @return the result of the modify call
      # @raise [Katello::Errors::Pulp3Error] when the API reports that the
      #   requested units could not be found; other API errors are re-raised
      def add_content_for_repo(repository_href, content_unit_href)
        content_unit_href = [content_unit_href] unless content_unit_href.is_a?(Array)
        api.repositories_api.modify(repository_href, add_content_units: content_unit_href)
      rescue api.client_module::ApiError => e
        raise e unless e.message.include? 'Could not find the following content units'

        # The href may have no RepositoryReference row; fall back to the href
        # itself so the lookup cannot hide the real error behind a NoMethodError.
        reference = ::Katello::Pulp3::RepositoryReference.find_by(repository_href: repository_href)
        repository_label = reference&.root_repository&.name || repository_href
        raise ::Katello::Errors::Pulp3Error, "Content units that do not exist in Pulp were requested to be copied."\
          " Please run `foreman-rake katello:delete_orphaned_content` to fix the following repository:"\
          " #{repository_label}. Original error: #{e.message}"
      end

      # Returns each upload without its 'id' entry.
      #
      # @param uploads [Array<Hash>] upload hashes
      # @return [Array<Hash>]
      def unit_keys(uploads)
        uploads.map { |upload| upload.except('id') }
      end

      # The root repository's retain_package_versions_count as an Integer, or 0
      # when it is unset or the root is using mirrored content.
      #
      # @return [Integer]
      def retain_package_versions_count
        configured = root.retain_package_versions_count
        return 0 if configured.nil? || root.using_mirrored_content?

        root.retain_package_versions_count.to_i
      end

      # Raises when the publication at +publication_href+ cannot be looked up.
      #
      # @param publication_href the publication href to verify
      # @return [nil] when the publication exists
      # @raise [RuntimeError] naming the repository when it does not
      def fail_missing_publication(publication_href)
        return if lookup_publication(publication_href)

        # Translate the message template first and interpolate afterwards;
        # interpolating inside _() makes the gettext lookup miss every time.
        fail(_("The repository's publication is missing. Please run a 'complete sync' on %s.") % repo.name)
      end
    end
  end
end