lib/tasks/i14y.rake
namespace :i14y do
desc "Creates templates, indexes, and reader/writer aliases for all i14y models"
task setup: :environment do
Dir[Rails.root.join('app', 'templates', '*.rb')].each do |template_generator|
entity_name = File.basename(template_generator, '.rb')
klass = entity_name.camelize.constantize
template_generator = klass.new
ES.client.indices.put_template(
name: entity_name,
body: template_generator.body,
order: 0,
create: true,
include_type_name: false
)
end
es_collections_index_name = [CollectionRepository.index_namespace, 'v1'].join('-')
CollectionRepository.new.create_index!(
index: es_collections_index_name,
include_type_name: true
)
ES.client.indices.put_alias(
index: es_collections_index_name,
name: CollectionRepository.index_name
)
end
desc "Copies data from one version of the i14y index to the next (e.g., collections, documents) and updates the alias"
task :reindex, [:entity_name] => [:environment] do |_t, args|
entity_name = args.entity_name
persistence_model_klass = entity_name.singularize.camelize.constantize
klass = entity_name.camelize.constantize
template_generator = klass.new
ES.client.indices.put_template(name: entity_name,
body: template_generator.body,
order: 0)
wildcard = [persistence_model_klass.index_namespace, '*'].join
aliases = ES.client.indices.get_alias(name: wildcard)
aliases.each do |old_es_index_name, alias_names|
alias_name = alias_names['aliases'].keys.first
persistence_model_klass.index_name = old_es_index_name
new_es_index_name = next_version(old_es_index_name)
puts "Beginning copy of #{persistence_model_klass.count} #{entity_name} from #{old_es_index_name} to #{new_es_index_name}"
persistence_model_klass.create_index!(index: new_es_index_name)
persistence_model_klass.index_name = new_es_index_name
since_timestamp = Time.now
host_hash = ES.client.transport.hosts.first
base_url = "#{host_hash[:protocol]}://#{host_hash[:host]}:#{host_hash[:port]}/"
old_es_index_url = base_url + old_es_index_name
new_es_index_url = base_url + new_es_index_name
stream2es(old_es_index_url, new_es_index_url)
move_alias(alias_name, old_es_index_name, new_es_index_name)
stream2es(old_es_index_url, new_es_index_url, since_timestamp)
puts "New #{new_es_index_name} index now contains #{persistence_model_klass.count} #{entity_name}"
ES.client.indices.delete(index: old_es_index_name)
end
end
desc "Deletes templates, indexes, and reader/writer aliases for all i14y models. Useful for development."
task clear_all: :environment do
Dir[Rails.root.join('app', 'templates', '*.rb')].each do |template_generator|
entity_name = File.basename(template_generator, '.rb')
ES.client.indices.delete_template(name: entity_name) rescue Elasticsearch::Transport::Transport::Errors::NotFound
end
ES.client.indices.delete(index: [Rails.env, I14y::APP_NAME, '*'].join('-'))
end
def next_version(index_name)
matches = index_name.match(/(.*-v)(\d+)/)
"#{matches[1]}#{matches[2].succ}"
end
def stream2es(old_es_index_url, new_es_index_url, timestamp = nil)
options = ["--source #{old_es_index_url}", "--target #{new_es_index_url}"]
if timestamp.present?
hash = { query: { filtered: { filter: { range: { updated_at: { gte: timestamp } } } } } }
options << "--query '#{hash.to_json}'"
end
result = `#{Rails.root.join('vendor', 'stream2es')} es #{options.join(' ')}`
puts "Stream2es completed", result
end
def move_alias(alias_name, old_index_name, new_index_name)
update_aliases_hash = { body:
{ actions: [
{ remove: { index: old_index_name, alias: alias_name } },
{ add: { index: new_index_name, alias: alias_name } }
] } }
ES.client.indices.update_aliases(update_aliases_hash)
end
end