lib/elasticsearch/transport/base_protocol.rb
require 'faraday'
module ElasticSearch
module Transport
module IndexProtocol
def index(index, type, id, document, options={})
body = encoder.is_encoded?(document) ? document : encoder.encode(document)
if id.nil?
response = request(:post, {:index => index, :type => type}, options, body)
else
response = request(:put, {:index => index, :type => type, :id => id}, options, body)
end
handle_error(response) unless (response.status == 200 or response.status == 201)
encoder.decode(response.body)
end
def get(index, type, id, options={})
response = request(:get, {:index => index, :type => type, :id => id}, options)
return nil if response.status == 404
handle_error(response) unless response.status == 200
hit = encoder.decode(response.body)
unescape_id!(hit) #TODO extract these two calls from here and search
set_encoding!(hit)
hit # { "_id", "_index", "_type", "_source" }
end
def delete(index, type, id, options={})
response = request(:delete,{:index => index, :type => type, :id => id}, options)
handle_error(response) unless response.status == 200 # ElasticSearch always returns 200 on delete, even if the object doesn't exist
encoder.decode(response.body)
end
def delete_by_query(index, type, query, options={})
# pass the query through the parameters in all the cases, since
# DELETE with a body are ambiguously supported
if query.is_a?(Hash)
params = options.merge(:source => encoder.encode(query))
else
params = options.merge(:q => query)
end
request(:delete, {:index => index, :type => type, :op => "_query"}, params)
end
def search(index, type, query, options={})
if query.is_a?(Hash)
# Some http libraries cannot submit get requests with content, so if query is a hash, post it instead (assume a query hash is using the query dsl)
response = request(:get, {:index => index, :type => type, :op => "_search"}, options, encoder.encode(query))
else
response = request(:get, {:index => index, :type => type, :op => "_search"}, options.merge(:q => query))
end
handle_error(response) unless response.status == 200
results = encoder.decode(response.body)
# unescape ids
results["hits"]["hits"].each do |hit|
unescape_id!(hit)
set_encoding!(hit)
end
results # {"hits"=>{"hits"=>[{"_id", "_type", "_source", "_index", "_score"}], "total"}, "_shards"=>{"failed", "total", "successful"}}
end
def scroll(scroll_id, options={})
# Some http libraries cannot submit get requests with content, so we pass the scroll_id in the parameters
response = request(:get, {:op => "_search/scroll"}, options.merge(:scroll_id => scroll_id))
handle_error(response) unless response.status == 200
results = encoder.decode(response.body)
# unescape ids
results["hits"]["hits"].each do |hit|
unescape_id!(hit)
set_encoding!(hit)
end
results # {"hits"=>{"hits"=>[{"_id", "_type", "_source", "_index", "_score"}], "total"}, "_shards"=>{"failed", "total", "successful"}, "_scrollId"}
end
def count(index, type, query, options={})
if query.is_a?(Hash)
# Some http libraries cannot submit get requests with content, so if query is a hash, post it instead (assume a query hash is using the query dsl)
response = request(:post, {:index => index, :type => type, :op => "_count"}, options, encoder.encode(query))
else
response = request(:get, {:index => index, :type => type, :op => "_count"}, options.merge(:q => query))
end
handle_error(response) unless response.status == 200
encoder.decode(response.body) # {"count", "_shards"=>{"failed", "total", "successful"}}
end
def bulk(actions, options={})
body = actions.inject("") { |a, s| a << encoder.encode(s) << "\n" }
response = request(:post, {:op => '_bulk'}, options, body)
handle_error(response) unless response.status == 200
encoder.decode(response.body) # {"items => [ {"delete"/"create" => {"_index", "_type", "_id", "ok"}} ] }
end
# Uses a post request so we can send ids in content
def multi_get(index, type, query, options={})
# { "docs" = [ {...}, {...}, ...]}
if query.is_a?(Array)
query = { "ids" => query }
end
results = standard_request(:post, { :index => index, :type => type, :op => "_mget"},
options, encoder.encode(query))['docs']
results.each do |hit|
unescape_id!(hit)
set_encoding!(hit)
end
results
end
end
module IndexAdminProtocol
def index_status(index_list, options={})
standard_request(:get, {:index => index_list, :op => "_status"})
end
def create_index(index, create_options={}, options={})
standard_request(:put, {:index => index}, {}, encoder.encode(create_options))
end
def delete_index(index, options={})
standard_request(:delete, {:index => index})
end
def alias_index(operations, options={})
standard_request(:post, {:op => "_aliases"}, {}, encoder.encode(operations))
end
def get_aliases(index, options={})
standard_request(:get, {:index => index, :op => "_aliases"}, options)
end
def update_mapping(index, type, mapping, options)
standard_request(:put, {:index => index, :type => type, :op => "_mapping"}, options, encoder.encode(mapping))
end
def index_mapping(index_list, options={})
standard_request(:get, {:index => index_list, :op => "_mapping"})
end
def delete_mapping(index, type, options={})
standard_request(:delete, {:index => index, :type => type, :op => "_mapping"})
end
def update_settings(index, settings, options)
standard_request(:put, {:index => index, :op => "_settings"}, options, encoder.encode(settings))
end
def get_settings(index, options)
standard_request(:get, {:index => index, :op => "_settings"}, options)
end
def flush(index_list, options={})
standard_request(:post, {:index => index_list, :op => "_flush"}, options, "")
end
def refresh(index_list, options={})
standard_request(:post, {:index => index_list, :op => "_refresh"}, {}, "")
end
def snapshot(index_list, options={})
standard_request(:post, {:index => index_list, :type => "_gateway", :op => "snapshot"}, {}, "")
end
def optimize(index_list, options={})
standard_request(:post, {:index => index_list, :op => "_optimize"}, options, {})
end
def create_river(type, create_options={}, options={})
standard_request(:put, {:index => "_river", :type => type, :op => "_meta"}, options, encoder.encode(create_options))
end
def get_river(type, options={})
standard_request(:get, {:index => "_river", :type => type, :op => "_meta"})
end
def river_status(type, options={})
standard_request(:get, {:index => "_river", :type => type, :op => "_status"})
end
def delete_river(type, options={})
params = {:index => "_river"}
params[:type] = type unless type.nil?
standard_request(:delete, params)
end
end
module ClusterAdminProtocol
def cluster_health(index_list, options={})
standard_request(:get, {:index => "_cluster", :type => "health", :id => index_list}, options)
end
def cluster_state(options={})
standard_request(:get, {:index => "_cluster", :op => "state"})
end
def nodes_info(node_list, options={})
standard_request(:get, {:index => "_cluster", :type => "nodes", :id => node_list})
end
def nodes_stats(node_list, options={})
standard_request(:get, {:index => "_cluster", :type => "nodes", :id => node_list, :op => "stats"})
end
def shutdown_nodes(node_list, options={})
standard_request(:post, {:index => "_cluster", :type => "nodes", :id => node_list, :op => "_shutdown"}, options, "")
end
def restart_nodes(node_list, options={})
standard_request(:post, {:index => "_cluster", :type => "nodes", :id => node_list, :op => "_restart"}, options, "")
end
end
module ProtocolHelpers
private
def standard_request(*args)
response = request(*args)
handle_error(response) unless response.status >= 200 && response.status < 300
encoder.decode(response.body)
end
def handle_error(response)
raise RequestError.new(response.status), "(#{response.status}) #{response.body}"
end
# :index - one or many index names
# :type - one or many types
# :id - one id
# :op - optional operation
def generate_uri(options)
path = ""
path << "/#{Array(options[:index]).collect { |i| escape(i.downcase) }.join(",")}" if options[:index] && !options[:index].empty?
path << "/*" if options[:index] && options[:index].empty?
path << "/#{Array(options[:type]).collect { |t| escape(t) }.join(",")}" if options[:type] && !options[:type].empty?
path << "/#{Array(options[:id]).collect { |id| escape(id) }.join(",")}" if options[:id] && !options[:id].to_s.empty?
path << "/#{options[:op]}" if options[:op]
path
end
#doesn't handle arrays or hashes or what have you
def generate_query_string(params)
params.collect { |k,v| "#{escape(k.to_s)}=#{escape(v.to_s)}" }.join("&")
end
def unescape_id!(hit)
hit["_id"] = unescape(hit["_id"])
nil
end
def set_encoding!(hit)
encode_utf8(hit["_source"]) if hit["_source"].is_a?(String)
nil
end
def escape(string)
Faraday::Utils.escape(string)
end
def unescape(string)
Faraday::Utils.unescape(string)
end
if ''.respond_to?(:force_encoding) && ''.respond_to?(:encoding)
# encodes the string as utf-8 in Ruby 1.9
def encode_utf8(string)
# ElasticSearch only ever returns json in UTF-8 (per the JSON spec) so we can use force_encoding here (#TODO what about ids? can we assume those are always ascii?)
string.force_encoding(::Encoding::UTF_8)
end
else
# returns the unaltered string in Ruby 1.8
def encode_utf8(string)
string
end
end
end
module BaseProtocol
include IndexProtocol
include IndexAdminProtocol
include ClusterAdminProtocol
include ProtocolHelpers
end
end
end