bigcommerce/gruf

View on GitHub
lib/gruf/client.rb

Summary

Maintainability
A
1 hr
Test Coverage
A
95%
# frozen_string_literal: true

# Copyright (c) 2017-present, BigCommerce Pty. Ltd. All rights reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
module Gruf
  ##
  # Abstracts out the calling interface for interacting with gRPC clients. Streamlines calling and provides
  # instrumented response objects that also can contain deserialized error messages via serialized objects transported
  # via the service's trailing metadata.
  #
  #   begin
  #     client = ::Gruf::Client.new(service: ::Demo::ThingService)
  #     response = client.call(:GetMyThing, id: 123)
  #     puts response.message.inspect
  #   rescue Gruf::Client::Error => e
  #     puts e.error.inspect
  #   end
  #
  # Utilizes SimpleDelegator to wrap the given service that the client is connecting to, which allows a clean interface
  # to the underlying RPC descriptors and methods.
  #
  class Client < SimpleDelegator
    include Gruf::Loggable

    # @!attribute [r] base_klass
    #   @return [Class] The base, friendly name of the service being requested
    attr_reader :base_klass
    # @!attribute [r] service_klass
    #   @return [Class] The class name of the gRPC service being requested
    attr_reader :service_klass
    # @!attribute [r] opts
    #   @return [Hash] A hash of options for the client
    attr_reader :opts

    ##
    # Initialize the client and setup the stub
    #
    # @param [Module] service The namespace of the client Stub that is desired to load
    # @param [Hash] options A hash of options for the client
    # @option options [String] :password The password for basic authentication for the service.
    # @option options [String] :hostname The hostname of the service. Defaults to linkerd.
    # @param [Hash] client_options A hash of options to pass to the gRPC client stub
    #
    def initialize(service:, options: {}, client_options: {})
      @base_klass = service
      @service_klass = "#{base_klass}::Service".constantize
      @opts = options || {}
      @opts[:password] = @opts.fetch(:password, '').to_s
      @opts[:hostname] = @opts.fetch(:hostname, Gruf.default_client_host)
      @opts[:channel_credentials] = @opts.fetch(:channel_credentials, Gruf.default_channel_credentials)
      @error_factory = Gruf::Client::ErrorFactory.new
      client_options[:timeout] = parse_timeout(client_options[:timeout]) if client_options.key?(:timeout)
      client = "#{service}::Stub".constantize.new(@opts[:hostname], build_ssl_credentials, **client_options)
      super(client)
    end

    ##
    # Call the client's method with given params
    #
    # @param [String|Symbol] request_method The method that is being requested on the service
    # @param [Hash] params (Optional) A hash of parameters that will be inserted into the gRPC request message that is
    # required for the given above call
    # @param [Hash] metadata (Optional) A hash of metadata key/values that are transported with the client request
    # @param [Hash] opts (Optional) A hash of options to send to the gRPC request_response method
    # @return [Gruf::Response] The response from the server
    # @raise [Gruf::Client::Error|GRPC::BadStatus] If an error occurs, an exception will be raised according to the
    # error type that was returned
    #
    def call(request_method, params = {}, metadata = {}, opts = {}, &block)
      request_method = request_method.to_sym
      req = if params.respond_to?(:to_proto) || streaming_request?(request_method)
              params
            else
              request_object(request_method, params)
            end
      md = build_metadata(metadata)
      call_sig = call_signature(request_method)

      unless call_sig
        raise NotImplementedError, "The method #{request_method} has not been implemented in this service."
      end

      resp, operation = execute(call_sig, req, md, opts, &block)

      raise @error_factory.from_exception(resp.result) unless resp.success?

      Gruf::Response.new(operation: operation, message: resp.result, execution_time: resp.time)
    end

    ##
    # Returns the currently set timeout on the client stub
    #
    # @return [Integer|NilClass]
    #
    def timeout
      __getobj__.instance_variable_get(:@timeout)
    end

    private

    ##
    # @param [Symbol] request_method
    # @return [Boolean]
    #
    def streaming_request?(request_method)
      desc = rpc_desc(request_method)
      desc && (desc.client_streamer? || desc.bidi_streamer?)
    end

    ##
    # Execute the given request to the service
    #
    # @param [Symbol] call_sig The call signature being executed
    # @param [Object] req (Optional) The protobuf request message to send
    # @param [Hash] metadata (Optional) A hash of metadata key/values that are transported with the client request
    # @param [Hash] opts (Optional) A hash of options to send to the gRPC request_response method
    # @return [Array<Gruf::Timer::Result, GRPC::ActiveCall::Operation>]
    #
    def execute(call_sig, req, metadata, opts = {}, &block)
      operation = nil
      result = Gruf::Timer.time do
        opts[:return_op] = true
        opts[:metadata] = metadata
        operation = send(call_sig, req, opts, &block)
        operation.execute
      end
      [result, operation]
    end

    ##
    # Get the appropriate RPC descriptor given the method on the service being called
    #
    # @param [Symbol] request_method The method name being called on the remote service
    # @return [Struct<GRPC::RpcDesc>] Return the given RPC descriptor given the method on the service being called
    #
    def rpc_desc(request_method)
      service_klass.rpc_descs[request_method]
    end

    ##
    # Get the appropriate protobuf request message for the given request method on the service being called
    #
    # @param [Symbol] request_method The method name being called on the remote service
    # @param [Hash] params (Optional) A hash of parameters that will populate the request object
    # @return [Class] The request object that corresponds to the method being called
    #
    def request_object(request_method, params = {})
      desc = rpc_desc(request_method)
      desc&.input ? desc.input.new(params) : nil
    end

    ##
    # Properly find the appropriate call signature for the GRPC::GenericService given the request method name
    #
    # @return [Symbol]
    #
    def call_signature(request_method)
      desc = rpc_desc(request_method)
      desc&.name ? desc.name.to_s.underscore.to_sym : nil
    end

    ##
    # Build a sanitized, authenticated metadata hash for the given request
    #
    # @param [Hash] metadata A base metadata hash to build from
    # @return [Hash] The compiled metadata hash that is ready to be transported over the wire
    #
    def build_metadata(metadata = {})
      unless opts[:password].empty?
        username = opts.fetch(:username, 'grpc').to_s
        username = username.empty? ? '' : "#{username}:"
        auth_string = Base64.encode64("#{username}#{opts[:password]}")
        metadata[:authorization] = "Basic #{auth_string}".tr("\n", '')
      end
      metadata
    end

    ##
    # Build the SSL/TLS credentials for the outbound gRPC request
    #
    # @return [Symbol|GRPC::Core::ChannelCredentials] The generated SSL credentials for the outbound gRPC request
    #
    # :nocov:
    def build_ssl_credentials
      return opts[:channel_credentials] if opts[:channel_credentials]

      cert = nil
      if opts[:ssl_certificate_file]
        cert = File.read(opts[:ssl_certificate_file]).to_s.strip
      elsif opts[:ssl_certificate]
        cert = opts[:ssl_certificate].to_s.strip
      end

      cert ? GRPC::Core::ChannelCredentials.new(cert) : :this_channel_is_insecure
    end
    # :nocov:

    ##
    # Return the specified error deserializer class by the configuration
    #
    # @return [Class] The proper error deserializer class. Defaults to JSON.
    #
    def error_deserializer_class
      if Gruf.error_serializer
        Gruf.error_serializer.is_a?(Class) ? Gruf.error_serializer : Gruf.error_serializer.to_s.constantize
      else
        Gruf::Serializers::Errors::Json
      end
    end

    ##
    # Handle various timeout values and prevent improper value setting
    #
    # @see GRPC::Core::TimeConsts#from_relative_time
    # @param [mixed] timeout
    # @return [Float]
    # @return [GRPC::Core::TimeSpec]
    #
    def parse_timeout(timeout)
      if timeout.nil?
        GRPC::Core::TimeConsts::ZERO
      elsif timeout.is_a?(GRPC::Core::TimeSpec)
        timeout
      elsif timeout.is_a?(Numeric) # rubocop:disable Lint/DuplicateBranch
        timeout
      elsif timeout.respond_to?(:to_f)
        timeout.to_f
      else
        raise ArgumentError, 'timeout is not a valid value: does not respond to to_f'
      end
    end
  end
end