gooddata/gooddata-ruby
lib/gooddata/models/user_filters/user_filter_builder.rb

# encoding: UTF-8
#
# Copyright (c) 2010-2017 GoodData Corporation. All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

require_relative '../project_log_formatter'

require 'active_support/core_ext/hash/indifferent_access'

require 'gooddata/extensions/true'
require 'gooddata/extensions/false'
require 'gooddata/extensions/integer'
require 'gooddata/extensions/nil'

using FalseExtensions
using TrueExtensions
using IntegerExtensions
using NilExtensions

module GoodData
  module UserFilterBuilder
    @all_domain_users = {}
    @mutex = Mutex.new

    # Main entry function. Gets values and processes them into filters
    # that are suitable for other functions to process.
    # Values can be read from a file or provided inline as an array.
    # The results are then preprocessed. It is possible to provide
    # multiple values for an attribute; the builder deduplicates values
    # that are not unique. It allows setting OVER/TO filters and building
    # filters from multiple columns. It is specifically designed so that many
    # aspects of the configuration are modifiable and you have to preprocess the
    # data as little as possible; ideally you should be able to use data that
    # came directly from the source system and that is intended for use in
    # other parts of ETL.
    #
    # @param file [String | Array] File or array of values to be parsed for filters
    # @param options [Hash]
    # @return [Array<Hash>]
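    # @example Illustrative sketch only; the login, label identifier and values are hypothetical
    #   UserFilterBuilder.get_filters(
    #     [['john.doe@example.com', 'East', 'West']],
    #     labels: [{ label: 'label.region.id' }]
    #   )
    #   # => [{ login: 'john.doe@example.com',
    #   #       filters: [{ label: 'label.region.id', over: nil, to: nil, values: ['East', 'West'] }] }]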
    def self.get_filters(file, options = {})
      values = get_values(file, options)
      reduce_results(values)
    end

    # Tells you whether the file should be read row-wise. This is the case
    # when you have only one label defined and no columns specified.
    #
    # @param options [Hash]
    # @return [Boolean]
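    # @example Assuming a hypothetical label identifier
    #   UserFilterBuilder.row_based?(labels: [{ label: 'label.region.id' }])            # => true
    #   UserFilterBuilder.row_based?(labels: [{ label: 'label.region.id', column: 1 }]) # => false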
    def self.row_based?(options = {})
      options[:labels].count == 1 && !options[:labels].first.key?(:column)
    end

    def self.read_file(file, options = {})
      memo = Hash[]
      if row_based?(options)
        read_data_without_header(file, memo, options)
      else
        read_data_with_header(file, memo, options)
      end
      memo
    end

    def self.read_data_without_header(file, memo, options)
      CSV.foreach(file, headers: false, return_headers: false) do |row|
        key, data = process_line(row, options)
        memo[key] = [] unless memo.key?(key)
        memo[key].concat(data)
      end
    end

    def self.read_data_with_header(file, memo, options)
      CSV.foreach(file, headers: true, return_headers: false) do |row|
        key, data = process_line(row, options)
        memo[key] = [] unless memo.key?(key)
        memo[key].concat(data)
      end
      memo
    end

    # Processes a line from the source file. Two formats are supported.
    # The first mode is column based: specific columns are picked,
    # identified either by index or by name. Multiple
    # values are provided as several rows for the same user.
    #
    # The second mode is row based, which means there are no headers
    # and the number of columns can be variable. Each row specifies multiple
    # values for one user. It is implied that the file provides values
    # for just one label.
    #
    # @param line [Array | CSV::Row] Line to be processed
    # @param options [Hash]
    # @return [Array] login and the filters parsed from the line
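    # @example Row based line (login, label identifier and values are hypothetical)
    #   UserFilterBuilder.process_line(
    #     ['john.doe@example.com', 'East', 'West'],
    #     labels: [{ label: 'label.region.id' }]
    #   )
    #   # => ['john.doe@example.com',
    #   #     [{ label: 'label.region.id', values: ['East', 'West'], over: nil, to: nil }]]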
    def self.process_line(line, options = {})
      index = options[:user_column] || 0
      login = line[index]

      results = options[:labels].mapcat do |label|
        column = label[:column] || Range.new(1, -1)
        values = column.is_a?(Range) ? line.slice(column) : [line[column]]
        [create_filter(label, values.compact)]
      end
      [login, results]
    end

    def self.create_filter(label, values)
      {
        :label => label[:label],
        :values => values,
        :over => label[:over],
        :to => label[:to]
      }
    end

    # Processes values in a map-reduce way so the result is as readable as possible
    # and has minimal impact on the API
    #
    # @param data [Hash]
    # @return [Array<Hash>]
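    # @example Hypothetical input; duplicated values for the same label are merged
    #   UserFilterBuilder.reduce_results(
    #     'john.doe@example.com' => [
    #       { label: 'label.region.id', values: ['East'], over: nil, to: nil },
    #       { label: 'label.region.id', values: ['East', 'West'], over: nil, to: nil }
    #     ]
    #   )
    #   # => [{ login: 'john.doe@example.com',
    #   #       filters: [{ label: 'label.region.id', over: nil, to: nil, values: ['East', 'West'] }] }]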
    def self.reduce_results(data)
      data.map { |k, v| { login: k, filters: UserFilterBuilder.collect_labels(v) } }
    end

    # Groups the values by label (and OVER/TO definition) and passes each group to deduplication
    # @param data [Array<Hash>]
    # @return [Array<Hash>]
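    # @example Hypothetical input with a single label spread over two entries
    #   UserFilterBuilder.collect_labels(
    #     [{ label: 'label.region.id', values: ['East'], over: nil, to: nil },
    #      { label: 'label.region.id', values: ['West'], over: nil, to: nil }]
    #   )
    #   # => [{ label: 'label.region.id', over: nil, to: nil, values: ['East', 'West'] }]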
    def self.collect_labels(data)
      data
        .group_by { |x| [x[:label], x[:over], x[:to]] }
        .map { |l, v| { label: l[0], over: l[1], to: l[2], values: UserFilterBuilder.collect_values(v) } }
    end

    # Collects specific values and deduplicates if necessary
    def self.collect_values(data)
      data.mapcat do |e|
        e[:values]
      end.uniq
    end

    def self.create_cache(data, key)
      data.reduce({}) do |a, e|
        a[e.send(key)] = e
        a
      end
    end

    def self.get_missing_users(filters, options = {})
      users_cache = options[:users_cache]
      filters.reject { |u| users_cache.key?(u[:login]) }
    end

    def self.verify_existing_users(filters, options = {})
      users_must_exist = options[:users_must_exist] == false ? false : true
      users_cache = options[:users_cache]
      domain = options[:domain]

      if users_must_exist
        missing_users = filters.reject do |u|
          next true if users_cache.key?(u[:login])
          domain_user = (domain && domain.find_user_by_login(u[:login]))
          users_cache[domain_user.login] = domain_user if domain_user
          next true if domain_user
          false
        end
        unless missing_users.empty?
          fail "#{missing_users.count} users are not part of the project and " \
               "variable cannot be resolved since :users_must_exist is set " \
               "to true (#{missing_users.join(', ')})"
        end
      end
    end

    def self.create_label_cache(result, options = {})
      project = options[:project]

      result.reduce({}) do |a, e|
        e[:filters].map do |filter|
          a[filter[:label]] = project.labels(filter[:label]) unless a.key?(filter[:label])
        end
        a
      end
    end

    def self.create_lookups_cache(small_labels)
      small_labels.reduce({}) do |a, e|
        lookup = e.values(:limit => 1_000_000).reduce({}) do |a1, e1|
          a1[e1[:value]] = e1[:uri]
          a1
        end
        a[e.uri] = lookup
        a
      end
    end

    def self.create_attrs_cache(filters, options = {})
      project = options[:project]

      labels = filters.flat_map do |f|
        f[:filters]
      end

      over_cache = labels.reduce({}) do |a, e|
        a[e[:over]] = e[:over]
        a
      end
      to_cache = labels.reduce({}) do |a, e|
        a[e[:to]] = e[:to]
        a
      end
      cache = over_cache.merge(to_cache)
      attr_cache = {}
      cache.each_pair do |k, v|
        begin
          attr_cache[k] = project.attributes(v)
        rescue
          nil
        end
      end
      attr_cache
    end

    # Walks over the provided labels and picks those that have fewer than a certain number of values.
    # This balances speed when working with small datasets (like users) by precaching
    # their values, while still being able to function for larger ones, even
    # though that means many more requests
    def self.get_small_labels(labels_cache)
      labels_cache.values.select { |label| label&.values_count&.< 100_000 }
    end

    # Creates MAQL expression(s) based on the filter definition.
    # Takes the filter definition, looks up any necessary values and provides API executable MAQL
    # @param labels_cache e.g. { 'label_uri' => label_object }
    # @param lookups_cache e.g. { 'label_uri' => { 'jirka@gooddata.com' => 'value_uri' } }
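    # @example Shape of the result (the URIs below are hypothetical)
    #   expression, errors = create_expression(filter, labels_cache, lookups_cache, attr_cache, options, login)
    #   expression # => "[/gdc/md/PID/obj/42] IN ([/gdc/md/PID/obj/42/elements?id=1], [/gdc/md/PID/obj/42/elements?id=2])"
    #   errors     # => []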
    # rubocop:disable Metrics/ParameterLists
    def self.create_expression(filter, labels_cache, lookups_cache, attr_cache, options = {}, login)
      values = filter[:values]
      # Do not create a MUF for the label when all its values are NULL (https://jira.intgdc.com/browse/TMA-1361)
      non_null_values = values.select { |value| !value.nil? && value.downcase != 'null' }
      return ['TRUE', []] if non_null_values.empty?

      label = labels_cache[filter[:label]]
      if label.nil?
        err_message = "Unable to apply filter values: #{values} since the project: #{options[:project].pid} doesn't have label: #{filter[:label]} for login: #{login}"
        if options[:ignore_missing_values]
          GoodData.logger.warn(err_message)
          return ['TRUE', []]
        else
          fail err_message
        end
      end
      errors = []

      element_uris_by_values = Hash[values.map do |v|
        if lookups_cache.key?(label.uri)
          [v, lookups_cache[label.uri][v]]
        else
          [v, label.find_value_uri(v)]
        end
      end]

      missing_value_errors = element_uris_by_values.select { |_, v| v.nil? }.map do |k, _|
        {
          type: :error,
          label: label.title,
          value: k,
          reason: 'Can not find the value of the attribute referenced in the MUF'
        }
      end
      errors += missing_value_errors unless options[:ignore_missing_values]

      element_uris = element_uris_by_values.values.compact
      # happens when data is not yet loaded in the project
      no_values = element_uris.empty?

      expression = if no_values && options[:restrict_if_missing_all_values]
                     # create a filter that is always false to ensure the user can not see any data
                     # as the proper MUF can not be constructed yet
                     case options[:type]
                     when :muf
                       '1 <> 1'
                     when :variable
                       nil
                     end
                   elsif no_values
                     # create a filter that is always true to ensure the user can see all data
                     'TRUE'
                   elsif filter[:over] && filter[:to]
                     over = attr_cache[filter[:over]]
                     to = attr_cache[filter[:to]]
                     "([#{label.attribute_uri}] IN (#{element_uris.sort.map { |e| '[' + e + ']' }.join(', ')})) OVER [#{over && over.uri}] TO [#{to && to.uri}]"
                   else
                     "[#{label.attribute_uri}] IN (#{element_uris.sort.map { |e| '[' + e + ']' }.join(', ')})"
                   end
      [expression, errors]
    end
    # rubocop:enable Metrics/ParameterLists

    # Encapsulates the creation of a filter
    def self.create_user_filter(expression, related)
      {
        related: related,
        level: :user,
        expression: expression,
        type: :filter
      }
    end

    def self.create_user_profile_mapping(filters, project_users, options = {})
      domain = options[:domain]
      found_list = {}
      missing_list = []

      # Get the list of user logins from the filters
      login_list = filters.flat_map do |filter|
        filter[:login]
      end

      # Then add the logins from users_brick_input
      users_brick_input = options[:users_brick_input]
      if users_brick_input&.any?
        users_brick_input.each do |user|
          login_list << user.with_indifferent_access['login']
        end
      end

      login_list.uniq.each do |login|
        user = project_users.find { |u| u.login == login }
        if user
          found_list[login] = user.profile_url
        else
          missing_list << login
        end
      end
      # rubocop:disable Metrics/BlockNesting
      unless missing_list.empty? || domain.nil?
        if missing_list.size < 100
          missing_list.each do |login|
            user = domain.find_user_by_login(login)
            found_list[login] = user.links['self'] if user
          end
        else
          if @all_domain_users[domain.name].nil?
            @mutex.lock
            if @all_domain_users[domain.name].nil?
              domain_users = domain.users
              @all_domain_users[domain.name] = domain_users
              GoodData.logger.info("action=lcm_get_domain_users domain=#{domain.name} number_users=#{domain_users.size} number_missing_users=#{missing_list.size} use_cache=false")
            else
              domain_users = @all_domain_users[domain.name]
              GoodData.logger.info("action=lcm_get_domain_users domain=#{domain.name} number_users=#{domain_users.size} number_missing_users=#{missing_list.size} use_cache=true")
            end
            @mutex.unlock
          else
            domain_users = @all_domain_users[domain.name]
            GoodData.logger.info("action=lcm_get_domain_users domain=#{domain.name} number_users=#{domain_users.size} number_missing_users=#{missing_list.size} use_cache=true")
          end

          missing_list.each do |login|
            user = domain_users.find { |u| u.login == login }
            found_list[login] = user.links['self'] if user
          end
        end
      end
      # rubocop:enable Metrics/BlockNesting
      found_list
    end

    # Resolves and creates MAQL statements from filter definitions.
    # This method does not perform any modifications via the API but
    # collects all the information that is needed to do so.
    # It collects all info from the user and the current state of the project and compares them.
    # Returns a suggestion of what should be deleted and what should be created.
    # If there are discrepancies in the data (missing values, nonexistent users) it
    # finishes and collects all the errors at once
    #
    # @param filters [Array<Hash>] Filters definition
    # @return [Array] pair of [filters, errors]; the first element is the list of filters with MAQL expressions
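    # @example Return shape (URIs and expression are hypothetical)
    #   muf_definitions, errors = maqlify_filters(filters, user_profile_mapping, options)
    #   muf_definitions # => [{ related: '/gdc/account/profile/john.doe@example.com', level: :user,
    #                   #       expression: '[/gdc/md/PID/obj/42] IN ([/gdc/md/PID/obj/42/elements?id=1])', type: :filter }]
    #   errors          # => []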
    def self.maqlify_filters(filters, user_profile_mapping, options = {})
      fail_early = options[:fail_early] == false ? false : true
      users_cache = options[:users_cache]
      labels_cache = create_label_cache(filters, options)
      small_labels = get_small_labels(labels_cache)
      lookups_cache = create_lookups_cache(small_labels)
      attrs_cache = create_attrs_cache(filters, options)
      create_filter_proc = proc do |login, f|
        expression, errors = create_expression(f, labels_cache, lookups_cache, attrs_cache, options, login)
        safe_login = login.downcase
        profiles_uri = if options[:type] == :muf
                         user_profile_mapping[safe_login].nil? ? ('/gdc/account/profile/' + safe_login) : user_profile_mapping[safe_login]
                       elsif options[:type] == :variable
                         (users_cache[login] && users_cache[login].uri)
                       else
                         fail 'Unsupported type in maqlify_filters.'
                       end

        if profiles_uri && expression && expression != 'TRUE'
          [create_user_filter(expression, profiles_uri)] + errors
        else
          [] + errors
        end
      end

      # if fail_early is set, process only until the first error
      results = if fail_early
                  x = filters.inject([true, []]) do |(enough, a), e|
                    login = e[:login]
                    if enough
                      y = e[:filters].pmapcat { |f| create_filter_proc.call(login, f) }
                      [!y.any? { |r| r[:type] == :error }, a.concat(y)]
                    else
                      [false, a]
                    end
                  end
                  x.last
                else
                  filters.flat_map do |filter|
                    login = filter[:login]
                    filter[:filters].pmapcat { |f| create_filter_proc.call(login, f) }
                  end
                end
      results.group_by { |i| i[:type] }.values_at(:filter, :error).map { |i| i || [] }
    end

    def self.resolve_user_filter(user = [], project = [])
      user ||= []
      project ||= []
      to_create = user - project
      to_delete = project - user
      { :create => to_create, :delete => to_delete }
    end

    # Gets user defined filters and values from the project, regardless of whether they
    # come from mandatory filters or variable filters, and tries to
    # resolve what needs to be removed and what needs to be updated
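    # @example Sketch of the return value; filters are grouped by the user's profile URI (URIs hypothetical)
    #   to_create, to_delete = UserFilterBuilder.resolve_user_filters(filters, project.data_permissions)
    #   to_create # => { '/gdc/account/profile/john' => [#<GoodData::MandatoryUserFilter ...>] }
    #   to_delete # => {}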
    def self.resolve_user_filters(user_filters, vals)
      project_vals_lookup = vals.group_by(&:related_uri)
      user_vals_lookup = user_filters.group_by(&:related_uri)

      a = vals.map(&:related_uri)
      b = user_filters.map(&:related_uri)

      users_to_try = (a + b).uniq
      results = users_to_try.map do |user|
        resolve_user_filter(user_vals_lookup[user], project_vals_lookup[user])
      end

      to_create = results.map { |x| x[:create] }.flatten.group_by(&:related_uri)
      to_delete = results.map { |x| x[:delete] }.flatten.group_by(&:related_uri)
      [to_create, to_delete]
    end

    # Executes the update for variables. It resolves what is new and what needs to be updated.
    # @param filters [Array<Hash>] Filter Definitions
    # @param var [Variable] Variable instance to be updated
    # @param options [Hash]
    # @option options [Boolean] :dry_run If true, no changes to the project are made but the list of changes is provided
    # @return [Array] list of filters that need to be created and deleted
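    # @example Hypothetical invocation; a dry run only reports what would change
    #   to_create, to_delete = UserFilterBuilder.execute_variables(
    #     filters, variable, client: client, project: project, dry_run: true
    #   )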
    def self.execute_variables(filters, var, options = {})
      client = options[:client]
      project = options[:project]
      dry_run = options[:dry_run]
      to_create, to_delete = execute(filters, var.user_values, VariableUserFilter, options.merge(type: :variable))
      return [to_create, to_delete] if dry_run

      # TODO: get values that are about to be deleted and created and update them.
      # This will make sure there is no downtime in filter existence
      unless options[:do_not_touch_filters_that_are_not_mentioned]
        to_delete.each { |_, group| group.each(&:delete) }
      end
      data = to_create.values.flatten.map(&:to_hash).map { |var_val| var_val.merge(prompt: var.uri) }
      data.each_slice(200) do |slice|
        client.post("/gdc/md/#{project.obj_id}/variables/user", :variables => slice)
      end
      [to_create, to_delete]
    end

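    # Computes mandatory user filters (MUFs) from the definitions and applies them to the project.
    # @param user_filters [Array<Hash | Array>] Filter definitions
    # @param options [Hash]
    # @option options [Boolean] :dry_run When true, nothing is altered and only the computed changes are returned
    # @return [Hash] created and deleted filter groups plus per-user results
    # @example Hypothetical invocation
    #   UserFilterBuilder.execute_mufs(filters, client: client, project: project, dry_run: true)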
    def self.execute_mufs(user_filters, options = {})
      client = options[:client]
      project = options[:project]
      ignore_missing_values = options[:ignore_missing_values]
      users_must_exist = options[:users_must_exist] == false ? false : true
      dry_run = options[:dry_run]
      project_log_formatter = GoodData::ProjectLogFormatter.new(project)

      project_users = project.users
      filters = normalize_filters(user_filters)
      user_profile_mapping = create_user_profile_mapping(filters, project_users, options)
      user_filters, errors = maqlify_filters(filters, user_profile_mapping, options.merge(users_must_exist: users_must_exist, type: :muf))
      if !ignore_missing_values && !errors.empty?
        errors = errors.map do |e|
          e.merge(pid: project.pid)
        end
        fail GoodData::FilterMaqlizationError, errors
      end

      filters = user_filters.map { |data| client.create(MandatoryUserFilter, data, project: project) }
      to_create, to_delete = resolve_user_filters(filters, project.data_permissions)

      to_delete = sanitize_filters_to_delete(to_delete, options[:users_brick_input], user_profile_mapping) unless options[:no_sanitize]

      if options[:do_not_touch_filters_that_are_not_mentioned]
        GoodData.logger.warn("Data permissions computed: #{to_create.count} to create")
      else
        GoodData.logger.warn("Data permissions computed: #{to_create.count} to create and #{to_delete.count} to delete")
      end

      if dry_run
        GoodData.logger.warn('Option "dry_run" specified. No user filters will be altered!')
        create_results = to_create.map { |x| { status: 'dry_run', user: x.first, type: 'create' } }
        delete_results = to_delete.map { |x| { status: 'dry_run', user: x.first, type: 'delete' } }
        return { created: {},
                 deleted: {},
                 results: create_results + delete_results }
      end

      if to_create.empty?
        create_results = []
      else
        create_results = to_create.each_slice(100).flat_map do |batch|
          batch.pmapcat do |related_uri, group|
            group.each(&:save)
            res = client.get("/gdc/md/#{project.pid}/userfilters?users=#{related_uri}")
            items = res['userFilters']['items'].empty? ? [] : res['userFilters']['items'].first['userFilters']

            payload = {
              'userFilters' => {
                'items' => [{
                  'user' => related_uri,
                  'userFilters' => items.concat(group.map(&:uri))
                }]
              }
            }
            res = client.post("/gdc/md/#{project.pid}/userfilters", payload)

            # turn the result hash (status => list of users) into an array of hashes
            update_result = res['userFiltersUpdateResult'].flat_map do |k, v|
              v.map { |r| { status: k.to_sym, user: r, type: :create } }
            end

            update_result.map do |result|
              result[:status] == :failed ? result.merge(GoodData::Helpers.symbolize_keys(result[:user])) : result
            end
          end
        end
        project_log_formatter.log_user_filter_results(create_results, to_create)
        create_errors = create_results.select { |r| r[:status] == :failed }
        fail "Creating MUFs resulted in errors: #{create_errors}" if create_errors.any?
      end

      if to_delete.empty?
        delete_results = []
      elsif !options[:do_not_touch_filters_that_are_not_mentioned]
        delete_results = to_delete.each_slice(100).flat_map do |batch|
          batch.flat_map do |related_uri, group|
            results = []
            if related_uri
              res = client.get("/gdc/md/#{project.pid}/userfilters?users=#{related_uri}")
              items = res['userFilters']['items'].empty? ? [] : res['userFilters']['items'].first['userFilters']
              payload = {
                'userFilters' => {
                  'items' => [
                    {
                      'user' => related_uri,
                      'userFilters' => items - group.map(&:uri)
                    }
                  ]
                }
              }
              res = client.post("/gdc/md/#{project.pid}/userfilters", payload)
              results.concat(res['userFiltersUpdateResult']
                                 .flat_map { |k, v| v.map { |r| { status: k.to_sym, user: r, type: :delete } } }
                                 .map { |result| result[:status] == :failed ? result.merge(GoodData::Helpers.symbolize_keys(result[:user])) : result })
            end
            group.peach(&:delete)
            results
          end
        end
        project_log_formatter.log_user_filter_results(delete_results, to_delete)
        delete_errors = delete_results.select { |r| r[:status] == :failed } if delete_results
        fail "Deleting MUFs resulted in errors: #{delete_errors}" if delete_errors&.any?
      end

      { created: to_create, deleted: to_delete, results: create_results + (delete_results || []) }
    end

    private

    # Reads values from a File/Array. Abstracts away whether the data is column based
    # or row based and whether it comes from a file or is provided inline as an array
    # @param file [String | Array] File or array of values to be parsed for filters
    # @param options [Hash] Filter definitions
    # @return [Hash] map of login => filters
    def self.get_values(file, options = {})
      file.is_a?(Array) ? read_array(file, options) : read_file(file, options)
    end

    # Reads an array of values which are expected to be laid out row-wise
    # [
    #   ['john.doe@example.com', 'Engineering', 'Marketing']
    # ]
    # @param data [Array<Array>]
    def self.read_array(data, options = {})
      memo = {}
      data.each do |e|
        key, data = process_line(e, options)
        memo[key] = [] unless memo.key?(key)
        memo[key].concat(data)
      end
      memo
    end

    # Executes the procedure necessary for loading user filters. This method contains what
    # is common to both implementations. The function
    #   * makes sure that filters are in normalized form.
    #   * verifies that users are in the project (and domain)
    #   * creates MAQL expressions of the filters provided
    #   * resolves the filters against current values in the project
    # @param user_filters [Array] Filters that the user is trying to set up
    # @param project_filters [Array] List of filters currently in the project
    # @param klass [Class] Class can be either UserFilter or VariableFilter
    # @param options [Hash] Filter definitions
    # @return [Array<Hash>]
    def self.execute(user_filters, project_filters, klass, options = {})
      client = options[:client]
      project = options[:project]

      ignore_missing_values = options[:ignore_missing_values]
      users_must_exist = options[:users_must_exist] == false ? false : true
      filters = normalize_filters(user_filters)
      # domain = options[:domain]
      # users = domain ? project.users : project.users
      users = project.users
      users_cache = create_cache(users, :login)
      missing_users = get_missing_users(filters, options.merge(users_cache: users_cache))
      user_filters, errors = if missing_users.empty?
                               verify_existing_users(filters, project: project, users_must_exist: users_must_exist, users_cache: users_cache)
                               maqlify_filters(filters, users, options.merge(users_cache: users_cache, users_must_exist: users_must_exist))
                             elsif missing_users.count < 100
                               verify_existing_users(filters, project: project, users_must_exist: users_must_exist, users_cache: users_cache)
                               maqlify_filters(filters, users, options.merge(users_cache: users_cache, users_must_exist: users_must_exist))
                             else
                               users_cache = create_cache(users, :login)
                               verify_existing_users(filters, project: project, users_must_exist: users_must_exist, users_cache: users_cache)
                               maqlify_filters(filters, users, options.merge(users_cache: users_cache, users_must_exist: users_must_exist))
                             end

      fail GoodData::FilterMaqlizationError, errors if !ignore_missing_values && !errors.empty?
      filters = user_filters.map { |data| client.create(klass, data, project: project) }
      resolve_user_filters(filters, project_filters)
    end

    # Gets the definition of filters from the user. They might come either in the full definition
    # as a hash or in a simplified version. The simplified version does not cover all the possible
    # features but it is much simpler to remember and suitable for quick hacking around
    # @param filters [Array<Array | Hash>]
    # @return [Array<Hash>]
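    # @example Simplified array form expanded to the full hash form (login, label and values are hypothetical)
    #   UserFilterBuilder.normalize_filters([['john.doe@example.com', 'label.region.id', 'East', 'West']])
    #   # => [{ login: 'john.doe@example.com',
    #   #       filters: [{ label: 'label.region.id', values: ['East', 'West'] }] }]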
    def self.normalize_filters(filters)
      filters.map do |filter|
        if filter.is_a?(Hash)
          filter
        else
          {
            :login => filter.first,
            :filters => [
              {
                :label => filter[1],
                :values => filter[2..-1]
              }
            ]
          }
        end
      end
    end

    # Removes MUFs from to_delete unless the user is in users_brick_input.
    # If this did not happen, users that are about to be deleted by users_brick
    # would have all their filters removed now, which is not desirable
    def self.sanitize_filters_to_delete(to_delete, users_brick_input, user_profile_mapping)
      return [] unless users_brick_input && users_brick_input.any?
      user_profiles = users_brick_input.map do |user|
        result = user_profile_mapping[user.with_indifferent_access['login']]
        next unless result
        result
      end.compact
      return [] unless user_profiles.any?
      to_delete.reject do |_, value|
        user_profiles.none? { |profile| profile == value.first.json[:related] }
      end
    end
  end
end