grantr/rubberband

View on GitHub
lib/elasticsearch/transport/base_protocol.rb

Summary

Maintainability
A
2 hrs
Test Coverage
require 'faraday'

module ElasticSearch
  module Transport
    module IndexProtocol
      def index(index, type, id, document, options={})
        body = encoder.is_encoded?(document) ? document : encoder.encode(document)
        if id.nil?
          response = request(:post, {:index => index, :type => type}, options, body)
        else
          response = request(:put, {:index => index, :type => type, :id => id}, options, body)
        end
        handle_error(response) unless (response.status == 200 or response.status == 201)
        encoder.decode(response.body)
      end

      def get(index, type, id, options={})
        response = request(:get, {:index => index, :type => type, :id => id}, options)
        return nil if response.status == 404

        handle_error(response) unless response.status == 200
        hit = encoder.decode(response.body)
        unescape_id!(hit) #TODO extract these two calls from here and search
        set_encoding!(hit)
        hit # { "_id", "_index", "_type", "_source" }
      end

      def delete(index, type, id, options={})
        response = request(:delete,{:index => index, :type => type, :id => id}, options)
        handle_error(response) unless response.status == 200 # ElasticSearch always returns 200 on delete, even if the object doesn't exist
        encoder.decode(response.body)
      end
      
      def delete_by_query(index, type, query, options={})
        # pass the query through the parameters in all the cases, since
        # DELETE with a body are ambiguously supported
        if query.is_a?(Hash)
          params = options.merge(:source => encoder.encode(query))
        else
          params = options.merge(:q => query)
        end
        request(:delete, {:index => index, :type => type, :op => "_query"}, params)
      end      

      def search(index, type, query, options={})
        if query.is_a?(Hash)
          # Some http libraries cannot submit get requests with content, so if query is a hash, post it instead (assume a query hash is using the query dsl)
          response = request(:get, {:index => index, :type => type, :op => "_search"}, options, encoder.encode(query))
        else
          response = request(:get, {:index => index, :type => type, :op => "_search"}, options.merge(:q => query))
        end
        handle_error(response) unless response.status == 200
        results = encoder.decode(response.body)
        # unescape ids
        results["hits"]["hits"].each do |hit|
          unescape_id!(hit)
          set_encoding!(hit)
        end
        results # {"hits"=>{"hits"=>[{"_id", "_type", "_source", "_index", "_score"}], "total"}, "_shards"=>{"failed", "total", "successful"}}
      end

      def scroll(scroll_id, options={})
        # Some http libraries cannot submit get requests with content, so we pass the scroll_id in the parameters
        response = request(:get, {:op => "_search/scroll"}, options.merge(:scroll_id => scroll_id))
        handle_error(response) unless response.status == 200
        results = encoder.decode(response.body)
        # unescape ids
        results["hits"]["hits"].each do |hit|
          unescape_id!(hit)
          set_encoding!(hit)
        end
        results # {"hits"=>{"hits"=>[{"_id", "_type", "_source", "_index", "_score"}], "total"}, "_shards"=>{"failed", "total", "successful"}, "_scrollId"}
      end

      def count(index, type, query, options={})
        if query.is_a?(Hash)
          # Some http libraries cannot submit get requests with content, so if query is a hash, post it instead (assume a query hash is using the query dsl)
          response = request(:post, {:index => index, :type => type, :op => "_count"}, options, encoder.encode(query))
        else
          response = request(:get, {:index => index, :type => type, :op => "_count"}, options.merge(:q => query))
        end
        handle_error(response) unless response.status == 200
        encoder.decode(response.body) # {"count", "_shards"=>{"failed", "total", "successful"}}
      end

      def bulk(actions, options={})
        body = actions.inject("") { |a, s| a << encoder.encode(s) << "\n" }
        response = request(:post, {:op => '_bulk'}, options, body)
        handle_error(response) unless response.status == 200
        encoder.decode(response.body) # {"items => [ {"delete"/"create" => {"_index", "_type", "_id", "ok"}} ] }
      end
      
      # Uses a post request so we can send ids in content
      def multi_get(index, type, query, options={})
        # { "docs" = [ {...}, {...}, ...]}
        if query.is_a?(Array)
          query = { "ids" => query }
        end
        results = standard_request(:post, { :index => index, :type => type, :op => "_mget"}, 
                                   options, encoder.encode(query))['docs']
        results.each do |hit|
          unescape_id!(hit)
          set_encoding!(hit)
        end
        results
      end
    end

    module IndexAdminProtocol
      def index_status(index_list, options={})
        standard_request(:get, {:index => index_list, :op => "_status"})
      end

      def create_index(index, create_options={}, options={})
        standard_request(:put, {:index => index}, {}, encoder.encode(create_options))
      end

      def delete_index(index, options={})
        standard_request(:delete, {:index => index})
      end

      def alias_index(operations, options={})
        standard_request(:post, {:op => "_aliases"}, {}, encoder.encode(operations))
      end

      def get_aliases(index, options={})
        standard_request(:get, {:index => index, :op => "_aliases"}, options)
      end

      def update_mapping(index, type, mapping, options)
        standard_request(:put, {:index => index, :type => type, :op => "_mapping"}, options, encoder.encode(mapping))
      end

      def index_mapping(index_list, options={})
        standard_request(:get, {:index => index_list, :op => "_mapping"})
      end

      def delete_mapping(index, type, options={})
        standard_request(:delete, {:index => index, :type => type, :op => "_mapping"})
      end

      def update_settings(index, settings, options)
        standard_request(:put, {:index => index, :op => "_settings"}, options, encoder.encode(settings))
      end

      def get_settings(index, options)
        standard_request(:get, {:index => index, :op => "_settings"}, options)
      end

      def flush(index_list, options={})
        standard_request(:post, {:index => index_list, :op => "_flush"}, options, "")
      end
      
      def refresh(index_list, options={})
        standard_request(:post, {:index => index_list, :op => "_refresh"}, {}, "")
      end
      
      def snapshot(index_list, options={})
        standard_request(:post, {:index => index_list, :type => "_gateway", :op => "snapshot"}, {}, "")
      end
      
      def optimize(index_list, options={})
        standard_request(:post, {:index => index_list, :op => "_optimize"}, options, {})
      end

      def create_river(type, create_options={}, options={})
        standard_request(:put, {:index => "_river", :type => type, :op => "_meta"}, options, encoder.encode(create_options))
      end

      def get_river(type, options={})
        standard_request(:get, {:index => "_river", :type => type, :op => "_meta"})
      end

      def river_status(type, options={})
        standard_request(:get, {:index => "_river", :type => type, :op => "_status"})
      end

      def delete_river(type, options={})
        params = {:index => "_river"}
        params[:type] = type unless type.nil?
        standard_request(:delete, params)
      end
    end

    module ClusterAdminProtocol
      def cluster_health(index_list, options={})
        standard_request(:get, {:index => "_cluster", :type => "health", :id => index_list}, options)
      end

      def cluster_state(options={})
        standard_request(:get, {:index => "_cluster", :op => "state"})
      end

      def nodes_info(node_list, options={})
        standard_request(:get, {:index => "_cluster", :type => "nodes", :id => node_list})
      end

      def nodes_stats(node_list, options={})
        standard_request(:get, {:index => "_cluster", :type => "nodes", :id => node_list, :op => "stats"})
      end

      def shutdown_nodes(node_list, options={})
        standard_request(:post, {:index => "_cluster", :type => "nodes", :id => node_list, :op => "_shutdown"}, options, "")
      end

      def restart_nodes(node_list, options={})
        standard_request(:post, {:index => "_cluster", :type => "nodes", :id => node_list, :op =>  "_restart"}, options, "")
      end
    end

    module ProtocolHelpers
      private

      def standard_request(*args)
        response = request(*args)
        handle_error(response) unless response.status >= 200 && response.status < 300
        encoder.decode(response.body)
      end

      def handle_error(response)
        raise RequestError.new(response.status), "(#{response.status}) #{response.body}"
      end

      # :index - one or many index names
      # :type - one or many types
      # :id - one id
      # :op - optional operation
      def generate_uri(options)
        path = ""
        path << "/#{Array(options[:index]).collect { |i| escape(i.downcase) }.join(",")}" if options[:index] && !options[:index].empty?
        path << "/*" if options[:index] && options[:index].empty?
        path << "/#{Array(options[:type]).collect { |t| escape(t) }.join(",")}" if options[:type] && !options[:type].empty?
        path << "/#{Array(options[:id]).collect { |id| escape(id) }.join(",")}" if options[:id] && !options[:id].to_s.empty?
        path << "/#{options[:op]}" if options[:op]
        path
      end

      #doesn't handle arrays or hashes or what have you
      def generate_query_string(params)
        params.collect { |k,v| "#{escape(k.to_s)}=#{escape(v.to_s)}" }.join("&")
      end

      def unescape_id!(hit)
        hit["_id"] = unescape(hit["_id"])
        nil
      end

      def set_encoding!(hit)
        encode_utf8(hit["_source"]) if hit["_source"].is_a?(String)
        nil
      end

      def escape(string)
        Faraday::Utils.escape(string)
      end

      def unescape(string)
        Faraday::Utils.unescape(string)
      end

      if ''.respond_to?(:force_encoding) && ''.respond_to?(:encoding)
        # encodes the string as utf-8 in Ruby 1.9
        def encode_utf8(string)
          # ElasticSearch only ever returns json in UTF-8 (per the JSON spec) so we can use force_encoding here (#TODO what about ids? can we assume those are always ascii?)
          string.force_encoding(::Encoding::UTF_8)
        end
      else
        # returns the unaltered string in Ruby 1.8
        def encode_utf8(string)
          string
        end
      end
    end

    module BaseProtocol
      include IndexProtocol
      include IndexAdminProtocol
      include ClusterAdminProtocol
      include ProtocolHelpers

    end
  end
end