af83/chouette-core

View on GitHub
app/models/aggregate.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# frozen_string_literal: true

class Aggregate < ApplicationModel
  DEFAULT_KEEP_AGGREGATES = 10

  include OperationSupport
  include NotifiableSupport

  include Measurable

  belongs_to :workgroup
  has_many :resources, class_name: 'Aggregate::Resource'
  has_many :processings, as: :operation, dependent: :destroy

  validates :workgroup, presence: true

  delegate :output, to: :workgroup

  def parent
    workgroup
  end

  def rollback!
    raise 'You cannot rollback to the current version' if current?

    workgroup.output.update current: new
    following_aggregates.each(&:cancel!)
    publish rollback: true
    workgroup.aggregated!
  end

  def cancel!
    update status: :canceled
    new&.rollbacked!
  end

  def following_aggregates
    following_referentials = workgroup.output.referentials.where('created_at > ?', new.created_at)
    workgroup.aggregates.where(new_id: following_referentials.pluck(:id))
  end

  def aggregate
    update_column :started_at, Time.zone.now
    update_column :status, :running

    enqueue_job :aggregate!
  end
  alias run aggregate

  # Returns aggregated referentials order by the Workbench priority
  # lower priority first, so higher value first
  def referentials_by_priority
    workgroup.referentials.joins(:workbench).merge(Workbench.order(priority: :desc)).where(id: referentials.map(&:id))
  end

  # Copy the referential provided by a Workbench
  # Clean the pending aggregated dataset (new) if needed
  class WorkbenchCopy
    include Measurable

    def initialize(referential, new)
      @referential = referential
      @new = new
    end
    attr_reader :referential, :new

    delegate :workbench, to: :referential
    delegate :priority, to: :workbench

    def overlaps
      ReferentialOverlaps.new referential, new, priority: priority
    end

    def overlapping_periods
      @overlapping_periods ||= overlaps.overlapping_periods
    end

    def aggregate_resource
      @aggregate_resource ||= Aggregate::Resource.new(
        priority: priority,
        workbench_name: workbench.name,
        referential_created_at: referential.created_at,
        metrics: {
          vehicle_journey_count: referential.switch { |ref| ref.vehicle_journeys.count },
          overlapping_period_count: overlapping_periods.count
        }
      )
    end

    def clean!
      overlapping_periods.each do |overlapping_period|
        Rails.logger.info "Clean Line #{overlapping_period.line_id} on #{overlapping_period.period}"

        clean_scope = Clean::Scope::Line.new(Clean::Scope::Referential.new(new), overlapping_period.line_id)
        clean_period = overlapping_period.period
        [
          Clean::InPeriod.new(clean_scope, clean_period),
          Clean::Metadata::InPeriod.new(clean_scope, clean_period)
        ].each(&:clean!)
      end
    end
    measure :clean!

    def copy!
      duration = ::Benchmark.realtime do
        measure :workbench, workbench_id: workbench.id do
          Rails.logger.tagged("Workbench ##{workbench.id}") do
            Rails.logger.info "Aggregate Referential##{referential.id} with priority #{priority}"
            clean!
            Referential::Copy.new(source: referential, target: new, source_priority: priority).copy
          end
        end
      end

      aggregate_resource.duration = duration.round

      self
    end
  end

  def aggregate!
    Rails.logger.tagged("Aggregate ##{id}") do
      measure 'aggregate', aggregate: id do
        prepare_new

        measure 'referential_copies' do
          referentials_by_priority.each do |source|
            copy = WorkbenchCopy.new(source, new).copy!
            resources << copy.aggregate_resource
          end
        end

        new.switch do
          measure 'analyse_current' do
            new.schema.analyse
          end

          new.update_counters
          ServiceCount.compute_for_referential(new)
        end

        if processing_rules_after_aggregate.present?
          continue_after_processings = processor.after([new])
          # Check processed status and stop aggregate if one failed
          unless continue_after_processings
            failed_on_processings
            return
          end
        end

        save_current
      end
    end
  rescue StandardError => e
    Chouette::Safe.capture "Aggregate ##{id} failed", e
    failed!
    raise e if Rails.env.test?
  end

  def processor
    @processor ||= Processor.new(self)
  end

  def processing_rules_after_aggregate
    workgroup.processing_rules.where(operation_step: 'after_aggregate')
  end

  def workbench_for_notifications
    workgroup.owner_workbench
  end

  def self.keep_operations
    @keep_operations ||= if Rails.configuration.respond_to?(:keep_aggregates)
                           Rails.configuration.keep_aggregates
                         else
                           DEFAULT_KEEP_AGGREGATES
                         end
  end

  def after_save_current
    clean_previous_operations
    publish
    workgroup.aggregated!
  end

  def handle_queue
    concurent_operations.pending.where('created_at < ?', created_at).find_each(&:cancel!)
    super
  end

  class Resource < ApplicationModel
    self.table_name = 'aggregate_resources'

    belongs_to :aggregate
    acts_as_list scope: :aggregate
  end

  private

  def prepare_new
    Rails.logger.debug 'Create a new output'
    # In the unique case, the referential created can't be linked to any workbench
    attributes = {
      organisation: workgroup.owner,
      prefix: "aggregate_#{id}",
      line_referential: workgroup.line_referential,
      stop_area_referential: workgroup.stop_area_referential,
      objectid_format: referentials.first.objectid_format,
      workbench: nil
    }
    new = workgroup.output.referentials.new attributes
    new.referential_suite = output
    new.name = I18n.t('aggregates.referential_name', date: I18n.l(created_at, format: :short_with_time))

    Rails.logger.error "New referential isn't valid : #{new.errors.inspect}" unless new.valid?

    begin
      new.save!
    rescue StandardError
      Rails.logger.debug "Errors on new referential: #{new.errors.messages}"
      raise
    end

    new.pending!

    output.update new: new
    update new: new
  end
  measure :prepare_new
end