lib/agnostic_backend/elasticsearch/indexer.rb
module AgnosticBackend
module Elasticsearch
class Indexer < AgnosticBackend::Indexer
include AgnosticBackend::Utilities
def publish(document)
client.send_request(:put,
path: "/#{index.index_name}/#{index.type}/#{document["id"]}",
body: document)
end
def publish_all(documents)
return if documents.empty?
response = client.send_request(:post,
path: "/#{index.index_name}/#{index.type}/_bulk",
body: convert_to_bulk_upload_string(documents))
body = ActiveSupport::JSON.decode(response.body) rescue {}
# if at least one indexing attempt fails, raise the red flag
raise IndexingError.new, "Error in bulk upload" if body["errors"]
response
end
private
def client
index.client
end
def prepare(document)
raise IndexingError.new, "Document does not have an ID field" unless document["id"].present?
document
end
def transform(document)
return {} if document.empty?
document = flatten document
document = reject_blank_values_from document
document = format_dates_in document
document
end
def format_dates_in(document)
document.each do |k, v|
if v.is_a?(Time)
document[k] = v.utc.strftime("%Y-%m-%dT%H:%M:%SZ")
elsif v.is_a?(Array) && v.all?{|e| e.is_a?(Time)}
document[k] = v.map{|e| e.utc.strftime("%Y-%m-%dT%H:%M:%SZ")}
end
end
end
def convert_to_bulk_upload_string(documents)
documents.map do |document|
next if document.empty?
header = { "index" => {"_id" => document["id"]}}.to_json
document = ActiveSupport::JSON.encode transform(prepare(document))
"#{header}\n#{document}\n"
end.compact.join("\n")
end
end
end
end