# app/models/source.rb
require 'mimemagic_ext'
require 'net/ftp'
class Source < ApplicationModel
  extend Enumerize

  belongs_to :workbench, optional: false
  has_many :retrievals, class_name: "Source::Retrieval", foreign_key: "source_id", dependent: :destroy

  validates :name, presence: true
  validates :url, presence: true
  validates :downloader_type, presence: true
  # Only the Authorization downloader uses the raw_authorization option
  validates :downloader_option_raw_authorization, presence: true, if: :authorization_downloader_type?
  #validates_associated :downloader

  enumerize :downloader_type, in: %i(direct french_nap authorization ftp), default: :direct

  # Sources with a retrieval frequency, i.e. scheduled for automatic retrieval
  scope :enabled, -> { where.not(retrieval_frequency: 'none') }

  before_validation :clean, on: :update

  attribute :retrieval_time_of_day, TimeOfDay::Type::TimeWithoutZone.new
  attribute :retrieval_days_of_week, WeekDays.new

  # Delayed::Job which triggers the periodic retrievals (see #schedule)
  belongs_to :scheduled_job, class_name: '::Delayed::Job', dependent: :destroy

  validates :retrieval_time_of_day, presence: true, if: :retrieval_frequency_daily?
  validates :retrieval_days_of_week, presence: true, if: :enabled?

  enumerize :retrieval_frequency, in: %w[none hourly daily], default: 'none', predicates: { prefix: true }
# A Source is enabled as soon as a retrieval frequency is configured
def enabled?
  retrieval_frequency_none? == false
end
# Next planned run of the scheduled Delayed::Job, nil when the Source is disabled
def next_retrieval
  scheduled_job&.run_at unless retrieval_frequency_none?
end
# Rails 5 ActiveRecord::AttributeAssignment doesn't build the value object
# when the writer receives multiparameter attributes (like {1 => 13, 2 => 15}),
# so convert the hash into a TimeOfDay before delegating to the attribute writer.
def retrieval_time_of_day=(time_of_day)
  multiparameter = time_of_day.is_a?(Hash) && time_of_day.keys == [1, 2]
  time_of_day = TimeOfDay.new(time_of_day[1], time_of_day[2]) if multiparameter
  super time_of_day
end
# Drops the current Delayed::Job (if any) and schedules a new one
def reschedule
  if scheduled_job
    scheduled_job.destroy
  end
  schedule
end
# Creates the Delayed::Job which triggers periodic retrievals according to
# the ScheduledJob cron definition. Does nothing when the Source is disabled.
def schedule
  return unless enabled?

  job = ScheduledJob.new(self)
  # Named delayed_job (not scheduled_job) to avoid shadowing the
  # scheduled_job association reader with a local variable
  delayed_job = Delayed::Job.enqueue(job, cron: job.cron)

  # update_column to avoid a new save sequence
  update_column :scheduled_job_id, delayed_job.id
end
# Reschedules and saves the Source. Use this method to force a rescheduling
# (in a migration, for example).
def reschedule!
  reschedule
  save
end
# Any change in frequency, time of day or days requires a new cron expression
def reschedule_needed?
  %i[retrieval_frequency retrieval_time_of_day retrieval_days_of_week].any? do |attribute|
    send("#{attribute}_changed?")
  end
end
# Builds the days-of-week value object from nested form attributes
# NOTE(review): attribute type is WeekDays (see attribute declaration) while this
# builds a Cuckoo::Timetable::DaysOfWeek — presumably compatible; confirm
def retrieval_days_of_week_attributes=(attributes)
  days = Cuckoo::Timetable::DaysOfWeek.new(attributes)
  self.retrieval_days_of_week = days
end
# Default the time of day for enabled sources
# REMOVEME after CHOUETTE-2007
before_validation ->(source) { source.retrieval_time_of_day ||= TimeOfDay.new(0, 0) }, if: :enabled?

# Keep the Delayed::Job definition in sync with the retrieval planning
before_update :reschedule, if: :reschedule_needed?
after_commit :schedule, on: :create, if: :enabled?
# Uses to start the Source retrieval at the expected time
class ScheduledJob
  def initialize(source)
    @source = source
    @source_id = source.id
  end

  attr_reader :source_id

  # The Delayed::Job YAML payload only keeps the source id;
  # the Source itself is reloaded when the job runs (see #source)
  def encode_with(coder)
    coder['source_id'] = source_id
  end

  delegate :retrieval_time_of_day, :retrieval_frequency, :retrieval_days_of_week, to: :source

  # Cron expression ("minute hour day-of-month month day-of-week") matching
  # the Source retrieval planning. Returns nil for an unsupported frequency
  # (e.g. 'none') or a daily frequency without time of day.
  def cron
    case retrieval_frequency
    when 'daily'
      if retrieval_time_of_day
        "#{retrieval_time_of_day.minute} #{retrieval_time_of_day.hour} * * #{retrieval_days_of_week_cron}"
      end
    when 'hourly'
      "#{hourly_random % 60} * * * #{retrieval_days_of_week_cron}"
    end
  end

  # '*' when every day is selected, otherwise abbreviated day names ("mon,tue,...")
  def retrieval_days_of_week_cron
    return '*' if retrieval_days_of_week.all?
    retrieval_days_of_week.days.map do |day_of_week|
      day_of_week.to_s.first(3)
    end.join(',')
  end

  # Stable per-source minute, to spread hourly retrievals over the hour
  def hourly_random
    source.id || Random.rand(60)
  end

  def source
    @source ||= Source.find_by(id: source_id)
  end

  def perform
    source.retrieve if source.enabled?
  rescue StandardError => e
    Chouette::Safe.capture "Can't start Source##{source_id} retrieval", e
  end
end
# True when the Source uses the Authorization downloader
# (condition of the downloader_option_raw_authorization validation)
def authorization_downloader_type?
  downloader_type.to_s == 'authorization'
end
# Drops the raw_authorization downloader option when the downloader type
# doesn't use it, so stale credentials are not kept on the record.
# Reuses the authorization_downloader_type? predicate (shared with the
# validations) instead of duplicating the comparison.
def clean
  unless authorization_downloader_type?
    self.downloader_options = downloader_options.except("raw_authorization")
  end
end
# Selected line provider when it still belongs to the workbench candidates,
# the workbench default provider otherwise
def import_option_line_provider
  provider = candidate_line_providers.find_by(id: import_option_line_provider_id)
  provider || workbench.default_line_provider
end
# Selected stop area provider when it still belongs to the workbench candidates,
# the workbench default provider otherwise
def import_option_stop_area_provider
  provider = candidate_stop_area_providers.find_by(id: import_option_stop_area_provider_id)
  provider || workbench.default_stop_area_provider
end
# Readers exposing import_options entries as form-friendly attributes

def import_option_automatic_merge
  import_options["automatic_merge"]
end

def import_option_archive_on_fail
  import_options["archive_on_fail"]
end

def import_option_update_workgroup_providers
  import_options["update_workgroup_providers"]
end

# Stored as an Array of route ids, displayed as a comma-separated String
def import_option_process_gtfs_route_ids
  (import_options["process_gtfs_route_ids"] || []).join(",")
end

def import_option_store_xml
  import_options["store_xml"]
end

def import_option_disable_missing_resources
  import_options["disable_missing_resources"]
end

def import_option_strict_mode
  import_options["strict_mode"]
end

def import_option_line_provider_id
  import_options["line_provider_id"]
end

def import_option_stop_area_provider_id
  import_options["stop_area_provider_id"]
end
# Writers storing form attributes into import_options

def import_option_automatic_merge=(value)
  import_options["automatic_merge"] = value
end

def import_option_archive_on_fail=(value)
  import_options["archive_on_fail"] = value
end

def import_option_update_workgroup_providers=(value)
  import_options["update_workgroup_providers"] = value
end
# Stores the comma-separated route id list as an Array, stripping
# tab/CR/LF characters pasted into the form field.
# FIX: the previous single-quoted '\t\r\n' made String#delete remove the
# literal letters "t", "r" and "n" from the route ids instead of whitespace.
def import_option_process_gtfs_route_ids=(value)
  import_options["process_gtfs_route_ids"] = value.delete("\t\r\n").split(',')
end
# Writers storing form attributes into import_options

def import_option_store_xml=(value)
  import_options["store_xml"] = value
end

def import_option_disable_missing_resources=(value)
  import_options["disable_missing_resources"] = value
end

def import_option_strict_mode=(value)
  import_options["strict_mode"] = value
end

def import_option_line_provider_id=(value)
  import_options["line_provider_id"] = value
end

def import_option_stop_area_provider_id=(value)
  import_options["stop_area_provider_id"] = value
end
# Predicates for import options stored as "true"/"false" strings

def update_workgroup_providers?
  import_options["update_workgroup_providers"] == "true"
end

def store_xml?
  import_options["store_xml"] == "true"
end

def disable_missing_resources?
  import_options["disable_missing_resources"] == "true"
end

# Raw value of the Authorization header used by the Authorization downloader
def downloader_option_raw_authorization
  downloader_options["raw_authorization"]
end

def downloader_option_raw_authorization=(value)
  downloader_options["raw_authorization"] = value
end
# Maps the enumerized downloader_type to a Downloader class,
# Downloader::URL being the default/direct implementation
def downloader_class
  return Downloader::URL unless downloader_type.present? && downloader_type != :direct
  Downloader.const_get(downloader_type.camelcase)
end
# Fresh downloader instance configured with the Source url and options
def downloader
  downloader_class.new(url, downloader_options)
end
# Creates and enqueues a new Retrieval, then prunes the oldest ones
def retrieve
  new_retrieval = retrievals.create(creator: 'Source')
  new_retrieval.enqueue
  retrievals.delete_older
end
# candidate_line_providers / candidate_stop_area_providers come from the workbench
delegate :line_providers, :stop_area_providers, to: :workbench, prefix: :candidate
module Downloader
# Base class for all downloaders: stores the url and assigns every given
# option through the matching writer (e.g. raw_authorization=)
class Base
  attr_reader :url

  def initialize(url, options = {})
    @url = url
    options.each do |name, value|
      send("#{name}=", value)
    end
  end
end
# Direct HTTP(S) downloader
class URL < Base
  # Read timeout shared by every URL downloader
  mattr_accessor :timeout, default: 120.seconds

  # Downloads the url body into path, with optional extra HTTP headers
  def download(path, headers = {})
    fetcher = Fetcher.new(url, headers, read_timeout: timeout)
    fetcher.fetch(path)
  end
end
# Download a given URL. Manages redirection
class Fetcher
  attr_reader :url, :headers
  attr_accessor :read_timeout
  attr_writer :redirection_count

  def initialize(url, headers = {}, options = {})
    @url = url
    @headers = headers
    options.each { |k, v| send "#{k}=", v }
  end

  # GET request with the configured headers
  def request
    @request ||= Net::HTTP::Get.new(uri).tap do |request|
      headers.each { |name, value| request[name] = value }
    end
  end

  # Performs the request and returns the response.
  # NOTE: each invocation performs a new HTTP request.
  def response
    http.start do |http|
      http.request request
    end
  end

  def uri
    @uri ||= URI(url)
  end

  def use_ssl?
    uri.instance_of?(URI::HTTPS)
  end

  def http
    Net::HTTP.new(uri.hostname, uri.port).tap do |http|
      http.use_ssl = use_ssl?
      http.read_timeout = read_timeout if read_timeout
    end
  end

  # Maps an HTTP error response to a user-facing Retrieval error
  def error_for(response) # rubocop:disable Metrics/MethodLength
    Rails.logger.debug { "HTTP response: #{response.code} #{response.body}" }
    message_key =
      # TODO: we should/could test the response class
      case response.code
      when '404'
        :url_not_found
      when '401', '403'
        :authentication_failed
      when /^50\d$/
        :url_not_available
      else
        :download_failed
      end
    Source::Retrieval::Error.new message_key
  end

  # Remaining redirections allowed (10 by default)
  def redirection_count
    @redirection_count ||= 10
  end

  def allow_redirect?
    redirection_count > 1
  end

  # Builds the Fetcher following the redirection. Accepts the redirect
  # response as argument so the Location header can be read from the
  # in-flight response. FIX: the previous implementation invoked #response
  # here, re-issuing the whole HTTP request a second time per redirection.
  def redirect(redirect_response = response)
    raise Source::Retrieval::Error, :url_not_available unless allow_redirect?
    Fetcher.new(redirect_response['location'], headers, redirection_count: redirection_count - 1)
  end

  # Streams the url body into the given path, following redirections
  def fetch(path)
    http.start do |started_http|
      started_http.request request do |response|
        return redirect(response).fetch(path) if response.is_a?(Net::HTTPRedirection)
        raise error_for(response) unless response.is_a?(Net::HTTPSuccess)

        File.open(path, 'wb') do |file|
          response.read_body file
        end
      end
    end
  end
end
# Scrapes a French NAP dataset page to find the real download link
class FrenchNap < Base
  def download(path)
    URL.new(link).download(path)
  end

  def page
    # FIXME: this download should be protected
    Nokogiri::HTML(URI.open(url))
  end

  # Download link extracted from the dataset page
  def link
    # New layout : some download links are in a "div.resource-actions" class element, others just in a table
    # We prefer to use table links because we have absolute url and never relative url
    tables = page.css('table')
    tables.css('a').first["href"]
  end
end
# Downloads the url with a raw Authorization header
class Authorization < Base
  # Presence is validated on the Source (downloader_option_raw_authorization)
  attr_accessor :raw_authorization

  def download(path)
    URL.new(url).download(path, headers)
  end

  private

  def headers
    raw_authorization ? { 'Authorization' => raw_authorization } : {}
  end
end
# Downloads a remote file via FTP, credentials being embedded in the url
class Ftp < Base
  def download(path)
    ftp.getbinaryfile(remote_filename, path)
  end

  def uri
    @uri ||= URI(url)
  end

  def host
    @host ||= uri.host
  end

  def port
    @port ||= uri.port
  end

  # Falls back to anonymous FTP when the url carries no credential
  def username
    @username ||= uri.user || 'anonymous'
  end

  def password
    @password ||= uri.password || 'chouette@enroute.mobi'
  end

  def remote_dir
    @remote_dir ||= File.dirname(uri.path)
  end

  def remote_filename
    @remote_filename ||= File.basename(uri.path)
  end

  # Lazily opened FTP connection, reused across calls
  def ftp
    @ftp ||= Net::FTP.new.tap do |connection|
      connection.connect(host, port)
      connection.login(username, password)
      connection.chdir(remote_dir)
      connection.passive = true
    end
  end
end
end
# Computes the SHA256 checksum used to detect content changes between
# two retrievals. The byte stream fed to the digest must stay strictly
# stable: any change would invalidate checksums stored on Sources.
module Checksumer
  # Selects the checksumer implementation for the given file type
  def self.for(type)
    type.zip? ? ZIP : File
  end

  # Checksum of a whole file content
  class File
    include Measurable

    attr_reader :file

    def initialize(file)
      @file = file
    end

    def digest
      @digest ||= Digest::SHA256.new
    end

    def checksum
      checksum!
      digest.hexdigest
    end

    protected

    # Feeds the digest by 16 KB chunks, reusing a single buffer
    def digest_stream(io)
      buffer = ""
      while io.read 16384, buffer
        digest.update buffer
      end
    end

    def checksum!
      ::File.open(file) do |file|
        digest_stream file
      end
    end
  end

  # Checksum of a ZIP archive content, insensitive to entry order
  class ZIP < File
    MAX_SIZE = 1024.megabytes

    # Digest entries name and content
    # Don't use a Zip::InputStream to avoid difference with entry order change
    def checksum!
      ::Zip::File.open(file) do |zip_file|
        # Sort the entries by name to always digest in the same order
        zip_file.glob('*').sort_by(&:name).each do |entry|
          # We could read only the beginning of larger files
          raise 'File too large when extracted' if entry.size > MAX_SIZE
          next unless entry.file?
          # Digest the entry name
          digest.update entry.name
          # Digest the entry content
          digest_stream entry.get_input_stream
        end
      end
    end
  end
end
# Operation which downloads the Source content and creates an Import
# when this content has changed
class Retrieval < Operation
  include Measurable

  belongs_to :source, optional: false
  # Import created when new content is detected
  belongs_to :import, class_name: "Import::Workbench"
  belongs_to :workbench, optional: false

  before_validation :set_workbench, on: :create

  delegate :workgroup, to: :workbench

  # Error raised during perform, used to compute the final user status
  attr_accessor :source_error
# Downloads and processes the source content, then creates an Import when
# the content checksum has changed since the previous retrieval.
# Expected retrieval failures (Source::Retrieval::Error) are reported via
# message_key / source_error instead of propagating.
def perform
  unless source
    logger.info "Source #{source_id} no longer available"
    return
  end

  download
  process

  # We could add an option to ignore the checksum / force the import
  if checksum_changed?
    logger.info "Checksum has changed"
    create_import
    # Remember the new checksum for the next retrieval comparison
    source.update checksum: checksum
    self.message_key = :new_content_detected
  else
    self.message_key = :no_import_required
    logger.info "Checksum unchanged. Import is skipped"
  end
rescue Source::Retrieval::Error => e
  logger.info "Retrieval failed with user message: #{e.message_key}"
  self.message_key = e.message_key
  self.source_error = e
ensure
  save
end
# A retrieval error makes the whole operation failed
def final_user_status
  source_error ? Operation.user_status.failed : super
end
# The downloader and the import options are defined by the Source
delegate :downloader, :import_options, to: :source
# Lazily created Tempfile receiving the raw downloader output
def downloaded_file
  @downloaded_file = Tempfile.new(["retrieval-downloaded"]) unless @downloaded_file
  @downloaded_file
end
# Downloads the source content into downloaded_file, via the Source
# downloader (URL, FrenchNap, Authorization or Ftp)
def download
  logger.info "Download with #{downloader.class}"
  downloader.download downloaded_file
end
measure :download
# Localized message displayed to the user, '-' while no message is defined
def user_message
  message_key ? I18n.translate(message_key, scope: 'source/retrieval.user_message') : '-'
end
# Transforms downloaded_file into processed_file via the processor
# To be replaced by import features (line selection, ignore parents, etc)
def process
  return unless processor
  logger.info "Process downloaded file"
  processor.process downloaded_file, processed_file
end
measure :process
# GTFS processor when process_* options are defined,
# plain copy processor otherwise (GTFS#empty? drives #presence)
def processor
  gtfs_processor = Processor::GTFS.new.with_options(processing_options)
  gtfs_processor.presence || Processor::Copy.new
end
# Expected retrieval failure, carrying the I18n key of the user message
# (distinct from the technical exception message)
class Error < StandardError
  attr_reader :message_key

  def initialize(message_key = nil)
    @message_key = message_key
    super(nil)
  end
end
# Tempfile for the processor output, keeping the detected extension
def processed_file
  @processed_file ||= Tempfile.new(["retrieval-processed", ".#{downloaded_file_type.default_extension}"])
end
# Names the created import after the Source and the current day
def import_name
  [source.name, I18n.l(Time.zone.today)].join(' ')
end
# File given to the created import: the processed file when a processor
# ran (see #process), the raw downloaded file otherwise
def imported_file
  @processed_file || downloaded_file
end

# MimeMagic type detected from the downloaded file content
def downloaded_file_type
  @file_type ||= MimeMagic.by_magic(downloaded_file)
end
# Checksumer implementation matching the downloaded file type
# (ZIP archives are digested entry by entry)
def checksumer_class
  Checksumer.for(downloaded_file_type)
end

def checksumer
  checksumer_class.new(imported_file)
end

# Memoized: the checksum is used several times during perform
def checksum
  @checksum ||= checksumer.checksum
end
# Forced when the Source ignores checksums, otherwise compares with the
# checksum stored during the previous retrieval
def checksum_changed?
  return true if source.ignore_checksum
  source.checksum != checksum
end
# Attributes of the Import::Workbench created when new content is detected
def import_attributes
  {
    type: 'Import::Workbench',
    name: import_name,
    creator: creator,
    file: imported_file,
    options: import_workbench_options,
    import_category: import_category
  }
end
# Creates the workbench import and remembers it on the retrieval
def create_import
  new_import = workbench.imports.create!(import_attributes)
  update import: new_import
end
# Keeps only the `offset` most recent retrievals (by creation date)
def self.delete_older(offset=20)
  order(created_at: :desc).offset(offset).delete_all
end
# Import options plus the detected category, minus the process_* options
# which are handled by the retrieval processor itself
def import_workbench_options
  options = import_options.merge(import_category_option)
  options.except(*processing_options.keys)
end
# Options consumed by the Source processors (e.g. process_gtfs_route_ids)
def processing_options
  import_options.select { |key, _value| key.start_with?('process_') }
end
# NeTEx generic import category is forced for XML files
def import_category_option
  downloaded_file_type&.xml? ? { import_category: "netex_generic" } : {}
end
# NeTEx generic import category is forced for XML files, nil otherwise
def import_category
  return unless downloaded_file_type&.xml?
  "netex_generic"
end
private

# Aligns the retrieval workbench with its source's workbench (on create)
def set_workbench
  self.workbench = source&.workbench
end
end
module Processor
# Default processor: copies the downloaded file unchanged.
# IO.copy_stream accepts both IO objects and paths, so keep it as-is.
class Copy
  def process(source_file, target_file)
    IO.copy_stream(source_file, target_file)
  end
end
# Rewrites a GTFS archive to keep only selected routes and/or detach
# stops from their parent stations
class GTFS
  # Reads the process_* import options. Returns self to allow
  # Processor::GTFS.new.with_options(...) chaining.
  def with_options(options = {})
    @route_ids = options["process_gtfs_route_ids"]
    @ignore_parents = options["process_gtfs_ignore_parents"]
    self
  end

  def route_ids
    @route_ids ||= []
  end

  def ignore_parents?
    @ignore_parents
  end

  # True when no option is defined — relied upon by Retrieval#processor
  # (via #presence) to fall back to the Copy processor
  def empty?
    route_ids.empty? && !ignore_parents?
  end

  def process(source_file, target_file)
    # Local copies: the Proc below appears to be evaluated in the Rewriter
    # context (target / void_target are not defined on this class), so it
    # can't rely on this instance's methods -- TODO confirm
    route_ids = self.route_ids
    ignore_parents = self.ignore_parents?

    gtfs_target_for = Proc.new do |resource, associations|
      ignored = false
      # Skip resources not associated with one of the selected routes
      if route_ids.present?
        ignored = (route_ids & associations[:route_ids]).empty?
      end
      # Detach stops from their parent station and drop the stations themselves
      if ignore_parents && !ignored && resource.is_a?(::GTFS::Stop)
        resource.parent_station = nil
        ignored = true if resource.station?
      end
      if ignored
        void_target
      else
        target(target_file)
      end
    end
    ::GTFS::Rewriter.new(source_file, target_for: gtfs_target_for).rewrite
  end
end
end
end