# lib/elasticity/strategies/alias_index.rb
module Elasticity
module Strategies
# This strategy keeps two aliases that may point to the same index or to
# different indexes, allowing runtime changes through simple, atomic alias
# updates. See the remap method implementation for an example.
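#
# For example (the index names here are illustrative), after create both
# aliases point at the same timestamped index:
#
#   users        -> users-20240101120000000000
#   users_update -> users-20240101120000000000
#
# During a remap, reads span the old and the new index while writes go only
# to the new one, so no operation is lost while documents are copied over.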
class AliasIndex
SNAPSHOT_ERROR_SNIPPET = "Cannot delete indices that are being snapshotted"
RETRYABLE_ERROR_SNIPPETS = [
SNAPSHOT_ERROR_SNIPPET
].freeze
STATUSES = [:missing, :ok, :inconsistent].freeze
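# NOTE: use_new_timestamp_format and include_type_name_on_create are accepted
# for call-site compatibility but are not used by this class.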
def initialize(client, index_base_name, document_type, use_new_timestamp_format = nil, include_type_name_on_create = nil)
@client = client
@main_alias = index_base_name
@update_alias = "#{index_base_name}_update"
@document_type = document_type
end
def ref_index_name
@main_alias
end
# Remap allows a zero-downtime, zero-data-loss remap of Elasticsearch indexes.
# Here is an overview of how it works:
#
# 1. Create a new index with the new mapping.
# 2. Update the aliases so that writes go to the new index and reads go to
#    both indexes.
# 3. Use a scroll cursor to iterate over all the documents in the old index,
#    copying them to the new index.
# 4. Update the aliases so that all operations go to the new index.
# 5. Delete the old index.
#
# It does a bit more than that to ensure consistency and to handle race
# conditions; for the details, look at the implementation.
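#
# A minimal usage sketch (the index definition below is illustrative):
#
#   strategy = Elasticity::Strategies::AliasIndex.new(client, "users", "user")
#   strategy.remap(
#     { settings: {}, mappings: { "properties" => { "name" => { "type" => "keyword" } } } },
#     retry_delete_on_recoverable_errors: true,
#     retry_delay: 5,
#     max_delay: 60
#   )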
def remap(index_def, retry_delete_on_recoverable_errors: false, retry_delay: 0, max_delay: 0)
main_indexes = self.main_indexes
update_indexes = self.update_indexes
if main_indexes.size != 1 || update_indexes.size != 1 || main_indexes != update_indexes
raise "Index can't be remapped right now, check if another remapping is already happening"
end
new_index = create_index(index_def)
original_index = main_indexes[0]
begin
# Configure aliases so that search includes the old index and the new index, and writes are made to
# the new index.
@client.index_update_aliases(body: {
actions: [
{ remove: { index: original_index, alias: @update_alias } },
{ add: { index: new_index, alias: @update_alias } },
{ add: { index: new_index, alias: @main_alias }},
]
})
@client.index_refresh(index: original_index)
cursor = @client.search index: original_index, search_type: :query_then_fetch, scroll: "10m", size: 100
loop do
hits = cursor["hits"]["hits"]
break if hits.empty?
# Fetch documents based on the ids that existed when the migration started, to make sure we only migrate
# documents that haven't been deleted.
id_docs = hits.map do |hit|
{ _index: original_index, _id: hit["_id"] }
end
docs = @client.mget(body: { docs: id_docs }, refresh: true)["docs"]
break if docs.empty?
# Strip fields that are no longer in the mapping definition, so legacy fields aren't copied over.
defined_mapping_fields = index_def[:mappings]["properties"].keys
# Move only documents that still exist on the old index into the new index.
ops = []
docs.each do |doc|
if doc["found"]
legacy_fields = doc["_source"].keys - defined_mapping_fields
legacy_fields.each { |field| doc["_source"].delete(field) }
ops << { index: { _index: new_index, _id: doc["_id"], data: doc["_source"] } }
end
end
@client.bulk(body: ops)
# Deal with a race condition: a document deleted from the old index between the mget above and
# the bulk below would otherwise be resurrected, so re-read the old index and remove any such
# document from the new index.
ops = []
@client.mget(body: { docs: id_docs }, refresh: true)["docs"].each_with_index do |new_doc, idx|
if docs[idx]["found"] && !new_doc["found"]
ops << { delete: { _index: new_index, _id: new_doc["_id"] } }
end
end
@client.bulk(body: ops) unless ops.empty?
cursor = @client.scroll(scroll_id: cursor["_scroll_id"], scroll: "1m", body: { scroll_id: cursor["_scroll_id"] })
end
# Update aliases to only point to the new index.
@client.index_update_aliases(body: {
actions: [
{ remove: { index: original_index, alias: @main_alias } },
]
})
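# Deleting an index fails while it is part of an in-progress snapshot. When
# retry_delete_on_recoverable_errors is set, keep retrying every retry_delay
# seconds until the deletion succeeds or max_delay seconds have accumulated.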
waiting_duration = 0
begin
@client.index_delete(index: original_index)
rescue Elastic::Transport::Transport::ServerError => e
if retryable_error?(e) && retry_delete_on_recoverable_errors && waiting_duration < max_delay
waiting_duration += retry_delay
sleep(retry_delay)
retry
else
raise e
end
end
rescue
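# Roll back: restore both aliases to the old index, copy anything already
# written to the new index back to the old one, then drop the new index and
# re-raise the original error.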
@client.index_update_aliases(body: {
actions: [
{ add: { index: original_index, alias: @main_alias } },
{ add: { index: original_index, alias: @update_alias } },
{ remove: { index: new_index, alias: @update_alias } },
]
})
@client.index_refresh(index: new_index)
cursor = @client.search index: new_index, search_type: :query_then_fetch, scroll: "1m", size: 100
loop do
hits = cursor["hits"]["hits"]
break if hits.empty?
# Move all the documents that exist on the new index back to the old index.
ops = []
hits.each do |doc|
ops << { index: { _index: original_index, _id: doc["_id"], data: doc["_source"] } }
end
@client.bulk(body: ops)
cursor = @client.scroll(scroll_id: cursor["_scroll_id"], scroll: "1m")
end
@client.index_refresh(index: original_index)
@client.index_update_aliases(body: {
actions: [
{ remove: { index: new_index, alias: @main_alias } },
]
})
@client.index_delete(index: new_index)
raise
end
end
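# Returns :ok when both aliases exist, :missing when neither does, and
# :inconsistent when only one of them exists.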
def status
search_exists = @client.index_exists_alias(name: @main_alias)
update_exists = @client.index_exists_alias(name: @update_alias)
case
when search_exists && update_exists
:ok
when !search_exists && !update_exists
:missing
else
:inconsistent
end
end
def missing?
status == :missing
end
def main_indexes
@client.index_get_alias(index: "#{@main_alias}-*", name: @main_alias).keys
rescue Elastic::Transport::Transport::Errors::NotFound
[]
end
def update_indexes
@client.index_get_alias(index: "#{@main_alias}-*", name: @update_alias).keys
rescue Elastic::Transport::Transport::Errors::NotFound
[]
end
def create(index_def)
if missing?
name = create_index(index_def)
@created_index_name = name
@client.index_update_aliases(body: {
actions: [
{ add: { index: name, alias: @main_alias } },
{ add: { index: name, alias: @update_alias } },
]
})
else
raise IndexError.new(@main_alias, "index already exists")
end
end
def create_if_undefined(index_def)
create(index_def) if missing?
end
def delete
main_indexes.each do |index|
@client.index_delete(index: index)
end
end
def delete_if_defined
delete unless missing?
end
def recreate(index_def)
delete_if_defined
create(index_def)
end
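# Indexes a single document through the update alias. Returns a pair of the
# document id and a boolean indicating whether at least one shard reported a
# successful write.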
def index_document(id, attributes)
res = @client.index(index: @update_alias, id: id, body: attributes)
if (id = res["_id"])
[id, res["_shards"] && res["_shards"]["successful"].to_i > 0]
else
raise IndexError.new(@update_alias, "failed to index document. Response: #{res.inspect}")
end
end
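# Deletes the document from every index reachable through either alias, so a
# delete issued during a remap cannot leave a stale copy behind.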
def delete_document(id)
ops = (main_indexes | update_indexes).map do |index|
{ delete: { _index: index, _id: id } }
end
@client.bulk(body: ops)
end
def get_document(id)
@client.get(index: @main_alias, id: id)
end
def search_index
@main_alias
end
def delete_by_query(body)
@client.delete_by_query(index: @main_alias, body: body)
end
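# Yields a bulk operation collector and executes the accumulated operations
# in a single request. A minimal sketch, assuming Bulk::Alias exposes index
# and delete helpers:
#
#   strategy.bulk do |b|
#     b.index "1", name: "Jane"
#     b.delete "2"
#   end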
def bulk
b = Bulk::Alias.new(@client, @update_alias, main_indexes)
yield b
b.execute
end
def flush
@client.index_flush(index: @update_alias)
end
def refresh
@client.index_refresh(index: @update_alias)
end
def settings
@client.index_get_settings(index: @main_alias).values.first
rescue Elastic::Transport::Transport::Errors::NotFound
nil
end
def mappings
ActiveSupport::Deprecation.warn(
"Elasticity::Strategies::AliasIndex#mappings is deprecated, "\
"use mapping instead"
)
mapping
end
def mapping
@client.index_get_mapping(index: @main_alias).values.first
rescue Elastic::Transport::Transport::Errors::NotFound
nil
end
private
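# Builds a unique, time-ordered index name by suffixing the main alias with a
# microsecond-resolution UTC timestamp, e.g. "users-20240101120000000000"
# (name illustrative).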
def build_index_name
ts = Time.now.utc.strftime("%Y%m%d%H%M%S%6N")
"#{@main_alias}-#{ts}"
end
def create_index(index_def)
name = build_index_name
@client.index_create(index: name, body: index_def)
name
end
def retryable_error?(e)
RETRYABLE_ERROR_SNIPPETS.any? do |snippet|
e.message.include?(snippet)
end
end
end
end
end