lib/gooddata/models/user_filters/user_filter_builder.rb
# encoding: UTF-8
#
# Copyright (c) 2010-2017 GoodData Corporation. All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
require_relative '../project_log_formatter'
require 'active_support/core_ext/hash/indifferent_access'
require 'gooddata/extensions/true'
require 'gooddata/extensions/false'
require 'gooddata/extensions/integer'
using FalseExtensions
using TrueExtensions
using IntegerExtensions
using NilExtensions
module GoodData
module UserFilterBuilder
# Cache of complete domain user lists keyed by domain name. It is read and
# filled by create_user_profile_mapping when 100 or more logins are missing
# from the project users.
@all_domain_users = {}
# Lock taken by create_user_profile_mapping while it fills @all_domain_users.
@mutex = Mutex.new
# Main entry function. Gets values and processes them into filters
# that are suitable for other functions to process.
# Values can be read from a file or provided inline as an array.
# The results are then preprocessed. It is possible to provide
# multiple values for an attribute; the values are deduplicated if
# they are not unique. Allows for setting over/to filters and allows for
# setting up filters from multiple columns. It is specially designed so that many
# aspects of configuration are modifiable and you have to preprocess the
# data as little as possible. Ideally you should be able to use data that
# came directly from the source system and that are intended for use in
# other parts of ETL.
#
# @param file [String, Array] File or inline array of rows to be parsed for filters
# @param options [Hash] Filter definitions
# @return [Array<Hash>] one hash per login with :login and :filters keys
def self.get_filters(file, options = {})
  # Parse the raw input first, then fold it into one entry per login.
  reduce_results(get_values(file, options))
end
# Function that tells you if the file should be read line_wise. This happens
# if you have only one label defined and you do not have columns specified
#
# @param options [Hash]
# @return [Boolean]
def self.row_based?(options = {})
  labels = options[:labels]
  # More (or fewer) than one label always means column based processing.
  return false unless labels.count == 1
  !labels.first.key?(:column)
end
# Reads filter values from a CSV file into a hash keyed by login.
# Row based input (see row_based?) is read without a header line,
# anything else is read with a header line.
#
# @param file [String] CSV file to read
# @param options [Hash] Filter definitions
# @return [Hash] collected filters keyed by login
def self.read_file(file, options = {})
  {}.tap do |memo|
    if row_based?(options)
      read_data_without_header(file, memo, options)
    else
      read_data_with_header(file, memo, options)
    end
  end
end
# Reads a header-less CSV file and appends the filters of every row
# to the entry of the row's login in memo (memo is modified in place).
#
# @param file [String] CSV file to read
# @param memo [Hash] accumulator keyed by login
# @param options [Hash] Filter definitions passed to process_line
def self.read_data_without_header(file, memo, options)
  CSV.foreach(file, headers: false, return_headers: false) do |record|
    login, filters = process_line(record, options)
    bucket = memo.key?(login) ? memo[login] : (memo[login] = [])
    bucket.concat(filters)
  end
end
# Reads a CSV file that has a header line and appends the filters of every
# row to the entry of the row's login in memo (memo is modified in place).
#
# @param file [String] CSV file to read
# @param memo [Hash] accumulator keyed by login
# @param options [Hash] Filter definitions passed to process_line
# @return [Hash] the memo that was passed in
def self.read_data_with_header(file, memo, options)
  CSV.foreach(file, headers: true, return_headers: false) do |record|
    login, filters = process_line(record, options)
    bucket = memo.key?(login) ? memo[login] : (memo[login] = [])
    bucket.concat(filters)
  end
  memo
end
# Processes a line from source file. It is processed in
# 2 formats. First mode is column_based.
# It means getting all specific columns.
# These are specified either by index or name. Multiple
# values are provided by several rows for the same user
#
# Second mode is row based which means there are no headers
# and number of columns can be variable. Each row specifies multiple
# values for one user. It is implied that the file provides values
# for just one label
#
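# @param line [Array, CSV::Row] one parsed row of the source data
# @param options [Hash] uses :user_column (defaults to 0) and :labels
# @return [Array] two-element array: the login and the list of filter hashes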
def self.process_line(line, options = {})
  login = line[options[:user_column] || 0]
  filters = options[:labels].mapcat do |label|
    # Without an explicit column every cell after the first one is a value.
    column = label[:column] || (1..-1)
    picked =
      case column
      when Range then line.slice(column)
      else [line[column]]
      end
    [create_filter(label, picked.compact)]
  end
  [login, filters]
end
# Builds the intermediate filter hash for one label definition.
#
# @param label [Hash] label definition; :label, :over and :to are read
# @param values [Array] values collected for the label
# @return [Hash] hash with :label, :values, :over and :to keys
def self.create_filter(label, values)
  { label: label[:label], values: values, over: label[:over], to: label[:to] }
end
# Processes values in a map reduce way so the result is as readable as possible and
# poses minimal impact on the API
#
# @param data [Hash] raw filter hashes keyed by login
# @return [Array<Hash>] one hash per login with :login and :filters keys
def self.reduce_results(data)
  data.map do |login, raw_filters|
    { login: login, filters: UserFilterBuilder.collect_labels(raw_filters) }
  end
end
# Groups the values by particular label. And passes each group to deduplication
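# @param data [Array<Hash>] filter hashes with :label, :over, :to and :values keys
# @return [Array<Hash>] one hash per distinct label/over/to combination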
def self.collect_labels(data)
  grouped = data.group_by { |item| [item[:label], item[:over], item[:to]] }
  grouped.map do |(label, over, to), items|
    { label: label, over: over, to: to, values: UserFilterBuilder.collect_values(items) }
  end
end
# Collects specific values and deduplicates if necessary
def self.collect_values(data)
  all_values = data.mapcat { |entry| entry[:values] }
  all_values.uniq
end
# Indexes a collection by the result of calling the method named key
# on every element. A later element replaces an earlier one with the same key.
#
# @param data [Enumerable] objects to index
# @param key [Symbol, String] name of the method providing the hash key
# @return [Hash] elements keyed by the value of the given method
def self.create_cache(data, key)
  data.each_with_object({}) do |item, index|
    index[item.send(key)] = item
  end
end
# Selects the filter definitions whose login is not a key of the users cache.
#
# @param filters [Array<Hash>] filter definitions with a :login key
# @param options [Hash] :users_cache is a hash keyed by login
# @return [Array<Hash>] definitions of users missing in the cache
def self.get_missing_users(filters, options = {})
  known_users = options[:users_cache]
  filters.select { |entry| !known_users.key?(entry[:login]) }
end
# Makes sure every login mentioned in the filters is known. A login is known
# when it is a key of the users cache or when it can be found in the domain
# (if one is given); users found in the domain are added to the users cache.
#
# @param filters [Array<Hash>] filter definitions with a :login key
# @param options [Hash]
# @option options [Boolean] :users_must_exist the check is skipped only when this is false
# @option options [Hash] :users_cache users keyed by login (may be extended)
# @option options :domain optional object responding to find_user_by_login
# @return [nil]
# @raise [RuntimeError] when some logins are unknown; the message lists them
def self.verify_existing_users(filters, options = {})
  return if options[:users_must_exist] == false

  users_cache = options[:users_cache]
  domain = options[:domain]
  missing_users = filters.reject do |filter|
    login = filter[:login]
    next true if users_cache.key?(login)
    domain_user = domain && domain.find_user_by_login(login)
    users_cache[domain_user.login] = domain_user if domain_user
    domain_user
  end
  return if missing_users.empty?

  # Report the logins only; joining the whole filter definitions would dump
  # every label and value of the affected users into the message.
  missing_logins = missing_users.map { |filter| filter[:login] }
  fail "#{missing_users.count} users are not part of the project and " \
       "variable cannot be resolved since :users_must_exist is set " \
       "to true (#{missing_logins.join(', ')})"
end
# Fetches every label referenced by the filters once and indexes the
# fetched objects by the label reference used in the filter definitions.
#
# @param result [Array<Hash>] filter definitions with a :filters array
# @param options [Hash] :project is asked for the labels via project.labels
# @return [Hash] results of project.labels keyed by the label reference
def self.create_label_cache(result, options = {})
  project = options[:project]
  result.each_with_object({}) do |entry, cache|
    entry[:filters].each do |filter|
      label_ref = filter[:label]
      cache[label_ref] = project.labels(label_ref) unless cache.key?(label_ref)
    end
  end
end
# Loads the values of the given labels (up to 1,000,000 per label) and builds
# a value => uri lookup for each of them.
#
# @param small_labels [Array] labels responding to values(limit:) and uri
# @return [Hash] lookups keyed by label uri, e.g. { 'label_uri' => { 'value' => 'value_uri' } }
def self.create_lookups_cache(small_labels)
  small_labels.each_with_object({}) do |label, cache|
    lookup = label.values(:limit => 1_000_000).each_with_object({}) do |element, by_value|
      by_value[element[:value]] = element[:uri]
    end
    cache[label.uri] = lookup
  end
end
# Resolves every attribute referenced by the :over and :to keys of the filters
# and indexes the results by that reference.
#
# Filters without :over/:to contribute no reference, so no lookup is made
# for nil. A reference whose lookup raises is left out of the cache.
#
# @param filters [Array<Hash>] filter definitions with a :filters array
# @param options [Hash] :project is asked for the attributes via project.attributes
# @return [Hash] results of project.attributes keyed by the over/to reference
def self.create_attrs_cache(filters, options = {})
  project = options[:project]
  labels = filters.flat_map { |f| f[:filters] }
  overs = labels.map { |label| label[:over] }
  tos = labels.map { |label| label[:to] }
  references = (overs + tos).compact.uniq

  references.each_with_object({}) do |reference, attr_cache|
    begin
      attr_cache[reference] = project.attributes(reference)
    rescue StandardError
      # Best effort: an attribute that can not be resolved is simply not cached.
      nil
    end
  end
end
# Walks over provided labels and picks those that have fewer than certain amount of values
# This tries to balance for speed when working with small datasets (like users)
# so it precaches the values and still be able to function for larger ones even
# though that would mean tons of requests
def self.get_small_labels(labels_cache)
  labels_cache.values.select do |label|
    # Missing labels and labels without a known count are never "small".
    count = label&.values_count
    count&.<(100_000)
  end
end
# Creates a MAQL expression based on the filter definition.
# Takes the filter definition, looks up any necessary values and provides API executable MAQL.
# @param labels_cache e.g. { 'label_uri': label_object }
# @param lookups_cache e.g. { 'label_uri': { "jirka@gooddata.com": 'value_uri' }}
# rubocop:disable Metrics/ParameterLists
def self.create_expression(filter, labels_cache, lookups_cache, attr_cache, options = {}, login)
  values = filter[:values]
  # Do not create MUF for label when all its values is NULL (https://jira.intgdc.com/browse/TMA-1361)
  only_nulls = values.reject { |value| value.nil? || value.downcase == 'null' }.empty?
  return ['TRUE', []] if only_nulls

  label = labels_cache[filter[:label]]
  if label.nil?
    err_message = "Unable to apply filter values: #{values} since the project: #{options[:project].pid} doesn't have label: #{filter[:label]} for login: #{login}"
    fail err_message unless options[:ignore_missing_values]
    GoodData.logger.warn(err_message)
    return ['TRUE', []]
  end

  # Prefer the precached lookup of the label, ask the label itself otherwise.
  element_uris_by_values = values.each_with_object({}) do |value, found|
    found[value] =
      if lookups_cache.key?(label.uri)
        lookups_cache[label.uri][value]
      else
        label.find_value_uri(value)
      end
  end

  unresolved = element_uris_by_values.select { |_, uri| uri.nil? }.keys
  missing_value_errors = unresolved.map do |value|
    {
      type: :error,
      label: label.title,
      value: value,
      reason: 'Can not find the value of the attribute referenced in the MUF'
    }
  end
  errors = options[:ignore_missing_values] ? [] : missing_value_errors

  element_uris = element_uris_by_values.values.compact
  expression =
    if element_uris.empty?
      # happens when data is not yet loaded in the project
      if options[:restrict_if_missing_all_values]
        # a filter that is always false ensures the user can not see any data
        # as the proper MUF can not be constructed yet
        case options[:type]
        when :muf then '1 <> 1'
        when :variable then nil
        end
      else
        # a filter that is always true ensures the user can see all data
        'TRUE'
      end
    else
      uri_list = element_uris.sort.map { |uri| '[' + uri + ']' }.join(', ')
      membership = "[#{label.attribute_uri}] IN (#{uri_list})"
      if filter[:over] && filter[:to]
        over = attr_cache[filter[:over]]
        to = attr_cache[filter[:to]]
        "(#{membership}) OVER [#{over && over.uri}] TO [#{to && to.uri}]"
      else
        membership
      end
    end
  [expression, errors]
end
# rubocop:enable Metrics/ParameterLists
# Encapsulates the creation of a user filter definition
def self.create_user_filter(expression, related)
  { related: related, level: :user, expression: expression, type: :filter }
end
# Maps logins to profile URIs. The logins are taken from the filters and from
# :users_brick_input. Project users are consulted first; logins not found
# there are looked up in the domain (when one is given):
# * fewer than 100 missing logins are looked up one by one,
# * otherwise the complete list of domain users is fetched once per domain
#   name and kept in @all_domain_users for later calls.
# Logins that can not be found anywhere are left out of the result.
#
# @param filters [Array<Hash>] filter definitions with a :login key
# @param project_users [Array] users responding to login and profile_url
# @param options [Hash]
# @option options :domain optional domain used for users missing in the project
# @option options [Array] :users_brick_input optional user records with a 'login' key
# @return [Hash] profile URIs keyed by login
def self.create_user_profile_mapping(filters, project_users, options = {})
  domain = options[:domain]

  # Get the list of user login from filters
  login_list = filters.flat_map { |filter| filter[:login] }

  # Then find user login in the users_brick_input
  users_brick_input = options[:users_brick_input]
  if users_brick_input&.any?
    users_brick_input.each { |user| login_list << user.with_indifferent_access['login'] }
  end

  # Index the project users once instead of scanning the list for every login.
  # The first user wins for a repeated login, the same as a linear search.
  project_users_by_login = {}
  project_users.each { |user| project_users_by_login[user.login] ||= user }

  found_list = {}
  missing_list = []
  login_list.uniq.each do |login|
    user = project_users_by_login[login]
    if user
      found_list[login] = user.profile_url
    else
      missing_list << login
    end
  end
  return found_list if missing_list.empty? || domain.nil?

  if missing_list.size < 100
    missing_list.each do |login|
      user = domain.find_user_by_login(login)
      found_list[login] = user.links['self'] if user
    end
    return found_list
  end

  domain_users = @all_domain_users[domain.name]
  use_cache = !domain_users.nil?
  unless use_cache
    # synchronize releases the lock even when fetching the domain users raises
    @mutex.synchronize do
      # another thread may have filled the cache while we waited for the lock
      domain_users = @all_domain_users[domain.name]
      use_cache = !domain_users.nil?
      domain_users ||= (@all_domain_users[domain.name] = domain.users)
    end
  end
  GoodData.logger.info("action=lcm_get_domain_users domain=#{domain.name} number_users=#{domain_users.size} number_missing_users=#{missing_list.size} use_cache=#{use_cache}")

  domain_users_by_login = {}
  domain_users.each { |user| domain_users_by_login[user.login] ||= user }
  missing_list.each do |login|
    user = domain_users_by_login[login]
    found_list[login] = user.links['self'] if user
  end
  found_list
end
# Resolves and creates maql statements from filter definitions.
# This method does not perform any modifications on API but
# collects all the information that is needed to do so.
# Method collects all info from the user and current state in project and compares.
# Returns suggestion of what should be deleted and what should be created
# If there is some discrepancies in the data (missing values, nonexistent users) it
# finishes and collects all the errors at once
#
# @param filters [Array<Hash>] Filters definition
# @return [Array] first is list of MAQL statements
def self.maqlify_filters(filters, user_profile_mapping, options = {})
  fail_early = options[:fail_early] != false
  users_cache = options[:users_cache]
  labels_cache = create_label_cache(filters, options)
  lookups_cache = create_lookups_cache(get_small_labels(labels_cache))
  attrs_cache = create_attrs_cache(filters, options)

  # Turns one filter of one login into a list holding at most one filter
  # definition followed by the errors found while building its expression.
  build_entries = lambda do |login, filter|
    expression, errors = create_expression(filter, labels_cache, lookups_cache, attrs_cache, options, login)
    safe_login = login.downcase
    profiles_uri =
      case options[:type]
      when :muf
        mapped = user_profile_mapping[safe_login]
        mapped.nil? ? ('/gdc/account/profile/' + safe_login) : mapped
      when :variable
        cached_user = users_cache[login]
        cached_user && cached_user.uri
      else
        fail 'Unsuported type in maqlify_filters.'
      end
    usable = profiles_uri && expression && expression != 'TRUE'
    definitions = usable ? [create_user_filter(expression, profiles_uri)] : []
    definitions + errors
  end

  results =
    if fail_early
      # process the logins one by one and stop after the first one with an error
      collected = []
      filters.each do |entry|
        login = entry[:login]
        produced = entry[:filters].pmapcat { |f| build_entries.call(login, f) }
        collected.concat(produced)
        break if produced.any? { |item| item[:type] == :error }
      end
      collected
    else
      filters.flat_map do |entry|
        login = entry[:login]
        entry[:filters].pmapcat { |f| build_entries.call(login, f) }
      end
    end

  by_type = results.group_by { |item| item[:type] }
  by_type.values_at(:filter, :error).map { |group| group || [] }
end
# Compares the filters requested for one user with the filters the project
# already has for that user. nil is treated as an empty list.
#
# @param user [Array, nil] filters requested for the user
# @param project [Array, nil] filters currently present in the project
# @return [Hash] :create holds filters only requested, :delete filters only in the project
def self.resolve_user_filter(user = [], project = [])
  requested = user || []
  existing = project || []
  { create: requested - existing, delete: existing - requested }
end
# Gets user defined filters and values from project regardless if they
# come from Mandatory Filters or Variable filters and tries to
# resolve what needs to be removed and what needs to be updated
def self.resolve_user_filters(user_filters, vals)
  project_vals_lookup = vals.group_by(&:related_uri)
  user_vals_lookup = user_filters.group_by(&:related_uri)
  # every user that has filters either in the project or in the request
  users_to_try = project_vals_lookup.keys | user_vals_lookup.keys
  results = users_to_try.map do |user|
    resolve_user_filter(user_vals_lookup[user], project_vals_lookup[user])
  end
  regroup = lambda do |kind|
    results.map { |result| result[kind] }.flatten.group_by(&:related_uri)
  end
  [regroup.call(:create), regroup.call(:delete)]
end
# Executes the update for variables. It resolves what is new and what needs to be updated.
# @param filters [Array<Hash>] Filter Definitions
# @param var [Variable] Variable instance to be updated
# @param options [Hash]
# @option options [Boolean] :dry_run If dry run is true, no changes to the project are made but the list of changes is provided
# @return [Array] list of filters that need to be created and deleted
def self.execute_variables(filters, var, options = {})
  client = options[:client]
  project = options[:project]
  to_create, to_delete = execute(filters, var.user_values, VariableUserFilter, options.merge(type: :variable))
  return [to_create, to_delete] if options[:dry_run]

  # TODO: get values that are about to be deleted and created and update them.
  # This will make sure there is no downtime in filter existence
  unless options[:do_not_touch_filters_that_are_not_mentioned]
    to_delete.each_value { |group| group.each(&:delete) }
  end

  # every new value is bound to the variable and posted in batches of 200
  payload = to_create.values.flatten.map { |value| value.to_hash.merge(prompt: var.uri) }
  payload.each_slice(200) do |batch|
    client.post("/gdc/md/#{project.obj_id}/variables/user", :variables => batch)
  end
  [to_create, to_delete]
end
# Synchronizes the mandatory user filters (MUFs) of a project with the given
# definitions. The definitions are normalized, turned into MAQL by
# maqlify_filters and compared with project.data_permissions; the differences
# are then created and deleted through the API.
#
# @param user_filters [Array<Array | Hash>] filter definitions (see normalize_filters)
# @param options [Hash]
# @option options :client client used for the API calls and for building filter objects
# @option options :project project whose filters are updated
# @option options [Boolean] :ignore_missing_values when truthy, maqlification errors do not abort the run
# @option options [Boolean] :users_must_exist treated as true unless it is exactly false
# @option options [Boolean] :dry_run when truthy, only reports what would be created and deleted
# @option options [Boolean] :do_not_touch_filters_that_are_not_mentioned when truthy, nothing is deleted
# @option options [Boolean] :no_sanitize when truthy, sanitize_filters_to_delete is skipped
# @option options [Array] :users_brick_input passed to create_user_profile_mapping and sanitize_filters_to_delete
# @return [Hash] hash with :created, :deleted and :results keys
# @raise [GoodData::FilterMaqlizationError] when there are maqlification errors and :ignore_missing_values is not set
# @raise [RuntimeError] when a create or delete result reports the status :failed
def self.execute_mufs(user_filters, options = {})
client = options[:client]
project = options[:project]
ignore_missing_values = options[:ignore_missing_values]
users_must_exist = options[:users_must_exist] == false ? false : true
dry_run = options[:dry_run]
project_log_formatter = GoodData::ProjectLogFormatter.new(project)
project_users = project.users
filters = normalize_filters(user_filters)
user_profile_mapping = create_user_profile_mapping(filters, project_users, options)
user_filters, errors = maqlify_filters(filters, user_profile_mapping, options.merge(users_must_exist: users_must_exist, type: :muf))
if !ignore_missing_values && !errors.empty?
# attach the project id to every error before reporting them all at once
errors = errors.map do |e|
e.merge(pid: project.pid)
end
fail GoodData::FilterMaqlizationError, errors
end
# build filter objects from the definitions; they are saved in the create step below
filters = user_filters.map { |data| client.create(MandatoryUserFilter, data, project: project) }
to_create, to_delete = resolve_user_filters(filters, project.data_permissions)
to_delete = sanitize_filters_to_delete(to_delete, options[:users_brick_input], user_profile_mapping) unless options[:no_sanitize]
if options[:do_not_touch_filters_that_are_not_mentioned]
GoodData.logger.warn("Data permissions computed: #{to_create.count} to create")
else
GoodData.logger.warn("Data permissions computed: #{to_create.count} to create and #{to_delete.count} to delete")
end
if dry_run
# report the planned changes per user without calling save/delete or posting anything
GoodData.logger.warn('Option "dry_run" specified. No user filters will be altered!')
create_results = to_create.map { |x| { status: 'dry_run', user: x.first, type: 'create' } }
delete_results = to_delete.map { |x| { status: 'dry_run', user: x.first, type: 'delete' } }
return { created: {},
deleted: {},
results: create_results + delete_results }
end
if to_create.empty?
create_results = []
else
# users are processed in batches of 100
create_results = to_create.each_slice(100).flat_map do |batch|
batch.pmapcat do |related_uri, group|
# save the new filters, then post the user's current filter list extended by the new uris
group.each(&:save)
res = client.get("/gdc/md/#{project.pid}/userfilters?users=#{related_uri}")
items = res['userFilters']['items'].empty? ? [] : res['userFilters']['items'].first['userFilters']
payload = {
'userFilters' => {
'items' => [{
'user' => related_uri,
'userFilters' => items.concat(group.map(&:uri))
}]
}
}
res = client.post("/gdc/md/#{project.pid}/userfilters", payload)
# turn the errors from hashes into array of hashes
update_result = res['userFiltersUpdateResult'].flat_map do |k, v|
v.map { |r| { status: k.to_sym, user: r, type: :create } }
end
update_result.map do |result|
result[:status] == :failed ? result.merge(GoodData::Helpers.symbolize_keys(result[:user])) : result
end
end
end
project_log_formatter.log_user_filter_results(create_results, to_create)
create_errors = create_results.select { |r| r[:status] == :failed }
fail "Creating MUFs resulted in errors: #{create_errors}" if create_errors.any?
end
# NOTE: when there is something to delete but deletions are disabled, neither
# branch below runs and delete_results stays nil (handled by `|| []` in the result)
if to_delete.empty?
delete_results = []
elsif !options[:do_not_touch_filters_that_are_not_mentioned]
delete_results = to_delete.each_slice(100).flat_map do |batch|
batch.flat_map do |related_uri, group|
results = []
if related_uri
# post the user's current filter list without the uris that are being removed
res = client.get("/gdc/md/#{project.pid}/userfilters?users=#{related_uri}")
items = res['userFilters']['items'].empty? ? [] : res['userFilters']['items'].first['userFilters']
payload = {
'userFilters' => {
'items' => [
{
'user' => related_uri,
'userFilters' => items - group.map(&:uri)
}
]
}
}
res = client.post("/gdc/md/#{project.pid}/userfilters", payload)
results.concat(res['userFiltersUpdateResult']
.flat_map { |k, v| v.map { |r| { status: k.to_sym, user: r, type: :delete } } }
.map { |result| result[:status] == :failed ? result.merge(GoodData::Helpers.symbolize_keys(result[:user])) : result })
end
# the filter objects themselves are deleted even when related_uri is nil
group.peach(&:delete)
results
end
project_log_formatter.log_user_filter_results(delete_results, to_delete)
delete_errors = delete_results.select { |r| r[:status] == :failed } if delete_results
fail "Deleting MUFs resulted in errors: #{delete_errors}" if delete_errors&.any?
end
end
{ created: to_create, deleted: to_delete, results: create_results + (delete_results || []) }
end
# NOTE(review): a bare `private` does not change the visibility of methods
# defined with `def self.`, so the methods below remain publicly callable.
# Hiding them would need `private_class_method`, which would change the
# module's interface.
private
# Reads values from File/Array. Abstracts away the fact if it is column based,
# row based or in file or provided inline as an array
# @param file [String | Array] File or array of values to be parsed for filters
# @param options [Hash] Filter definitions
# @return [Array<Hash>]
def self.get_values(file, options = {})
  # inline data is processed directly, anything else is treated as a file
  return read_array(file, options) if file.is_a?(Array)
  read_file(file, options)
end
# Reads array of values which are expected to be in a line wise manner
# [
# ['john.doe@example.com', 'Engineering', 'Marketing']
# ]
# @param data [Array<Array>]
def self.read_array(data, options = {})
  # The per-row results get their own local names so that the `data`
  # parameter is not reassigned while it is being iterated.
  data.each_with_object({}) do |row, memo|
    login, filters = process_line(row, options)
    (memo[login] ||= []).concat(filters)
  end
end
# Executes the procedure necessary for loading user filters. This method has what
# is common for both implementations. The function
# * makes sure that filters are in normalized form.
# * verifies that users are in the project (and domain)
# * creates maql expressions of the filters provided
# * resolves the filters against current values in the project
# @param user_filters [Array] Filters that user is trying to set up
# @param project_filters [Array] List of filters currently in the project
# @param klass [Class] Class can be either UserFilter or VariableFilter
# @param options [Hash] Filter definitions
# @return [Array<Hash>]
def self.execute(user_filters, project_filters, klass, options = {})
  client = options[:client]
  project = options[:project]
  ignore_missing_values = options[:ignore_missing_values]
  users_must_exist = options[:users_must_exist] != false
  filters = normalize_filters(user_filters)
  users = project.users
  users_cache = create_cache(users, :login)

  # The same verification and maqlification is done regardless of how many
  # users are missing from the project, so there is a single code path.
  verify_existing_users(filters, project: project, users_must_exist: users_must_exist, users_cache: users_cache)
  maql_filters, errors = maqlify_filters(filters, users, options.merge(users_cache: users_cache, users_must_exist: users_must_exist))
  fail GoodData::FilterMaqlizationError, errors if !ignore_missing_values && !errors.empty?

  # build filter objects of the requested class and compare them with the project
  new_filters = maql_filters.map { |data| client.create(klass, data, project: project) }
  resolve_user_filters(new_filters, project_filters)
end
# Gets definition of filters from user. They might either come in the full definition
# as hash or a simplified version. The simplified version do not cover all the possible
# features but it is much simpler to remember and suitable for quick hacking around
# @param filters [Array<Array | Hash>]
# @return [Array<Hash>]
def self.normalize_filters(filters)
  filters.map do |filter|
    # full definitions are passed through untouched
    next filter if filter.is_a?(Hash)

    # simplified form: [login, label, value, value, ...]
    single_filter = { label: filter[1], values: filter[2..-1] }
    { login: filter.first, filters: [single_filter] }
  end
end
# Removes MUFs from to_delete unless the user is in users_brick_input.
# If this did not happen, users that are about to be deleted by users_brick
# would have all their filters removed now, which is not desirable
def self.sanitize_filters_to_delete(to_delete, users_brick_input, user_profile_mapping)
  return [] if !users_brick_input || users_brick_input.none?

  # profile URIs of the users from the brick input that have a known mapping
  user_profiles = users_brick_input.each_with_object([]) do |user, profiles|
    profile = user_profile_mapping[user.with_indifferent_access['login']]
    profiles << profile if profile
  end
  return [] if user_profiles.empty?

  # keep only the groups whose first filter relates to one of those profiles
  to_delete.select do |_, group|
    user_profiles.include?(group.first.json[:related])
  end
end
end
end