neo4jrb/neo4j-core

View on GitHub
lib/neo4j/core/cypher_session/adaptors/http.rb

Summary

Maintainability
A
35 mins
Test Coverage
require 'neo4j/core/cypher_session/adaptors'
require 'neo4j/core/cypher_session/adaptors/has_uri'
require 'neo4j/core/cypher_session/responses/http'
require 'uri'

# TODO: Work with `Query` objects
module Neo4j
  module Core
    class CypherSession
      module Adaptors
        class HTTP < Base
          attr_reader :requestor, :url

          def initialize(url, options = {})
            @url = url
            @options = options
          end

          DEFAULT_FARADAY_CONFIGURATOR = proc do |faraday|
            require 'typhoeus'
            require 'typhoeus/adapters/faraday'
            faraday.adapter :typhoeus
          end

          def connect
            @requestor = Requestor.new(@url, USER_AGENT_STRING, self.class.method(:instrument_request), @options.fetch(:faraday_configurator, DEFAULT_FARADAY_CONFIGURATOR))
          end

          ROW_REST = %w[row REST]

          def query_set(transaction, queries, options = {})
            setup_queries!(queries, transaction)

            return unless path = transaction.query_path(options.delete(:commit))

            faraday_response = @requestor.post(path, queries)

            transaction.apply_id_from_url!(faraday_response.env[:response_headers][:location])

            wrap_level = options[:wrap_level] || @options[:wrap_level]
            Responses::HTTP.new(faraday_response, wrap_level: wrap_level).results
          end

          def version(_session)
            @version ||= @requestor.get('db/data/').body[:neo4j_version]
          end

          # Schema inspection methods
          def indexes(_session)
            response = @requestor.get('db/data/schema/index')

            check_for_schema_response_error!(response)
            list = response.body || []

            list.map do |item|
              {label: item[:label].to_sym,
               properties: item[:property_keys].map(&:to_sym)}
            end
          end

          CONSTRAINT_TYPES = {
            'UNIQUENESS' => :uniqueness
          }
          def constraints(_session, _label = nil, _options = {})
            response = @requestor.get('db/data/schema/constraint')

            check_for_schema_response_error!(response)
            list = response.body || []
            list.map do |item|
              {type: CONSTRAINT_TYPES[item[:type]],
               label: item[:label].to_sym,
               properties: item[:property_keys].map(&:to_sym)}
            end
          end

          def check_for_schema_response_error!(response)
            if response.body.is_a?(Hash) && response.body.key?(:errors)
              message = response.body[:errors].map { |error| "#{error[:code]}: #{error[:message]}" }.join("\n    ")
              fail CypherSession::ConnectionFailedError, "Connection failure: \n    #{message}"
            elsif !response.success?
              fail CypherSession::ConnectionFailedError, "Connection failure: \n    status: #{response.status} \n    #{response.body}"
            end
          end

          def self.transaction_class
            require 'neo4j/core/cypher_session/transactions/http'
            Neo4j::Core::CypherSession::Transactions::HTTP
          end

          # Schema inspection methods
          def indexes_for_label(label)
            url = db_data_url + "schema/index/#{label}"
            @connection.get(url)
          end

          instrument(:request, 'neo4j.core.http.request', %w[method url body]) do |_, start, finish, _id, payload|
            ms = (finish - start) * 1000
            " #{ANSI::BLUE}HTTP REQUEST:#{ANSI::CLEAR} #{ANSI::YELLOW}#{ms.round}ms#{ANSI::CLEAR} #{payload[:method].upcase} #{payload[:url]} (#{payload[:body].size} bytes)"
          end

          def connected?
            !!@requestor
          end

          def supports_metadata?
            Gem::Version.new(version(nil)) >= Gem::Version.new('2.1.5')
          end

          # Basic wrapper around HTTP requests to standard Neo4j HTTP endpoints
          #  - Takes care of JSONifying objects passed as body (Hash/Array/Query)
          #  - Sets headers, including user agent string
          class Requestor
            include Adaptors::HasUri
            default_url('http://neo4:neo4j@localhost:7474')
            validate_uri { |uri| uri.is_a?(URI::HTTP) }
            def initialize(url, user_agent_string, instrument_proc, faraday_configurator)
              self.url = url
              @user = user
              @password = password
              @user_agent_string = user_agent_string
              @faraday = wrap_connection_failed! { faraday_connection(faraday_configurator) }
              @instrument_proc = instrument_proc
            end

            REQUEST_HEADERS = {'Accept'.to_sym => 'application/json; charset=UTF-8',
                               'Content-Type'.to_sym => 'application/json'}

            # @method HTTP method (:get/:post/:delete/:put)
            # @path Path part of URL
            # @body Body for the request.  If a Query or Array of Queries,
            #       it is automatically converted
            def request(method, path, body = '', _options = {})
              request_body = request_body(body)
              url = url_from_path(path)
              @instrument_proc.call(method, url, request_body) do
                wrap_connection_failed! do
                  @faraday.run_request(method, url, request_body, REQUEST_HEADERS)
                end
              end
            end

            # Convenience method to #request(:post, ...)
            def post(path, body = '', options = {})
              request(:post, path, body, options)
            end

            # Convenience method to #request(:get, ...)
            def get(path, body = '', options = {})
              request(:get, path, body, options)
            end

            private

            def faraday_connection(configurator)
              require 'faraday'
              require 'faraday_middleware/multi_json'

              Faraday.new(url) do |faraday|
                faraday.request :multi_json

                faraday.response :multi_json, symbolize_keys: true, content_type: 'application/json'

                faraday.headers['Content-Type'] = 'application/json'
                faraday.headers['User-Agent'] = @user_agent_string

                configurator.call(faraday)
              end
            end

            def password_config(options)
              options.fetch(:basic_auth, {}).fetch(:password, @password)
            end

            def username_config(options)
              options.fetch(:basic_auth, {}).fetch(:username, @user)
            end

            def request_body(body)
              return body if body.is_a?(String)

              body_is_query_array = body.is_a?(Array) && body.all? { |o| o.respond_to?(:cypher) }
              case body
              when Hash, Array
                return {statements: body.map(&self.class.method(:statement_from_query))} if body_is_query_array

                body
              else
                {statements: [self.class.statement_from_query(body)]} if body.respond_to?(:cypher)
              end
            end

            def wrap_connection_failed!
              yield
            rescue Faraday::ConnectionFailed => e
              raise CypherSession::ConnectionFailedError, "#{e.class}: #{e.message}"
            end

            class << self
              private

              def statement_from_query(query)
                {statement: query.cypher,
                 parameters: query.parameters || {},
                 resultDataContents: ROW_REST}
              end
            end

            def url_base
              "#{scheme}://#{host}:#{port}"
            end

            def url_from_path(path)
              url_base + (path[0] != '/' ? '/' + path : path)
            end
          end
        end
      end
    end
  end
end