# lib/rmt/downloader.rb
require 'typhoeus'
require 'tempfile'
require 'fileutils'
require 'fiber'
require 'rmt'
require 'rmt/config'
require 'rmt/fiber_request'
require 'rmt/deduplicator'
# Downloads a batch of remote files over HTTP/HTTPS (or verifies file:// paths
# when importing repos) using Typhoeus/libcurl with bounded concurrency.
# Each transfer runs inside a Fiber that is suspended while libcurl streams the
# response and resumed from the request callbacks (see RMT::FiberRequest).
# Supports per-file retries, checksum verification, and reuse of a local cache
# validated against the server's Last-Modified header.
class RMT::Downloader
  # Number of retry attempts per file before the download is considered failed.
  RETRIES = 4
  # Pause between retry attempts, in seconds.
  RETRY_DELAY_SECONDS = 2

  attr_accessor :concurrency, :logger, :auth_token
  attr_reader :downloaded_files_count, :downloaded_files_size

  # @param logger [Object] logger responding to #debug/#info/#warn
  # @param auth_token [String, nil] query string appended to non-file:// request URIs
  # @param track_files [Boolean] when true, downloaded .rpm/.drpm files are
  #   recorded via DownloadedFile.track_file for deduplication
  def initialize(logger:, auth_token: nil, track_files: true)
    # NOTE: Typhoeus::Config is global process state — these settings affect
    # every Typhoeus request in the process, not just this downloader instance.
    Typhoeus::Config.user_agent = "RMT/#{RMT::VERSION}"
    Typhoeus::Config.verbose = Settings.try(:http_client).try(:verbose)
    @concurrency = 4
    @auth_token = auth_token
    @logger = logger
    @track_files = track_files
    @queue = []
    @downloaded_files_count = 0
    @downloaded_files_size = 0
  end

  # returns the list of files that failed to download when 'ignore_errors: true',
  # otherwise raises RMT::Downloader::Exception if any file fails to download
  #
  # @param files [Array<RMT::Mirror::FileReference>] files to fetch
  # @param ignore_errors [Boolean] collect failures instead of raising
  # @return [Array, nil] failed files when ignore_errors is true; nil otherwise
  def download_multi(files, ignore_errors: false)
    # First satisfy whatever we can from the local cache; only the remainder
    # needs actual HTTP transfers.
    downloads_needed, failed_cache =
      try_copying_from_cache(files, ignore_errors: ignore_errors)

    return failed_cache if downloads_needed.empty?

    @queue = downloads_needed
    @hydra = Typhoeus::Hydra.new(max_concurrency: @concurrency)

    # A non-nil array doubles as the 'ignore_errors' flag further down the
    # call chain (see create_fiber_request).
    failed_downloads = ignore_errors ? failed_cache : nil

    # initialize queue with @concurrency items, so hydra can work in parallel
    @concurrency.times { process_queue(failed_downloads) }

    @hydra.run
    failed_downloads
  end

  protected

  # Creates a fiber that wraps RMT::FiberRequest and runs it, returning the RMT::FiberRequest object.
  # @param [RMT::Mirror::FileReference] file_reference with all file metadata attributes and paths (remote, local, cache)
  # @param [Array] failed_downloads array of remote files that have failed downloads, passed by reference, prevents from raising RMT::Downloader exceptions
  # @return [RMT::FiberRequest] a request that can be run individually or with Typhoeus::Hydra
  def create_fiber_request(file_reference, failed_downloads: nil, retries: RETRIES)
    make_file_dir(file_reference.local_path)

    request_fiber = Fiber.new do
      begin
        # make_request will call Fiber.yield on this fiber (request_fiber), returning the request object
        # this fiber will be resumed by on_body callback once the request is executed
        response = make_request(file_reference, request_fiber)
        finalize_download(response.request, file_reference)
      rescue RMT::Downloader::Exception, RMT::ChecksumVerifier::Exception => e
        # raise if number of retries is exhausted or file not found
        if retries.zero? || e.try(:http_code) == 404
          # if failed_downloads != nil, we're in 'ignore_errors' mode
          if failed_downloads
            @logger.warn("× #{File.basename(file_reference.local_path)} - #{e}")
            failed_downloads << file_reference
            nil
          else
            # empty queue when raising, so the downloader can get re-used
            @queue = []
            # Drop the in-flight libcurl handles so hydra does not keep
            # processing transfers for a batch that is being aborted.
            @hydra.multi.easy_handles.each do |handle|
              @hydra.multi.delete(handle)
            end
            raise e
          end
        else
          @logger.warn(_('Downloading %{file_reference} failed with %{message}. Retrying %{retries} more times after %{seconds} seconds') % {
            file_reference: file_reference.remote_path, message: e.message, retries: retries, seconds: RETRY_DELAY_SECONDS
          })
          sleep(RETRY_DELAY_SECONDS)
          # re-enqueuing with retries -= 1
          request = create_fiber_request(file_reference, failed_downloads: failed_downloads, retries: (retries - 1))
          @hydra.queue(request) if request
        end
      ensure
        # Whatever happened to this file, pull the next one off the queue so
        # the overall concurrency level is maintained.
        process_queue(failed_downloads)
      end
    end

    # Resuming runs the fiber until make_request yields the request object,
    # which becomes this method's return value.
    request_fiber.resume
  end

  # enqueuing requests one-by-one, so we don't run into 'too many open files' errors
  def process_queue(failed_downloads = nil)
    queue_item = @queue.shift
    return unless queue_item

    request = create_fiber_request(queue_item, failed_downloads: failed_downloads)
    @hydra.queue(request) if request
  end

  # Builds the actual download request for +file+, streaming the body into a
  # binary-mode tempfile. Yields the current fiber back to the caller while
  # the transfer runs; the returned value is the completed response.
  def make_request(file, request_fiber)
    downloaded_file = Tempfile.new('rmt', Dir.tmpdir, mode: File::BINARY, encoding: 'ascii-8bit')

    request = RMT::FiberRequest.new(
      request_uri(file).to_s,
      download_path: downloaded_file,
      request_fiber: request_fiber,
      followlocation: true
    )

    @logger.debug("HTTP request for: #{file.remote_path}")
    request.receive_headers
    request.receive_body
  end

  # Splits +files+ into those that still need downloading and those served
  # from the local cache.
  # @return [Array(Array, Array)] [downloads_needed, failed_files]
  def try_copying_from_cache(files, ignore_errors: false)
    # We need to verify if the cached copy is still relevant
    # Create a HTTP/HTTPS HEAD request if possible, return nil if not
    cache_requests = files.map { |file| [file, cache_head_request(file)] }.to_h
    available_in_cache = cache_requests.compact.values

    # Download everything if the cache is empty
    return [files, []] if available_in_cache.empty?

    hydra = Typhoeus::Hydra.new(max_concurrency: @concurrency)
    available_in_cache.each do |request|
      request.on_complete do |response|
        # Retry failed HEAD probes inline (request.run blocks) with a
        # bounded, per-request retry counter.
        if invalid_response?(response)
          request.retries ||= RETRIES

          if request.retries > 0
            @logger.warn(_('Poking %{file_reference} failed with %{message}. Retrying %{retries} more times after %{seconds} seconds') % {
              file_reference: URI(request.base_url).path, message: "#{response.return_code} (#{response.code})",
              retries: request.retries, seconds: RETRY_DELAY_SECONDS
            })

            sleep(RETRY_DELAY_SECONDS)

            request.retries -= 1
            request.run
          end
        end
      end
      hydra.queue(request)
    end
    hydra.run

    downloads_needed = []
    failed_files = []
    cache_requests.each do |file, request|
      # No HEAD request possible (file:// or no cached timestamp) -> download.
      next downloads_needed << file if request.nil?
      # Cached copy is stale or probe failed validation -> download.
      next downloads_needed << file unless valid_cached_file?(file, request.response)

      copy_from_cache(file)
    rescue RMT::Downloader::Exception => e
      next failed_files << file.local_path if ignore_errors

      raise e
    end

    [downloads_needed, failed_files]
  end

  # Builds a HEAD request used to check whether the cached copy is current.
  # @return [RMT::HttpRequest, nil] nil when the cache cannot be consulted
  def cache_head_request(file)
    # RMT must not make HEAD requests when importing repos (file://)
    return nil unless %w[http https].include?(file.remote_path.scheme)
    return nil if file.cache_timestamp.nil?

    @logger.debug("HTTP HEAD request for: #{file.remote_path}")
    RMT::HttpRequest.new(request_uri(file).to_s, method: :head, followlocation: true)
  end

  # True when the cached file's timestamp exactly matches the server's
  # Last-Modified header. Raises on an invalid HEAD response.
  def valid_cached_file?(file, response)
    RMT::Downloader::Exception.raise_request_error(file.remote_path, response, @logger) if invalid_response?(response)

    # response.headers returns Typhoeus::Response::Headers, which takes care of
    # case-sensitive concerns with the header's key
    last_modified_header = response.headers['Last-Modified']
    return false unless last_modified_header

    file.cache_timestamp == Time.parse(last_modified_header).utc
  end

  # Copies the validated cached file into place (preserving mtime so future
  # cache checks still match) and logs the cache hit.
  def copy_from_cache(file)
    unless (file.cache_path == file.local_path)
      make_file_dir(file.local_path)
      FileUtils.cp(file.cache_path, file.local_path, preserve: true)
    end

    @logger.info("→ #{File.basename(file.local_path)}")
    @logger.debug(" (cached mtime matches server last modified: #{file.cache_timestamp})")
  end

  # Validates and installs a completed download: checksum check, move from
  # tempfile to final path, mtime sync with Last-Modified, optional tracking,
  # and stats/logging. Cleans up the tempfile on any failure.
  def finalize_download(request, file)
    # file:// responses have no HTTP status code, so skip the response check.
    if (URI(request.base_url).scheme != 'file') && invalid_response?(request.response)
      RMT::Downloader::Exception.raise_request_error(request.remote_file, request.response, @logger)
    end

    handle_checksum_verification!(file.checksum_type, file.checksum, request.download_path)

    FileUtils.mv(request.download_path.path, file.local_path)
    File.chmod(0o644, file.local_path)

    # Mirror the server's Last-Modified onto the local file so cache
    # validation (valid_cached_file?) can compare timestamps later.
    last_modified = request.response.headers['Last-Modified']
    if last_modified
      timestamp = Time.parse(last_modified).utc
      File.utime(timestamp, timestamp, file.local_path)
    end

    if @track_files && file.local_path.match?(/\.(rpm|drpm)$/)
      DownloadedFile.track_file(checksum: file.checksum,
                                checksum_type: file.checksum_type,
                                local_path: file.local_path,
                                size: File.size(file.local_path))
    end

    @downloaded_files_count += 1
    @downloaded_files_size += File.size(file.local_path)
    @logger.info("↓ #{File.basename(file.local_path)}")
    @logger.debug(" (new mtime: #{File.mtime(file.local_path).utc})")
  rescue StandardError => e
    request.download_path.unlink
    raise e
  end

  # Verifies +download_path+ against the expected checksum; no-op when the
  # file reference carries no checksum metadata.
  # @raise [RMT::Downloader::Exception] on checksum mismatch
  def handle_checksum_verification!(checksum_type, checksum_value, download_path)
    return unless (checksum_type && checksum_value)

    unless RMT::ChecksumVerifier.match_checksum?(checksum_type, checksum_value, download_path)
      raise RMT::Downloader::Exception.new(_("Checksum doesn't match"))
    end
  end

  # A response is invalid unless it has HTTP status 200 and (when libcurl
  # reports one) a :ok return code.
  def invalid_response?(response)
    response.code != 200 || (response.return_code && response.return_code != :ok)
  end

  # Builds the final request URI for +file+, appending the auth token for
  # remote schemes. For file:// URIs, verifies the file exists up front.
  # @raise [RMT::Downloader::Exception] with http_code 404 for a missing local file
  def request_uri(file)
    uri = URI.join(file.remote_path)
    uri.query = @auth_token if (@auth_token && uri.scheme != 'file')

    if URI(uri).scheme == 'file' && !File.exist?(CGI.unescape(uri.path))
      e = RMT::Downloader::Exception.new(_('%{file} - File does not exist') % { file: file.remote_path })
      # Similar to http download, set 404 when file is not found, to skip retries
      e.http_code = 404
      raise e
    end

    uri.to_s
  end

  # Ensures the parent directory of +file_path+ exists.
  def make_file_dir(file_path)
    dirname = File.dirname(file_path)

    FileUtils.mkdir_p(dirname)
  end
end