PoseBiz/stretcher

View on GitHub
lib/stretcher/server.rb

Summary

Maintainability
A
1 hr
Test Coverage
module Stretcher
  class Server < EsComponent
    attr_reader :uri, :http, :logger

    # Internal use only.
    # Builds the Faraday connection used by an instance.
    #
    # uri_components - the parsed server URI; when it carries a user and/or
    #                  password they are sent as HTTP basic auth.
    # options        - :read_timeout (default 30), :open_timeout (default 2) and an
    #                  optional :faraday_configurator Proc. When a configurator is
    #                  given it is called once with the builder and is responsible for
    #                  choosing the adapter; otherwise the excon adapter is used.
    #
    # Returns the configured Faraday connection.
    def self.build_client(uri_components, options={})
      http = Faraday.new(:url => uri_components) do |builder|
        # Only parse response bodies whose Content-Type ends in "json"
        builder.response :json, :content_type => /\bjson$/

        builder.request :json

        builder.options[:timeout] = options[:read_timeout] || 30
        builder.options[:open_timeout] = options[:open_timeout] || 2

        if (faraday_configurator = options[:faraday_configurator])
          faraday_configurator.call(builder)
        else
          builder.adapter :excon
        end
      end
      http.headers = {
        :accept => 'application/json',
        :user_agent => "Stretcher Ruby Gem #{Stretcher::VERSION}",
        "Content-Type" => "application/json"
      }

      if uri_components.user || uri_components.password
        http.basic_auth(uri_components.user, uri_components.password)
      end

      http
    end

    # Internal use only.
    # Builds the logger used by an instance.
    #
    # If options[:logger] is given it is returned untouched; its level and
    # formatter are deliberately not overridden. Otherwise a STDOUT logger is
    # created at options[:log_level] (default :warn) with a "[Stretcher]" prefix.
    def self.build_logger(options)
      return options[:logger] if options[:logger]

      logger = Logger.new(STDOUT)
      log_level = options[:log_level] || :warn
      logger.level = Logger.const_get(log_level.to_s.upcase)
      logger.formatter = proc do |severity, _datetime, _progname, msg|
        "[Stretcher][#{severity}]: #{msg}\n"
      end
      logger
    end

    # Instantiate a new instance in a manner convenient for using the block syntax.
    # Can be used interchangeably with +Stretcher::Server.new+. Returns the value
    # of the block when one is given, otherwise the new server itself.
    # See the regular constructor for full options.
    def self.with_server(*args)
      server = new(*args)
      block_given? ? yield(server) : server
    end

    # Represents a Server context in elastic search.
    # Use +with_server+ when you want to use the block syntax.
    # The options hash takes an optional instance of Logger under :logger
    # (or :log_level to configure the built-in STDOUT logger).
    #
    #    server = Stretcher::Server.new('http://localhost:9200')
    #
    # The default implementation here uses the excon adapter for faraday.
    # If you would like to use a different HTTP library, or alter other faraday
    # config settings, you may specify an optional :faraday_configurator argument
    # with a Proc as a value. It will be called once with the faraday builder and
    # must then also choose the adapter.
    #
    # For instance:
    #    configurator = proc {|builder| builder.adapter :typhoeus }
    #    Stretcher::Server.new('http://localhost:9200', :faraday_configurator => configurator)
    #
    # You may want to set one or both of :read_timeout and :open_timeout, for Faraday's HTTP
    # settings. The default is read_timeout: 30, open_timeout: 2, which can be quite long
    # in many situations.
    #
    # By default every API call is serialized through a per-instance mutex.
    # If the configured HTTP adapter is safe to share between threads, that
    # synchronization can be skipped by passing http_threadsafe: true.
    def initialize(uri='http://localhost:9200', options={})
      @http_threadsafe = !!options[:http_threadsafe]
      @request_mtx = Mutex.new unless @http_threadsafe
      @uri = uri.to_s
      @uri_components = URI.parse(@uri)
      @http = self.class.build_client(@uri_components, options)
      @logger = self.class.build_logger(options)
    end

    # Returns a Stretcher::Index object for the index +name+.
    # Optionally takes a block, which will be passed a single arg with the Index obj.
    # The block syntax returns the evaluated value of the block.
    #
    #    my_server.index(:foo) # => #<Stretcher::Index ...>
    #    my_server.index(:foo) {|idx| 1+1} # => 2
    def index(name, &block)
      idx = Index.new(self, name, :logger => logger)
      block ? block.call(idx) : idx
    end

    # Returns the Stretcher::Cluster for this server.
    # Optionally takes a block, which will be passed a single arg with the
    # Cluster object. The block syntax returns the evaluated value of the block.
    def cluster(&block)
      cluster = Cluster.new(self, :logger => logger)
      block ? block.call(cluster) : cluster
    end

    # Perform a raw bulk operation. You probably want to use Stretcher::Index#bulk_index,
    # which properly formats a bulk index request. +options+ are sent as query params.
    def bulk(data, options={})
      request(:post, path_uri("/_bulk"), options, data)
    end

    # Retrieves stats for this server
    def stats
      request :get, path_uri("/_stats")
    end

    # Retrieves status for this server
    def status
      request :get, path_uri("/_status")
    end

    # Returns true if the server is currently reachable, raises an error otherwise
    def up?
      request(:get, path_uri)
      true
    end

    # Takes an array of msearch data as per
    # http://www.elasticsearch.org/guide/reference/api/multi-search.html
    # Should look something like:
    #    data = [
    #      {"index" => "test"},
    #      {"query" => {"match_all" => {}}, "from" => 0, "size" => 10},
    #      {"index" => "test", "search_type" => "count"},
    #      {"query" => {"match_all" => {}}}
    #    ]
    #    server.msearch(data)
    #
    # Returns one SearchResults per response. Raises ArgumentError unless +body+
    # is an Array, and RequestError if any individual response reports an error.
    def msearch(body=[])
      raise ArgumentError, "msearch takes an array!" unless body.is_a?(Array)
      # Newline-delimited JSON; the API requires a trailing newline
      fmt_body = body.map { |line| MultiJson.dump(line) }.join("\n") << "\n"

      res = request(:get, path_uri("/_msearch"), {}, fmt_body, {}, :mashify => false)

      errors = res['responses'].map { |response| response['error'] }.compact
      unless errors.empty?
        raise RequestError.new(res), "Could not msearch #{errors.inspect}"
      end

      res['responses'].map { |r| SearchResults.new(r) }
    end

    # Retrieves multiple documents, possibly from multiple indexes.
    # Takes an array of doc specs and returns an array of docs, as per
    # http://www.elasticsearch.org/guide/reference/api/multi-get.html
    #
    # Returned docs are filtered on their :found flag. By default (and with
    # :found => true or :exists => true) stubs for missing documents are dropped.
    # :exists => false (or :found => false) works in reverse and returns only the
    # stubs for missing documents. :found takes precedence over :exists.
    #
    # Passing a Hash as +docs+ uses the legacy API and returns the raw response.
    def mget(docs=[], arg_opts={})
      # Legacy API
      return legacy_mget(docs) if docs.is_a?(Hash)

      opts = { :exists => true }.merge(arg_opts)
      wanted = opts.key?(:found) ? opts[:found] : opts[:exists]

      res = request(:get, path_uri("/_mget"), {}, { :docs => docs })[:docs]
      res.select { |d| d[:found] == wanted }
    end

    # legacy implementation of mget for backwards compat
    def legacy_mget(body)
      request(:get, path_uri("/_mget"), {}, body)
    end

    # Implements the Analyze API
    # Ex:
    #    server.analyze("Candles", analyzer: :snowball)
    #    # => #<Hashie::Mash tokens=[#<Hashie::Mash end_offset=7 position=1 start_offset=0 token="candl" type="<ALPHANUM>">]>
    # as per: http://www.elasticsearch.org/guide/reference/api/admin-indices-analyze.html
    def analyze(text, analysis_params)
      request(:get, path_uri("/_analyze"), analysis_params) do |req|
        req.body = text
      end
    end

    # Implements the Aliases API.
    # With a +body+ the actions are POSTed; without one the current aliases are fetched.
    # Ex:
    #    server.aliases({actions: {add: {index: :my_index, alias: :my_alias}}})
    # as per: http://www.elasticsearch.org/guide/reference/api/admin-indices-aliases.html
    def aliases(body=nil)
      if body
        request(:post, path_uri("/_aliases"), {}, body)
      else
        request(:get, path_uri("/_aliases"))
      end
    end

    # Looks up an alias by name or prefix (delegates to the inherited do_alias).
    def get_alias(alias_name_or_prefix)
      do_alias(alias_name_or_prefix)
    end

    # Perform a refresh, making all indexed documents available
    def refresh
      do_refresh
    end

    # Absolute URL for +path+ below the server root, honouring any path prefix
    # in the server URI (e.g. 'http://host:9200/es').
    #
    #    server.path_uri("/_stats") # => "http://localhost:9200/_stats"
    #
    # Slashes between the prefix and +path+ are collapsed so the joined path never
    # starts with "//": URI.join treats such a string as a network-path reference
    # and would replace the host with the first path segment.
    def path_uri(path=nil)
      prefix = @uri_components.path.to_s.sub(%r{/+\z}, '')
      relative = path.to_s.sub(%r{\A/+}, '')
      URI.join(@uri.to_s, "#{prefix}/#{relative}").to_s
    end

    # Handy way to query the server, returning *only* the body.
    # Will raise an exception when the status is not in the 2xx range.
    #
    # params  - query params (cleaned via Util.clean_params)
    # body    - request body
    # headers - extra request headers
    # options - :mashify (default true) wraps Hash bodies in Hashie::Mash
    # block   - optional; receives the request for further customisation
    def request(method, path, params={}, body=nil, headers={}, options={}, &block)
      options = { :mashify => true }.merge(options)
      req = http.build_request(method)
      req.path = path
      req.params.update(Util.clean_params(params)) if params
      req.body = body
      req.headers.update(headers) if headers
      block.call(req) if block
      logger.debug { Util.curl_format(req) }

      # Building the env and running the middleware stack both happen under the
      # lock unless the caller declared the HTTP stack threadsafe.
      perform = lambda { check_response(http.app.call(req.to_env(http)), options) }
      @http_threadsafe ? perform.call : @request_mtx.synchronize(&perform)
    end

    private

    # Internal use only.
    # Returns the (optionally mashified) body of a 2xx response; raises
    # RequestError::NotFound for 404/410 and RequestError for any other status.
    def check_response(res, options)
      status = res.status
      if (200..299).cover?(status)
        if options[:mashify] && res.body.instance_of?(Hash)
          Hashie::Mash.new(res.body)
        else
          res.body
        end
      elsif [404, 410].include?(status)
        err_str = "Error processing request: (#{status})! #{res.env[:method]} URL: #{res.env[:url]}"
        err_str << "\n Resp Body: #{res.body}"
        raise RequestError::NotFound.new(res), err_str
      else
        err_str = "Error processing request (#{status})! #{res.env[:method]} URL: #{res.env[:url]}"
        err_str << "\n Resp Body: #{res.body}"
        raise RequestError.new(res), err_str
      end
    end
  end
end