lib/search_index_backend.rb
# Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
class SearchIndexBackend
# Oldest Elasticsearch version accepted by .info (inclusive lower bound).
SUPPORTED_ES_VERSION_MINIMUM = '7.8'.freeze
# First Elasticsearch version rejected by .info (exclusive upper bound).
SUPPORTED_ES_VERSION_LESS_THAN = '9'.freeze
=begin
info about used search index machine
SearchIndexBackend.info
=end
def self.info
  # Nothing to report when no Elasticsearch URL is configured.
  url = Setting.get('es_url').to_s
  return if url.blank?

  response = make_request(url)

  # Any unsuccessful response is turned into a descriptive exception.
  if !response.success?
    raise humanized_error(
      verb:     'GET',
      url:      url,
      response: response,
    )
  end

  installed_version = response.data.dig('version', 'number')
  raise "Unable to get elasticsearch version from response: #{response.inspect}" if installed_version.blank?

  # Supported range: SUPPORTED_ES_VERSION_MINIMUM <= version < SUPPORTED_ES_VERSION_LESS_THAN.
  parsed_version = Gem::Version.new(installed_version)
  supported = parsed_version >= Gem::Version.new(SUPPORTED_ES_VERSION_MINIMUM) &&
              parsed_version < Gem::Version.new(SUPPORTED_ES_VERSION_LESS_THAN)
  raise "Version #{installed_version} of configured elasticsearch is not supported." if !supported

  response.data
end
=begin
update processors
SearchIndexBackend.processors(
  '_ingest/pipeline/attachment': [
    {
      action: 'delete', # optional: remove an existing pipeline first
    },
    {
      action: 'create',
      description: 'Extract attachment information from arrays',
      processors: [
        {
          foreach: {
            field: 'ticket.articles.attachments',
            processor: {
              attachment: {
                target_field: '_ingest._value.attachment',
                field: '_ingest._value.data'
              }
            }
          }
        }
      ]
    }
  ]
)
=end
# Creates, updates or deletes ingest processors (pipelines).
#
# @param data [Hash] maps a path relative to the configured es_url to a list
#   of item hashes. An item with action: 'delete' triggers a DELETE request
#   (a 404 answer is tolerated); every other item is sent as a PUT payload
#   with its :action key stripped.
# @return [true]
# @raise [RuntimeError] if a request fails
def self.processors(data)
  data.each do |key, items|
    url = "#{Setting.get('es_url')}/#{key}"

    items.each do |item|
      if item[:action] == 'delete'
        response = make_request(url, method: :delete)

        next if response.success?
        # a pipeline that does not exist counts as already deleted
        next if response.code.to_s == '404'

        raise humanized_error(
          verb:     'DELETE',
          url:      url,
          response: response,
        )
      end

      # Strip :action from a copy so the caller's (possibly frozen) item is left untouched.
      payload = item.dup
      payload.delete(:action)

      make_request_and_validate(url, data: payload, method: :put)
    end
  end
  true
end
=begin
create/update/delete index
SearchIndexBackend.index(
:action => 'create', # create/update/delete
:name => 'Ticket',
:data => {
:mappings => {
:Ticket => {
:properties => {
:articles => {
:type => 'nested',
:properties => {
'attachment' => { :type => 'attachment' }
}
}
}
}
}
}
)
SearchIndexBackend.index(
:action => 'delete', # create/update/delete
:name => 'Ticket',
)
=end
def self.index(data)
  url = build_url(type: data[:name], with_pipeline: false, with_document_type: false)
  return if url.blank?

  # Everything except an explicit delete creates/updates the index definition.
  if data[:action] != 'delete'
    return make_request_and_validate(url, data: data[:data], method: :put)
  end

  # Deleting an index that does not exist is a no-op.
  return if !SearchIndexBackend.index_exists?(data[:name])

  SearchIndexBackend.remove(data[:name])
end
=begin
add new object to search index
SearchIndexBackend.add('Ticket', some_data_object)
=end
def self.add(type, data)
  # The document id is taken from the 'id' key of the payload.
  document_url = build_url(type: type, object_id: data['id'])
  return if document_url.blank?

  make_request_and_validate(document_url, method: :post, data: data)
end
=begin
get object of search index by id
SearchIndexBackend.get('Ticket', 123)
=end
def self.get(type, data)
  # data is the id of the document to look up.
  document_url = build_url(type: type, object_id: data, with_pipeline: false)
  return if document_url.blank?

  response = make_request(document_url, method: :get)
  response.try(:data)
end
=begin
Check if an index exists.
SearchIndexBackend.index_exists?('Ticket')
=end
def self.index_exists?(type)
  index_url = build_url(type: type, with_pipeline: false, with_document_type: false)
  return if index_url.blank?

  response = make_request(index_url)
  return true if response.success?

  # Only an explicit 404 is treated as "index does not exist".
  response.code.to_s != '404'
end
=begin
This function updates specific attributes of an index based on a query.
It should get used in batches to prevent performance issues on entities which have millions of objects in it.
data = {
organization: {
name: "Zammad Foundation"
}
}
where = {
term: {
organization_id: 1
}
}
SearchIndexBackend.update_by_query('Ticket', data, where)
=end
# Updates attributes of all documents matching a query via the
# _update_by_query endpoint (at most 1_000 documents per call).
#
# @param type [String] index type, e.g. 'Ticket'
# @param data [Hash] attributes to write; every top-level key becomes an
#   assignment in a painless script, the values are passed as script params
# @param where [Hash] Elasticsearch query selecting the documents
# @return [Hash, Array, nil] response data on success, [] if the request
#   failed (the error is logged), nil if data/where is blank or the backend
#   is not configured
def self.update_by_query(type, data, where)
  return if data.blank?
  return if where.blank?

  url_params = {
    conflicts: 'proceed',
    slices:    'auto',
    max_docs:  1_000,
  }

  url = build_url(type: type, action: '_update_by_query', with_pipeline: false, with_document_type: false, url_params: url_params)
  return if url.blank?

  # One assignment per top-level key; values are handed over as script params.
  script_source = data.keys.map { |key| "ctx._source.#{key}=params.#{key}" }.join(';')

  payload = {
    script: {
      lang:   'painless',
      source: script_source,
      params: data,
    },
    query:  where,
    sort:   {
      id: 'desc',
    },
  }

  response = make_request(url, data: payload, method: :post, read_timeout: 10.minutes)
  return response.data if response.success?

  # The request above is a POST, so report it as such.
  Rails.logger.error humanized_error(
    verb:     'POST',
    url:      url,
    payload:  payload,
    response: response,
  )
  []
end
=begin
remove a single document (when an id is given) or a whole index from the search backend
SearchIndexBackend.remove('Ticket', 123)
SearchIndexBackend.remove('Ticket')
=end
def self.remove(type, o_id = nil)
  # With an id a single document is addressed, without one the whole index.
  url = build_url(type: type, object_id: o_id, with_pipeline: false, with_document_type: o_id ? true : false)
  return if url.blank?

  response = make_request(url, method: :delete)
  # A 400 answer is treated like a successful removal.
  return true if response.success? || response.code.to_s == '400'

  details = humanized_error(
    verb:     'DELETE',
    url:      url,
    response: response,
  )
  Rails.logger.warn "Can't delete index: #{details}"
  false
end
=begin
@param query [String] search query
@param index [String, Array<String>] indexes to search in (see search_by_index)
@param options [Hash] search options (see build_query)
@return search result
@example Sample queries
result = SearchIndexBackend.search('search query', ['User', 'Organization'], limit: limit)
result = SearchIndexBackend.search('search query', 'User', limit: limit)
result = SearchIndexBackend.search('search query', 'User', limit: limit, sort_by: ['updated_at'], order_by: ['desc'])
result = SearchIndexBackend.search('search query', 'User', limit: limit, sort_by: ['active', 'updated_at'], order_by: ['desc', 'desc'])
result = [
{
:id => 123,
:type => 'User',
},
{
:id => 125,
:type => 'User',
},
{
:id => 15,
:type => 'Organization',
}
]
=end
def self.search(query, index, options = {})
  # A single index name is searched directly.
  return search_by_index(query, index, options) if !index.is_a?(Array)

  # Several indexes: search each one and concatenate the per-index results.
  per_index_results = index.filter_map do |index_name|
    search_by_index(query, index_name, options)
  end
  per_index_results.flatten(1)
end
=begin
@param query [String] search query
@param index [String] index name
@param options [Hash] search options (see build_query)
@return search result
=end
def self.search_by_index(query, index, options = {})
  return [] if query.blank?

  url = build_url(type: index, action: '_search', with_pipeline: false, with_document_type: false)
  return [] if url.blank?

  # real search condition
  condition = {
    'query_string' => {
      'query'            => append_wildcard_to_simple_query(query),
      'time_zone'        => Setting.get('timezone_default'),
      'default_operator' => 'AND',
      'analyze_wildcard' => true,
    }
  }

  # optionally restrict the query to the fields configured for this index
  query_fields = options.dig(:query_fields_by_indexes, index.to_sym)
  condition['query_string']['fields'] = query_fields if query_fields

  query_data = build_query(index, condition, options)

  # optionally request highlighting; each field uses the default highlighter settings
  highlight_fields = options.dig(:highlight_fields_by_indexes, index.to_sym)
  if highlight_fields
    query_data[:highlight] = { fields: highlight_fields.to_h { |field| [field, {}] } }
  end

  response = make_request(url, data: query_data, method: :post)

  if !response.success?
    # The request above is a POST, so report it as such. Search failures are
    # logged and answered with an empty result instead of raising.
    Rails.logger.error humanized_error(
      verb:     'POST',
      url:      url,
      payload:  query_data,
      response: response,
    )
    return []
  end

  hits = response.data&.dig('hits', 'hits')
  return [] if !hits

  hits.map do |item|
    Rails.logger.debug { "... #{item['_type']} #{item['_id']}" }

    output = {
      id:   item['_id'],
      type: index,
    }
    output[:highlight] = item['highlight'] if highlight_fields
    output
  end
end
# Builds the "sort" section of a search request.
#
# @param index [String, Array<String>] index name(s); the first one is used
#   to look up the field mapping
# @param sort_by [Array<String>, nil] fields to sort by
# @param order_by [Array<String>, nil] sort directions, matched to sort_by by position
# @param fulltext [Boolean] when true, no updated_at fallback is added
# @return [Array] sort clauses, always ending with '_score'
def self.search_by_index_sort(index:, sort_by: nil, order_by: nil, fulltext: false)
  result  = []
  mapping = nil

  (sort_by || []).each_with_index do |raw_elem, idx|
    elem = raw_elem.to_s

    # entries without a field name or without a matching direction are skipped
    next if elem.blank?
    next if order_by&.at(idx).blank?

    # The mapping does not depend on the sort field, so it is built at most
    # once - and only if there is at least one usable sort entry.
    mapping ||= get_mapping_properties_object(Array.wrap(index).first.constantize)

    # for sorting values use .keyword values (no analyzer is used - plain values)
    if mapping.dig(:properties, elem, :fields, :keyword, :type) == 'keyword'
      elem += '.keyword'
    end

    result.push(
      elem => {
        order: order_by[idx],
      },
    )
  end

  # if we have no fulltext search then the primary default sort is updated at else score
  if result.blank? && !fulltext
    result.push(
      updated_at: {
        order: 'desc',
      },
    )
  end

  result.push('_score')

  result
end
=begin
get count of tickets and tickets which match on selector
result = SearchIndexBackend.selectors(index, selector)
example with a simple search:
result = SearchIndexBackend.selectors('Ticket', { 'category' => { 'operator' => 'is', 'value' => 'aa::ab' } })
result = [
{ id: 1, type: 'Ticket' },
{ id: 2, type: 'Ticket' },
{ id: 3, type: 'Ticket' },
]
you also can get aggregations
result = SearchIndexBackend.selectors(index, selector, options, aggs_interval)
example for aggregations within one year
aggs_interval = {
from: '2015-01-01',
to: '2015-12-31',
interval: 'month', # year, quarter, month, week, day, hour, minute, second
field: 'created_at',
}
options = {
limit: 123,
current_user: User.find(123),
}
result = SearchIndexBackend.selectors('Ticket', { 'category' => { 'operator' => 'is', 'value' => 'aa::ab' } }, options, aggs_interval)
result = {
hits:{
total:4819,
},
aggregations:{
time_buckets:{
buckets:[
{
key_as_string:"2014-10-01T00:00:00.000Z",
key:1412121600000,
doc_count:420
},
{
key_as_string:"2014-11-01T00:00:00.000Z",
key:1414800000000,
doc_count:561
},
...
]
}
}
}
=end
def self.selectors(index, selectors = nil, options = {}, aggs_interval = nil)
  raise 'no selectors given' if !selectors

  url = build_url(type: index, action: '_search', with_pipeline: false, with_document_type: false)
  return if url.blank?

  data = selector2query(index, selectors, options, aggs_interval)

  response = make_request(url, data: data, method: :post)

  if !response.success?
    # The request above is a POST, so report it as such.
    raise humanized_error(
      verb:     'POST',
      url:      url,
      payload:  data,
      response: response,
    )
  end
  Rails.logger.debug { response.data.to_json }

  # With an aggregation interval the raw response (including aggregations) is returned.
  return response.data if aggs_interval.present? && aggs_interval[:interval].present?

  hits = response.data['hits']

  # Depending on the response format, the total is either a plain number
  # or a hash that carries the number under 'value'.
  total = hits['total']
  count = total.is_a?(Integer) ? total : total['value']

  {
    count:      count,
    object_ids: hits['hits'].map { |item| item['_id'] },
  }
end
# Translates a selector into a search request body by delegating to
# Selector::SearchIndex; aggs_interval is handed over as part of the options.
def self.selector2query(index, selector, options, aggs_interval)
  query_options = options.merge(aggs_interval: aggs_interval)
  builder = Selector::SearchIndex.new(selector: selector, options: query_options, target_class: index.constantize)
  builder.get
end
=begin
return true if backend is configured
result = SearchIndexBackend.enabled?
=end
def self.enabled?
  # The backend counts as enabled as soon as an Elasticsearch URL is set.
  Setting.get('es_url').present?
end
# Returns the Elasticsearch index name: "<es_index>_<Rails.env>", followed by
# the underscored type name (slashes replaced by underscores) if one is given.
def self.build_index_name(index = nil)
  base_name = "#{Setting.get('es_index')}_#{Rails.env}"

  if index.blank?
    base_name
  else
    type_suffix = index.underscore.tr('/', '_')
    "#{base_name}_#{type_suffix}"
  end
end
=begin
generate url for index or document access (only for internal use)
# url to access single document in index (in case with_pipeline or not)
url = SearchIndexBackend.build_url(type: 'User', object_id: 123, with_pipeline: true)
# url to access whole index
url = SearchIndexBackend.build_url(type: 'User')
# url to access document definition in index (only es6 and higher)
url = SearchIndexBackend.build_url(type: 'User', with_pipeline: false, with_document_type: true)
# base url
url = SearchIndexBackend.build_url
=end
def self.build_url(type: nil, action: nil, object_id: nil, with_pipeline: true, with_document_type: true, url_params: {})
  return if !SearchIndexBackend.enabled?

  # set index
  index = build_index_name(type)

  # Work on a copy: adding the pipeline must not leak into the caller's hash
  # (which may be reused or frozen).
  query_params = (url_params || {}).dup

  # add pipeline if needed
  if index && with_pipeline == true
    pipeline_name = Setting.get('es_pipeline')
    query_params['pipeline'] = pipeline_name if pipeline_name.present?
  end

  # prepare url params
  params_string = query_params.present? ? "?#{URI.encode_www_form(query_params)}" : ''

  url = Setting.get('es_url')
  return "#{url}#{params_string}" if index.blank?

  # path segments are appended in this order: index, document type, action, object id
  url = "#{url}/#{index}"
  url = "#{url}/_doc" if with_document_type
  url = "#{url}/#{action}" if action
  url = "#{url}/#{object_id}" if object_id.present?

  "#{url}#{params_string}"
end
# Builds a readable error message for a failed Elasticsearch request, logs it
# (the log entry is cut after 40_000 characters) and returns the full text.
#
# @param verb [String] HTTP verb of the failed request
# @param url [String] requested URL
# @param response [Object, nil] response object; #error, #code and #inspect are used
# @param payload [Object, nil] request payload, appended as JSON if possible
# @return [String] the complete message
def self.humanized_error(verb:, url:, response:, payload: nil)
  prefix = "Unable to process #{verb} request to elasticsearch URL '#{url}'."

  payload_section = if payload.respond_to?(:to_json)
                      # serialize once - payloads may be large
                      payload_json = payload.to_json
                      "Payload:\n#{payload_json}\n\nPayload size: #{payload_json.bytesize / 1024 / 1024}M"
                    else
                      "Payload:\n#{payload.inspect}"
                    end
  suffix = "\n\nResponse:\n#{response.inspect}\n\n#{payload_section}"

  # The status code is compared as a string (like everywhere else in this
  # class), so it matches no matter whether the response carries '400' or 400.
  # NOTE(review): the URL check expects a pipeline literally named
  # 'zammad-attachment' - confirm this still matches the pipeline names in use.
  message = if response&.error&.match?('Connection refused') # rubocop:disable Zammad/DetectTranslatableString
              __("Elasticsearch is not reachable. It's possible that it's not running. Please check whether it is installed.")
            elsif url.end_with?('pipeline/zammad-attachment', 'pipeline=zammad-attachment') && response&.code.to_s == '400'
              __('The installed attachment plugin could not handle the request payload. Ensure that the correct attachment plugin is installed (ingest-attachment).')
            else
              __('Check the response and payload for detailed information:')
            end

  result = "#{prefix} #{message}#{suffix}"
  Rails.logger.error result[0, 40_000]
  result
end
# add * on simple query like "somephrase23"
def self.append_wildcard_to_simple_query(query)
  stripped = query.strip

  # Queries containing ':' are left alone; everything else gets a trailing wildcard.
  stripped.include?(':') ? stripped : "#{stripped}*"
end
=begin
@param condition [Hash] search condition
@param options [Hash] search options
@option options [Integer] :from
@option options [Integer] :limit
@option options [Hash] :query_extension applied to ElasticSearch query
@option options [Array<String>] :order_by ordering directions, desc or asc
@option options [Array<String>] :sort_by fields to sort by
@option options [Boolean] :fulltext If no sorting is defined, the fallback is to sort by updated_at. For fulltext searches it makes more sense to sort by _score by default; setting this parameter switches the fallback to _score.
=end
# Paging defaults applied by build_query when the caller does not set them.
DEFAULT_QUERY_OPTIONS = {
  from:  0,
  limit: 10
}.freeze

def self.build_query(index, condition, options = {})
  options = DEFAULT_QUERY_OPTIONS.merge(options.deep_symbolize_keys)

  sort_clauses = search_by_index_sort(index: index, sort_by: options[:sort_by], order_by: options[:order_by], fulltext: options[:fulltext])

  # Start from an empty bool query and let an optional extension add to it.
  query = { bool: { must: [] } }
  extension = options[:query_extension]
  query.deep_merge!(extension.deep_dup) if extension

  # The actual search condition always has to match ...
  query[:bool][:must].push condition

  # ... and the result can be narrowed down to a list of ids.
  ids = options[:ids]
  query[:bool][:must].push({ ids: { values: ids } }) if ids.present?

  {
    from:  options[:from],
    size:  options[:limit],
    sort:  sort_clauses,
    query: query,
  }
end
=begin
refreshes all indexes to make previous request data visible in future requests
SearchIndexBackend.refresh
=end
def self.refresh
  return if !enabled?

  # _all addresses every index at once
  refresh_url = "#{Setting.get('es_url')}/_all/_refresh"
  make_request_and_validate(refresh_url, method: :post)
end
=begin
helper method for making HTTP calls
@param url [String] url
@option params [Hash] :data is a payload hash
@option params [Symbol] :method is a HTTP method
@option params [Integer] :open_timeout is HTTP request open timeout
@option params [Integer] :read_timeout is HTTP request read timeout
@return UserAgent response
=end
def self.make_request(url, data: {}, method: :get, open_timeout: 8, read_timeout: 180)
  Rails.logger.debug { "# curl -X #{method} \"#{url}\" " }
  Rails.logger.debug { "-d '#{data.to_json}'" } if data.present?

  options = {
    json:              true,
    open_timeout:      open_timeout,
    read_timeout:      read_timeout,
    # overall limit: both phases plus 60 extra seconds
    total_timeout:     (open_timeout + read_timeout + 60),
    open_socket_tries: 3,
    user:              Setting.get('es_user'),
    password:          Setting.get('es_password'),
    verify_ssl:        Setting.get('es_ssl_verify'),
  }

  # public_send (instead of send) makes sure that only the public HTTP helpers
  # of UserAgent can be reached through the method argument.
  response = UserAgent.public_send(method, url, data, options)

  Rails.logger.debug { "# #{response.code}" }

  response
end
=begin
helper method for making HTTP calls and raising error if response was not success
@param url [String] url
@option args [Hash] see {make_request}
@return [Boolean] always returns true. Raises error if something went wrong.
=end
def self.make_request_and_validate(url, **args)
  response = make_request(url, **args)
  return true if response.success?

  raise humanized_error(
    # Report the verb that was really used: make_request falls back to :get
    # when no method is given. Upper-cased to match the other error messages.
    verb:     args.fetch(:method, :get).to_s.upcase,
    url:      url,
    payload:  args[:data],
    response: response
  )
end
=begin
This function will return a index mapping based on the
attributes of the database table of the existing object.
mapping = SearchIndexBackend.get_mapping_properties_object(Ticket)
Returns:
mapping = {
  properties: {
    'firstname' => {
      type: 'text',
      fields: {
        keyword: { type: 'keyword', ignore_above: 5012 },
      },
    },
  }
}
=end
def self.get_mapping_properties_object(object)
  # columns that never get a string mapping
  store_columns = %w[preferences data]

  # for elasticsearch 6.x and later
  string_type = 'text'
  string_raw  = { type: 'keyword', ignore_above: 5012 }
  boolean_raw = { type: 'boolean' }

  properties = {}

  object.columns_hash.each do |key, value|
    # column types without a branch (and unsuitable strings) get no explicit mapping
    mapping = case value.type
              when :string
                # only strings with a limit of at most 5000 are mapped
                if value.limit && value.limit <= 5000 && store_columns.exclude?(key)
                  { type: string_type, fields: { keyword: string_raw } }
                end
              when :integer
                { type: 'integer' }
              when :datetime, :date
                { type: 'date' }
              when :boolean
                { type: 'boolean', fields: { keyword: boolean_raw } }
              when :binary
                { type: 'binary' }
              when :bigint
                { type: 'long' }
              when :decimal
                { type: 'float' }
              end

    properties[key] = mapping if mapping
  end

  # tickets additionally carry their articles as nested documents
  if object.name == 'Ticket'
    properties[:article] = {
      type:              'nested',
      include_in_parent: true,
    }
  end

  { properties: properties }
end
# get es version
def self.version
  # Memoized: the backend is only asked until a version number was obtained.
  return @version if @version

  @version = SearchIndexBackend.info&.dig('version', 'number')
end
# Returns true if an Elasticsearch URL is set.
def self.configured?
  !Setting.get('es_url').blank?
end
# Returns true if model_name is the name of one of the indexable models.
def self.model_indexable?(model_name)
  indexable_names = Models.indexable.map(&:name)
  indexable_names.include?(model_name)
end
# Index settings that apply to every model unless overridden via es_model_settings.
def self.default_model_settings
  { 'index.mapping.total_fields.limit' => 2000 }
end
# Returns the index settings of a model: the defaults, overridden by the
# model's entry in the es_model_settings setting (if there is one).
def self.model_settings(model)
  configured = Setting.get('es_model_settings')
  overrides  = configured[model.name] || {}

  default_model_settings.merge(overrides)
end
# Returns the effective index settings of all indexable models.
#
# @return [Hash{String => Hash}] model name => settings (see model_settings)
def self.all_settings
  # Enumerable#to_h with a block builds the hash directly; no intermediate
  # enumerator is needed.
  Models.indexable.to_h { |model| [model.name, model_settings(model)] }
end
# Stores an index setting for a model in the es_model_settings setting.
# Raises if the model is not indexable or if key or value is blank.
def self.set_setting(model_name, key, value)
  if !model_indexable?(model_name)
    raise "It is not possible to configure settings for the non-indexable model '#{model_name}'."
  end

  raise __("The required parameter 'key' is missing.") if key.blank?
  raise __("The required parameter 'value' is missing.") if value.blank?

  config = Setting.get('es_model_settings')
  model_config = (config[model_name] ||= {})
  model_config[key] = value

  Setting.set('es_model_settings', config)
end
# Removes an index setting of a model from the es_model_settings setting.
# Raises if the model is not indexable or if key is blank.
def self.unset_setting(model_name, key)
  if !model_indexable?(model_name)
    raise "It is not possible to configure settings for the non-indexable model '#{model_name}'."
  end

  raise __("The required parameter 'key' is missing.") if key.blank?

  config = Setting.get('es_model_settings')
  model_config = (config[model_name] ||= {})
  model_config.delete(key)

  Setting.set('es_model_settings', config)
end
# Creates (or updates) the search index of every given model, using the
# mapping derived from the model's columns and the model's index settings.
def self.create_index(models = Models.indexable)
  models.each do |model|
    index_definition = {
      mappings: SearchIndexBackend.get_mapping_properties_object(model),
      settings: model_settings(model),
    }

    SearchIndexBackend.index(
      action: 'create',
      name:   model.name,
      data:   index_definition,
    )
  end
end
# Deletes the search index of every given model.
def self.drop_index(models = Models.indexable)
  models.each do |model|
    SearchIndexBackend.index(action: 'delete', name: model.name)
  end
end
# Creates the search index for the indexable model whose class name equals object.
def self.create_object_index(object)
  matching_models = Models.indexable.select { |model| model.to_s == object }
  create_index(matching_models)
end
# Deletes the search index of the indexable model whose class name equals object.
def self.drop_object_index(object)
  matching_models = Models.indexable.select { |model| model.to_s == object }
  drop_index(matching_models)
end
# Returns the configured ingest pipeline name. With create: true a missing
# name is generated ("zammad" + UUID) and stored in the es_pipeline setting.
def self.pipeline(create: false)
  current = Setting.get('es_pipeline')
  return current if !create || !current.blank?

  generated = "zammad#{SecureRandom.uuid}"
  Setting.set('es_pipeline', generated)
  generated
end
# Options merged into every processor of the attachment pipeline (see create_pipeline).
def self.pipeline_settings
  { ignore_failure: true, ignore_missing: true }
end
# (Re)creates the ingest pipeline: an existing pipeline of the same name is
# deleted first, then one is created that runs the attachment processor on
# article attachments and on top-level attachments.
def self.create_pipeline
  pipeline_name = pipeline(create: true)

  # Builds a fresh attachment processor definition on every call.
  attachment_processor = lambda do
    {
      attachment: {
        target_field: '_ingest._value',
        field:        '_ingest._value._content',
      }.merge(pipeline_settings),
    }
  end

  # attachments nested below each article
  article_attachments = {
    foreach: {
      field:     'article',
      processor: {
        foreach: {
          field:     '_ingest._value.attachment',
          processor: attachment_processor.call,
        }.merge(pipeline_settings),
      }
    }.merge(pipeline_settings),
  }

  # attachments stored directly on the document
  direct_attachments = {
    foreach: {
      field:     'attachment',
      processor: attachment_processor.call,
    }.merge(pipeline_settings),
  }

  SearchIndexBackend.processors(
    "_ingest/pipeline/#{pipeline_name}": [
      {
        action: 'delete',
      },
      {
        action:      'create',
        description: __('Extract zammad-attachment information from arrays'),
        processors:  [article_attachments, direct_attachments],
      }
    ]
  )
end
# Deletes the configured ingest pipeline; does nothing if none is configured.
def self.drop_pipeline
  pipeline_name = pipeline
  return if pipeline_name.blank?

  SearchIndexBackend.processors(
    "_ingest/pipeline/#{pipeline_name}": [{ action: 'delete' }]
  )
end
end