af83/chouette-core

View on GitHub
app/models/source.rb

Summary

Maintainability
D
1 day
Test Coverage
require 'mimemagic_ext'
require 'net/ftp'

class Source < ApplicationModel
  extend Enumerize
  belongs_to :workbench, optional: false

  has_many :retrievals, class_name: "Source::Retrieval", foreign_key: "source_id", dependent: :destroy

  validates :name, presence: true
  validates :url, presence: true
  validates :downloader_type, presence: true

  with_options if: :downloader_type_direct? do |direct|
    direct.validates :url, url: { private_host: false, scheme: %w{http https} }
  end

  with_options if: :downloader_type_ftp? do |ftp|
    ftp.validates :url, url: { private_host: false, scheme: %w{ftp ftps} }
  end

  with_options if: :downloader_type_sftp? do |sftp|
    sftp.validates :url, url: { private_host: false, scheme: %w{sftp} }
  end

  with_options if: :downloader_type_authorization? do |authorization|
    authorization.validates :url, url: { private_host: false, scheme: %w{http https} }

    authorization.validates :downloader_option_raw_authorization, presence: true
    authorization.validates :downloader_option_username, :downloader_option_password, absence: true
  end

  with_options if: :downloader_type_french_nap? do |french_nap|
    french_nap.validates :url, url: { host: 'transport.data.gouv.fr', scheme: 'https' }
    french_nap.validates :downloader_option_raw_authorization, :downloader_option_username,
                            :downloader_option_password, absence: true
  end

  enumerize :downloader_type, in: %i(direct french_nap authorization ftp sftp), default: :direct, predicates: { prefix: true }

  scope :enabled, -> { where.not(retrieval_frequency: 'none') }

  before_validation :clean, on: :update

  attribute :retrieval_time_of_day, TimeOfDay::Type::TimeWithoutZone.new
  attribute :retrieval_days_of_week, Cuckoo::DaysOfWeek::Type.new

  belongs_to :scheduled_job, class_name: '::Delayed::Job', dependent: :destroy
  validates :retrieval_time_of_day, presence: true, if: :retrieval_frequency_daily?
  validates :retrieval_days_of_week, presence: true, if: :enabled?

  enumerize :retrieval_frequency, in: %w[none hourly daily], default: 'none', predicates: { prefix: true }

  def enabled?
    !retrieval_frequency_none?
  end

  def next_retrieval
    return if retrieval_frequency_none?

    scheduled_job&.run_at
  end

  # ?? Rails 5 ActiveRecord::AttributeAssignment .. doesn't create an object
  # by invoke writer with multiparameter attributes (like {1 => 13, 2 => 15})
  def retrieval_time_of_day=(time_of_day)
    if time_of_day.is_a?(Hash) && time_of_day.keys == [1,2]
      time_of_day = TimeOfDay.new(time_of_day[1], time_of_day[2])
    end
    super time_of_day
  end

  def reschedule
    scheduled_job&.destroy
    schedule
  end

  def schedule
    return unless enabled?

    job = ScheduledJob.new(self)
    scheduled_job = Delayed::Job.enqueue(job, cron: job.cron)

    # To avoid a new save sequence
    update_column :scheduled_job_id, scheduled_job.id
  end

  # Reschedule and save the Source. Uses this method to force a rescheduling, in migration for example.
  def reschedule!
    reschedule
    save
  end

  def reschedule_needed?
    retrieval_frequency_changed? || retrieval_time_of_day_changed? || retrieval_days_of_week_changed?
  end

  def retrieval_days_of_week_attributes=(attributes)
    self.retrieval_days_of_week = Cuckoo::DaysOfWeek.new(attributes)
  end

  # REMOVEME after CHOUETTE-2007
  before_validation ->(source) { source.retrieval_time_of_day ||= TimeOfDay.new(0, 0) }, if: :enabled?
  before_update :reschedule, if: :reschedule_needed?
  after_commit :schedule, on: :create, if: :enabled?

  # Uses to start the Source retrieval at the expected time
  class ScheduledJob < ::ScheduledJob
    def initialize(source)
      super()
      @source = source
      @source_id = source.id
    end
    attr_reader :source_id

    def encode_with(coder)
      coder['source_id'] = source_id
    end

    delegate :retrieval_time_of_day, :retrieval_frequency, :retrieval_days_of_week, to: :source

    def cron
      case retrieval_frequency
      when 'daily'
        if retrieval_time_of_day
          "#{retrieval_time_of_day.minute} #{retrieval_time_of_day.hour} * * #{retrieval_days_of_week_cron}"
        end
      when 'hourly'
        "#{hourly_random % 60} * * * #{retrieval_days_of_week_cron}"
      end
    end

    def retrieval_days_of_week_cron
      return '*' if retrieval_days_of_week.all?

      retrieval_days_of_week.days.map do |day_of_week|
        day_of_week.to_s.first(3)
      end.join(',')
    end

    def hourly_random
      source.id || Random.rand(60)
    end

    def source
      @source ||= Source.find_by(id: source_id)
    end

    def perform
      source.retrieve if source.enabled?
    end

    protected

    def perform_error_capture_message
      "Can't start Source##{source_id} retrieval"
    end
  end

  def clean
    unless downloader_type == "authorization"
      self.downloader_options = self.downloader_options.except("raw_authorization")
    end
  end

  def import_option_line_provider
    candidate_line_providers.find_by(id: import_option_line_provider_id) || workbench.default_line_provider
  end

  def import_option_stop_area_provider
    candidate_stop_area_providers.find_by(id: import_option_stop_area_provider_id) || workbench.default_stop_area_provider
  end

  def import_option_code_space
    workbench.workgroup.code_spaces.find_by(id: import_option_code_space_id)
  end

  def import_option_process_gtfs_route_ids
    import_options.fetch "process_gtfs_route_ids", []
  end

  # Use compact to delete empty value
  def import_option_process_gtfs_route_ids=(value)
    import_options["process_gtfs_route_ids"] = value.reject(&:blank?)
  end

  def method_missing(name, *args)
    case name
    when /^import_option_(.*)=/
      import_options[$1] = args.first
    when /^import_option_(.*)\?/
      import_options[$1] == 'true'
    when /^import_option_(.*)$/
      import_options[$1]
    when /^downloader_option_(.*)=/
      downloader_options[$1] = args.first
    when /^downloader_option_(.*)$/
      downloader_options[$1]
    else
      super
    end
  end

  def respond_to?(name)
    case name
    when /^import_option_/, /^downloader_option_/
      true
    else
      super
    end
  end

  def update_workgroup_providers?; import_option_update_workgroup_providers?; end
  def store_xml?; import_option_store_xml?; end
  def disable_missing_resources?; import_option_disable_missing_resources; end

  def downloader_class
    begin
      return Downloader.const_get(downloader_type.camelcase) if downloader_type.present?
    rescue NameError
      # Will use default class
    end

    Downloader::URL
  end

  def downloader
    downloader_class.new url, downloader_options
  end

  def retrieve
    retrieval = retrievals.create(creator: 'Source')
    retrieval.enqueue
    retrievals.delete_older
  end

  delegate :line_providers, :stop_area_providers, to: :workbench, prefix: :candidate

  module Downloader
    class URL
      mattr_accessor :timeout, default: 120.seconds

      attr_reader :url
      attr_accessor :username, :password, :error

      def redirection_count
        @redirection_count ||= 10
      end

      def initialize(url, options = {})
        @url = url
        options.each { |k,v| send "#{k}=", v }
      end

      def create_error(message_key)
        self.error = Source::Retrieval::Error.new(message_key)
      end

      def download(path, headers = {})
        self.error = nil

        client.on_success do |c|
          File.open(path, 'wb') do |file|
            file.write c.body
          end
        end

        begin
          client.perform
        rescue Curl::Err::CurlError
          create_error :download_failed
        end

        raise error if error
      end

      def headers
        {}
      end

      def client
        @client ||= Curl::Easy.new(url) do |client|
          headers.each { |name, value| client.headers[name] = value }

          client.timeout = timeout
          client.max_redirects = redirection_count
          client.follow_location = true

          if username && password
            client.username = username
            client.password = password
          end

          # Workaround for https://github.com/taf2/curb/issues/452
          client.cacert = ENV["CURL_CA_BUNDLE"]

          client.on_missing do |c, code|
            message_key = case c.response_code
            when 404
              :url_not_found
            when 401, 403
              :authentication_failed
            end

            create_error message_key if message_key
          end

          client.on_failure do |c, code|
            create_error :url_not_available
          end
        end
      end
    end

    class FrenchNap < URL
      def download(path)
        URL.new(link).download(path)
      rescue Curl::Err::CurlError
        raise Source::Retrieval::Error.new(:download_failed)
      end

      def page
        Nokogiri::HTML(
          Curl.get(url) do |client|
            client.timeout = timeout
            client.max_redirects = redirection_count
            client.follow_location = true
            client.cacert = ENV["CURL_CA_BUNDLE"]
          end.body
        )
      end

      def link
        # New layout : some download links are in a "div.resource-actions" class element, others just in a table
        # We prefer to use table links because we have absolute url and never relative url
        page.css('a.download-button').first["href"]
      end
    end

    class Authorization < URL
      attr_accessor :raw_authorization
      #validates_presence_of :raw_authorization

      def download(path)
        super path, headers
      end

      private

      def headers
        return {} unless raw_authorization

        { 'Authorization' => raw_authorization }
      end
    end
  end

  module Checksumer
    def self.for(type)
      type.zip? ? ZIP : File
    end

    class File
      include Measurable

      attr_reader :file
      def initialize(file)
        @file = file
      end

      def digest
        @digest ||= Digest::SHA256.new
      end

      def checksum
        checksum!
        digest.hexdigest
      end

      protected

      def digest_stream(io)
        buffer = ""
        while io.read 16384, buffer
          digest.update buffer
        end
      end

      def checksum!
        ::File.open(file) do |file|
          digest_stream file
        end
      end
    end

    class ZIP < File
      MAX_SIZE = 3.gigabytes

      # Digest entries name and content
      # Don't use a Zip::InputStream to avoid difference with entry order change
      def checksum!
        ::Zip::File.open(file) do |zip_file|
          # Sort the entries by name to always digest in the same order
          zip_file.glob('*').sort_by(&:name).each do |entry|
            # We could read only the beginning of larger files
            raise 'File too large when extracted' if entry.size > MAX_SIZE

            next unless entry.file?

            # Digest the entry name
            digest.update entry.name
            # Digest the entry content
            digest_stream entry.get_input_stream
          end
        end
      end
    end
  end

  class Retrieval < Operation
    include Measurable

    belongs_to :source, optional: false
    belongs_to :import, class_name: "Import::Workbench"
    belongs_to :workbench, optional: false

    before_validation :set_workbench, on: :create
    delegate :workgroup, to: :workbench

    attr_accessor :source_error

    def perform
      unless source
        logger.info "Source #{source_id} no longer available"
        return
      end

      download
      process
      # We could add an option to ignore the checksum / force the import
      if checksum_changed?
        logger.info "Checksum has changed"

        create_import
        source.update checksum: checksum
        self.message_key = :new_content_detected
      else
        self.message_key = :no_import_required
        logger.info "Checksum unchanged. Import is skipped"
      end
    rescue Source::Retrieval::Error => e
      logger.info "Retrieval failed with user message: #{e.message_key}"
      self.message_key = e.message_key
      self.source_error = e
    ensure
      save
    end

    def final_user_status
      return Operation.user_status.failed if source_error
      super
    end

    delegate :downloader, :import_options, to: :source

    def downloaded_file
      @downloaded_file ||= Tempfile.new(["retrieval-downloaded"])
    end

    def download
      logger.info "Download with #{downloader.class}"
      downloader.download downloaded_file
    end
    measure :download

    def user_message
      return '-' unless message_key
      I18n.translate message_key, scope: 'source/retrieval.user_message'
    end

    # To be replaced by import features (line selection, ignore parents, etc)
    def process
      return unless processor
      logger.info "Process downloaded file"
      processor.process downloaded_file, processed_file
    end
    measure :process

    def processor
      Processor::GTFS.new.with_options(processing_options).presence || Processor::Copy.new
    end

    class Error < StandardError
      attr_reader :message_key

      def initialize(message_key = nil)
        super(nil)

        @message_key = message_key
      end
    end

    def processed_file
      @processed_file ||= Tempfile.new(["retrieval-processed",".#{downloaded_file_type.default_extension}"])
    end

    def import_name
      "#{source.name} #{I18n.l(Time.zone.today)}"
    end

    def imported_file
      @processed_file || downloaded_file
    end

    def downloaded_file_type
      @file_type ||= MimeMagic.by_magic(downloaded_file)
    end

    def checksumer_class
      Checksumer.for(downloaded_file_type)
    end

    def checksumer
      checksumer_class.new imported_file
    end

    def checksum
      @checksum ||= checksumer.checksum
    end

    def checksum_changed?
      source.ignore_checksum || (source.checksum != checksum)
    end

    def import_attributes
      import_options = import_workbench_options
      {
        name: import_name,
        creator: creator,
        file: imported_file,
        options: import_options.except('code_space_id'),
        type: 'Import::Workbench',
        import_category: import_category,
        code_space_id: import_options['code_space_id']
      }
    end

    def create_import
      update import: workbench.imports.create!(import_attributes)
    end

    def self.delete_older(offset=20)
      order(created_at: :desc).offset(offset).delete_all
    end

    def import_workbench_options
      import_options
        .merge(import_category_option)
        .except(*processing_options.keys)
    end

    def processing_options
      import_options.select{ |key, _| key.start_with?('process_') }
    end

    def import_category_option
      if downloaded_file_type&.xml?
        { import_category: "netex_generic" }
      else
        {}
      end
    end

    def import_category
      "netex_generic" if downloaded_file_type&.xml?
    end

    private

    def set_workbench
      self.workbench = self.source&.workbench
    end
  end

  module Processor
    class Copy
      def process(source_file, target_file)
        IO.copy_stream(source_file, target_file)
      end
    end

    class GTFS

      def with_options(options = {})
        @route_ids = options["process_gtfs_route_ids"]
        @ignore_parents = options["process_gtfs_ignore_parents"]

        self
      end

      def route_ids
        @route_ids ||= []
      end

      def ignore_parents?
        @ignore_parents
      end

      def empty?
        route_ids.empty? && !ignore_parents?
      end

      def process(source_file, target_file)
        route_ids = self.route_ids
        ignore_parents = self.ignore_parents?

        gtfs_target_for = Proc.new do |resource, associations|
          ignored = false

          if route_ids.present?
            ignored = (route_ids & associations[:route_ids]).empty?
          end

          if ignore_parents && !ignored && resource.is_a?(::GTFS::Stop)
            resource.parent_station = nil
            ignored = true if resource.station?
          end

          if ignored
            void_target
          else
            target(target_file)
          end
        end

        ::GTFS::Rewriter.new(source_file, target_for: gtfs_target_for).rewrite
      end
    end
  end
end