openjaf/cenit

View on GitHub
app/models/setup/flow.rb

Summary

Maintainability
F
1 wk
Test Coverage
require 'nokogiri'

module Setup
  class Flow
    # = Flow
    #
    # Defines how data is processed by the execution of one or more actions.

    include ReqRejValidator
    include ShareWithBindings
    include NamespaceNamed
    include TriggersFormatter
    include ThreadAware
    include ModelConfigurable

    build_in_data_type.referenced_by(:namespace, :name)
    build_in_data_type.and(
      properties: {
        active: {
          type: 'boolean',
          default: true
        },
        notify_request: {
          type: 'boolean',
          default: false
        },
        notify_response: {
          type: 'boolean',
          default: false
        },
        discard_events: {
          type: 'boolean'
        },
        auto_retry: {
          type: 'string',
          enum: Setup::Task.auto_retry_enum.collect(&:to_s)
        }
      }
    )

    field :description, type: String

    binding_belongs_to :event, class_name: Setup::Event.to_s, inverse_of: nil

    belongs_to :translator, class_name: Setup::Translator.to_s, inverse_of: nil
    belongs_to :custom_data_type, class_name: Setup::DataType.to_s, inverse_of: nil
    field :nil_data_type, type: Mongoid::Boolean
    field :data_type_scope, type: String
    field :scope_filter, type: String
    belongs_to :scope_evaluator, class_name: Setup::Algorithm.to_s, inverse_of: nil
    field :lot_size, type: Integer

    belongs_to :webhook, class_name: Setup::Webhook.to_s, inverse_of: nil
    binding_belongs_to :authorization, class_name: Setup::Authorization.to_s, inverse_of: nil
    binding_belongs_to :connection_role, class_name: Setup::ConnectionRole.to_s, inverse_of: nil
    belongs_to :before_submit, class_name: Setup::Algorithm.to_s, inverse_of: nil

    belongs_to :response_translator, class_name: Setup::Translator.to_s, inverse_of: nil
    belongs_to :response_data_type, class_name: Setup::DataType.to_s, inverse_of: nil

    has_and_belongs_to_many :after_process_callbacks, class_name: Setup::Algorithm.to_s, inverse_of: nil

    field :data_type_id, type: BSON::ObjectId

    validates_numericality_in_presence_of :lot_size, greater_than_or_equal_to: 1

    config_with Setup::FlowConfig

    before_save :validates_configuration, :check_scheduler, :set_data_type_id

    after_save :schedule_task

    def validates_configuration
      format_triggers_on(:scope_filter) if scope_filter.present?
      unless requires(:name, :translator)
        if event.present?
          unless translator.type == :Import || requires(:data_type_scope)
            if scope_symbol == :event_source &&
               !(event.is_a?(Setup::Observer) && event.data_type == data_type)
              errors.add(:event, 'not compatible with data type scope')
            end
          end
        elsif scope_symbol == :event_source
          if persisted?
            requires(:event)
          else
            rejects(:data_type_scope)
          end
        end
        if translator.data_type.nil?
          requires(:custom_data_type) if translator.type == :Conversion && event.present?
        else
          rejects(:custom_data_type)
        end
        if translator.type == :Import
          rejects(:data_type_scope, :scope_filter, :scope_evaluator)
        else
          case scope_symbol
          when :filtered
            format_triggers_on(:scope_filter, true)
            rejects(:scope_evaluator)
          when :evaluation
            unless requires(:scope_evaluator)
              if scope_evaluator.parameters.size == 1
                unless scope_evaluator.parameters.first.name == 'scope'
                  errors.add(:scope_evaluator, "argument name should be 'scope'")
                end
              else
                errors.add(:scope_evaluator, 'must receive one parameter')
              end
            end
            rejects(:scope_filter)
          else
            rejects(:scope_filter, :scope_evaluator)
          end
        end
        if [:Import, :Export].include?(translator.type)
          requires(:webhook)
          if translator.type == :Import
            unless before_submit.nil? || before_submit.parameters.size == 1 || before_submit.parameters.size == 2
              errors.add(:before_submit, 'must receive one or two parameter')
            end
          else
            rejects(:before_submit)
          end
        else
          rejects(:before_submit, :connection_role, :authorization, :webhook, :notify_request, :notify_response)
        end

        if translator.type == :Export
          if response_translator.present?
            if response_translator.type == :Import
              if response_translator.data_type
                rejects(:response_data_type)
              else
                requires(:response_data_type)
              end
            else
              errors.add(:response_translator, 'is not an import translator')
            end
          else
            rejects(:response_data_type, :discard_events)
          end
          rejects(:data_type_scope) if data_type.nil?
        else
          rejects(:lot_size, :response_translator, :response_data_type)
        end
      end
      if (bad_callbacks = after_process_callbacks.select { |c| c.parameters.size != 1 }).present?
        errors.add(:after_process_callbacks, "contains algorithms with unexpected parameter size: #{bad_callbacks.collect(&:custom_title).to_sentence}")
      end
      abort_if_has_errors
    end

    def reject_message(field = nil)
      case field
      when :custom_data_type
        'is not allowed since translator already defines a data type'
      when :data_type_scope
        'is not allowed for import translators'
      when :response_data_type
        response_translator.present? ? 'is not allowed since response translator already defines a data type' : "can't be defined until response translator"
      when :discard_events
        "can't be defined until response translator"
      when :lot_size, :response_translator
        'is not allowed for non export translators'
      else
        super
      end
    end

    def with(options)
      if options && (data_type = options.delete(:data_type))
        using_data_type(data_type)
      end
      super
    end

    def own_data_type
      translator&.data_type || custom_data_type
    end

    def using_data_type(data_type)
      if (own_dt = own_data_type) && own_dt != data_type
        fail "Illegal data type option #{data_type.custom_title}, a flow own data type #{flow_data_type} is already configured"
      else
        @_data_type = data_type if data_type
      end
    end

    def data_type
      @_data_type || own_data_type
    end

    def set_data_type_id
      self.data_type_id = own_data_type&.id;
    end

    def data_type_scope_enum
      enum = []
      if data_type
        enum << 'Event source' if event && event.try(:data_type) == data_type
        enum << "All #{data_type.title.downcase.to_plural}"
        enum << 'Filter'
        enum << 'Evaluator'
      else
        enum << nil
      end
      enum
    end

    def auto_retry_enum
      Setup::Task.auto_retry_enum
    end

    def ready_to_save?
      shared? ||
        ((t = translator).present? &&
          (event.blank? || data_type_scope.present? || t.type == :Import) &&
          ([:Export, :Import].exclude?(t.type) || webhook.present?))
    end

    def can_be_restarted?
      event || translator
    end

    def join_process(message = {}, join_task = Task.current)
      if join_task
        process(message) do |task|
          task.join(join_task)
        end
      else
        process(message)
      end
    end

    def process(message = {}, &block)
      execution_graph = current_thread_cache.last || {}
      if (trigger_flow_id = execution_graph['trigger_flow_id'])
        execution_graph[trigger_flow_id] ||= []
        adjacency_list = execution_graph[trigger_flow_id]
        adjacency_list << id.to_s if adjacency_list.exclude?(id.to_s)
      end
      message = message.merge(flow_id: id.to_s,
                              execution_graph: execution_graph,
                              auto_retry: auto_retry)
      if (data_type = message.delete(:data_type)).is_a?(Setup::DataType)
        message[:data_type_id] = data_type.capataz_slave.id.to_s # TODO Remove capataz_slave when fixing capataz rewriter for Hash call arguments
      end
      if (authorization = message.delete(:authorization)).is_a?(Setup::Authorization)
        message[:authorization_id] = authorization.capataz_slave.id.to_s # TODO Remove capataz_slave when fixing capataz rewriter for Hash call arguments
      end
      if (connection = message.delete(:connection)).is_a?(Setup::Connection)
        message[:connection_id] = connection.capataz_slave.id.to_s # TODO Remove capataz_slave when fixing capataz rewriter for Hash call arguments
      end
      result = Setup::FlowExecution.process(message, &block)
      save
      result
    end

    def translate(message, &block)
      if translator.present?
        begin
          flow_execution = current_thread_cache
          flow_execution << (message[:execution_graph] || {}).merge('trigger_flow_id' => id.to_s)
          data_type = Setup::BuildInDataType[message[:data_type_id]] ||
                      Setup::DataType.where(id: message[:data_type_id]).first
          using_data_type(data_type) if data_type
          send("translate_#{translator.type.to_s.downcase}", message, &block)
          after_process_callbacks.each do |callback|
            begin
              callback.run(message[:task])
            rescue Exception => ex
              Setup::SystemNotification.create(message: "Error executing after process callback #{callback.custom_title}: #{ex.message}")
            end
          end
        ensure
          flow_execution.pop
        end
      else
        yield(message: "Flow translator can't be blank")
      end
    end

    def scope_symbol
      if data_type_scope.present?
        if data_type_scope.start_with?('Event')
          :event_source
        elsif data_type_scope.start_with?('Filter')
          :filtered
        elsif data_type_scope.start_with?('Eval')
          :evaluation
        else
          :all
        end
      else
        nil
      end
    end

    def selector_from(message)
      if (selector = message[:selector])
        if selector.is_a?(Hash) || selector.is_a?(Array)
          selector
        else
          JSON.parse(selector)
        end
      else
        nil
      end
    rescue
      nil
    end

    def sources(message)
      if (selector = selector_from(message))
        data_type.where(selector)
      elsif (ids = source_ids_from(message))
        data_type.where(:_id.in => ids)
      else
        data_type.all
      end
    end

    class << self
      def default_thread_value
        []
      end
    end

    private

    def check_scheduler
      if @scheduler_checked.nil?
        @scheduler_checked = changed_attributes.has_key?(:event_id.to_s) && event.is_a?(Setup::Scheduler)
      else
        @scheduler_checked = false
      end
    end

    def schedule_task
      process(scheduler: event) if @scheduler_checked && event.activated
    end

    def simple_translate(message, &block)
      unless (options = message[:options]).is_a?(Hash)
        options = {}
      end
      task = message[:task]
      if translator.try(:source_handler)
        translator_options = {
          discard_events: discard_events,
          task: task,
          data_type: data_type,
          options: options
        }
        if (selector = selector_from(message))
          translator_options[:selector] = selector
        else
          translator_options[:object_ids] = source_ids_from(message)
        end
        begin
          translator.run(translator_options)
        rescue Exception => ex
          msg = "Error source handling translation of records of type '#{data_type.custom_title}' with '#{translator.custom_title}': #{ex.message}"
          if task
            task.notify message: msg
          else
            fail msg
          end
        end
      else
        sources(message).each do |obj|
          begin
            translator.run(object: obj, discard_events: discard_events, task: message[:task], data_type: data_type, options: options)
          rescue Exception => ex
            msg = "Error translating record with ID '#{obj.id}' of type '#{data_type.custom_title}' when executing '#{translator.custom_title}': #{ex.message}"
            if task
              task.notify(message: msg)
              task.notify(ex)
            else
              fail msg
            end
          end
        end
      end
    rescue Exception => ex
      block.yield(ex) if block
    end

    def translate_conversion(message, &block)
      simple_translate(message, &block)
    end

    def translate_update(message, &block)
      simple_translate(message, &block)
    end

    def translate_import(message, &block)
      options =
        {
          headers: message['headers'] || {},
          parameters: message['parameters'] || {},
          template_parameters: message['template_parameters'] || {},
          notify_request: notify_request,
          notify_response: notify_response,
          verbose_response: true,
          data_type: data_type
        }
      if before_submit
        if before_submit.parameters.count == 1
          before_submit.run(options)
        elsif before_submit.parameters.count == 2
          before_submit.run([options, message[:task]])
        end
      end
      connection = options[:connection] || (
      (connection_id = options[:connection_id] || message[:connection_id]) && Setup::Connection.where(id: connection_id).first
      ) || self.connection_role
      authorization = options[:authorization] || (
      (authorization_id = options[:authorization_id] || message[:authorization_id]) && Setup::Authorization.where(id: authorization_id).first
      ) || self.authorization
      verbose_response =
        webhook.with(connection).and(authorization).submit(options) do |response, template_parameters|
          translator.run(target_data_type: data_type,
                         data: response.body,
                         discard_events: discard_events,
                         parameters: template_parameters,
                         headers: response.headers.to_hash,
                         statusCode: response.code,
                         response_code: response.code,
                         task: message[:task])
        end
      if auto_retry == :automatic
        if (response = verbose_response[:response])
          unless response.success?
            fail unsuccessful_response(response, message)
          end
        else
          fail 'Connection error'
        end
      end
    end

    def translate_export(message, &block)
      limit = translator.try(:bulk_source) ? lot_size || 1000 : 1
      source_scope = sources(message)
      max = source_scope.count - 1
      translation_options = nil
      connections_present = true
      records_processed = false
      connection = (
      (connection_id = message[:connection_id]) && Setup::Connection.where(id: connection_id).first
      ) || self.connection_role
      authorization = (
      (authorization_id = message[:authorization_id]) && Setup::Authorization.where(id: authorization_id).first
      ) || self.authorization
      0.step(max, limit) do |offset|
        records_processed = true
        next unless connections_present
        verbose_response =
          webhook.with(connection).and(authorization).submit(
            ->(template_parameters) {
              translation_options =
                {
                  source_data_type: data_type,
                  offset: offset,
                  limit: limit,
                  discard_events: discard_events,
                  template_parameters: template_parameters,
                  parameters: template_parameters,
                  task: message[:task]
                }
              if (selector = selector_from(message))
                translation_options[:selector] = selector
              else
                translation_options[:object_ids] = source_ids_from(message)
              end
              if (options = message[:template_options]).is_a?(Hash)
                translation_options[:options] = options
              end
              translator.run(translation_options)
            },
            contentType: translator.mime_type,
            notify_request: notify_request,
            request_attachment: ->(attachment) do
              attachment[:filename] = (data_type&.title || translator.name).collectionize +
                attachment[:filename] +
                ((ext = translator.file_extension).present? ? ".#{ext}" : '')
              attachment
            end,
            notify_response: notify_response,
            verbose_response: true,
            headers: message['headers'] || {},
            parameters: message['parameters'] || {},
            template_parameters: message['template_parameters'] || {}
          ) do |response|
            if response_translator #&& response.code == 200
              response_translator.run(translation_options.merge(
                target_data_type: response_translator.data_type || response_data_type,
                data: response.body,
                headers: response.headers.to_hash,
                statusCode: response.code, #TODO Remove after deprecation migration
                response_code: response.code,
                requester_response: response.requester_response?)
              )
            end
            true
          end
        if auto_retry == :automatic
          if (response = verbose_response[:response])
            unless response.success?
              fail unsuccessful_response(response, message)
            end
          else
            fail 'Connection error'
          end
        end
        connections_present = verbose_response[:connections_present]
      end
      Setup::SystemNotification.create(type: :warning, message: "No connections processed") unless connections_present
      Setup::SystemNotification.create(type: :warning, message: "No records processed") unless records_processed
    end

    def unsuccessful_response(http_response, task_msg)
      {
        error: 'Unsuccessful response code',
        code: http_response.code,
        user: ::User.current.label,
        user_id: ::User.current.id,
        tenant: Account.current.label,
        tenant_id: Account.current.id,
        task: task_msg,
        flow: to_hash,
        flow_attributes: attributes
      }.to_json
    end

    def attachment_from(http_response)
      file_extension = ((types = ::MIME::Types[http_response.content_type]).present? &&
        (ext = types.first.extensions.first).present? && '.' + ext) || ''
      {
        filename: http_response.object_id.to_s + file_extension,
        contentType: http_response.content_type,
        body: http_response.body
      } if notify_response && http_response
    end

    def source_ids_from(message)
      if (object_ids = message[:object_ids])
        object_ids
      elsif (id = message[:source_id])
        [id]
      elsif scope_symbol == :filtered
        data_type.records_model.all.select { |record| field_triggers_apply_to?(:scope_filter, record) }.collect(&:id)
      elsif scope_symbol == :evaluation
        unless (parameters_size = scope_evaluator.parameters.size) == 1
          fail "Illegal arguments size for scope evaluator: #{parameters_size} (1 expected)"
        end
        if scope_evaluator.parameters.first.name == 'scope'
          evaluation = scope_evaluator.run(data_type.all)
          if evaluation.is_a?(Mongoid::Criteria) || evaluation.is_a?(Mongoff::Criteria)
            if evaluation.count == data_type.count
              nil
            else
              evaluation.distinct(:_id).flatten
            end
          elsif ((model = data_type.records_model).is_a?(Class) || evaluation.is_a?(Mongoff::Record)) &&
                evaluation.is_a?(model)
            [evaluation.id]
          elsif evaluation.is_a?(Array)
            evaluation.collect(&:id)
          else
            fail "Illegal scope evaluator result of type #{evaluation.class}: #{evaluation}"
          end
        else
          data_type.records_model.all.select { |record| scope_evaluator.run(record).present? }.collect(&:id)
        end
      else
        nil
      end
    end

  end
end