bigcommerce/gruf

View on GitHub
lib/gruf/server.rb

Summary

Maintainability
A
0 mins
Test Coverage
A
94%
# frozen_string_literal: true

# Copyright (c) 2017-present, BigCommerce Pty. Ltd. All rights reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
module Gruf
  ##
  # Represents a gRPC server. Automatically loads and augments gRPC handlers and services
  # based on configuration values.
  #
  class Server
    class ServerAlreadyStartedError < StandardError; end

    KILL_SIGNALS = %w[INT TERM QUIT].freeze

    include Gruf::Loggable

    # @!attribute [r] port
    #   @return [Integer] The port the server is bound to
    attr_reader :port
    # @!attribute [r] options
    #   @return [Hash] Hash of options passed into the server
    attr_reader :options

    ##
    # Initialize the server and load and setup the services
    #
    # @param [Hash] opts
    #
    def initialize(opts = {})
      @options = opts || {}
      @interceptors = opts.fetch(:interceptor_registry, Gruf.interceptors)
      @interceptors = Gruf::Interceptors::Registry.new unless @interceptors.is_a?(Gruf::Interceptors::Registry)
      @services = nil
      @started = false
      @hostname = opts.fetch(:hostname, Gruf.server_binding_url)
      @event_listener_proc = opts.fetch(:event_listener_proc, Gruf.event_listener_proc)
    end

    ##
    # @return [GRPC::RpcServer] The GRPC server running
    #
    def server
      server_mutex do
        @server ||= begin
          # For backward compatibility, we allow these options to be passed directly
          # in the Gruf::Server options, or via Gruf.rpc_server_options.
          server_options = {
            pool_size: options.fetch(:pool_size, Gruf.rpc_server_options[:pool_size]),
            max_waiting_requests: options.fetch(:max_waiting_requests, Gruf.rpc_server_options[:max_waiting_requests]),
            poll_period: options.fetch(:poll_period, Gruf.rpc_server_options[:poll_period]),
            pool_keep_alive: options.fetch(:pool_keep_alive, Gruf.rpc_server_options[:pool_keep_alive]),
            connect_md_proc: options.fetch(:connect_md_proc, Gruf.rpc_server_options[:connect_md_proc]),
            server_args: options.fetch(:server_args, Gruf.rpc_server_options[:server_args])
          }

          server = if @event_listener_proc
                     server_options[:event_listener_proc] = @event_listener_proc
                     Gruf::InstrumentableGrpcServer.new(**server_options)
                   else
                     GRPC::RpcServer.new(**server_options)
                   end

          @port = server.add_http2_port(@hostname, ssl_credentials)
          # do not reference `services` any earlier than this method, as it allows autoloading to take effect
          # and load services into `Gruf.services` as late as possible, which gives us flexibility with different
          # execution paths (such as vanilla ruby, grape, multiple Rails versions, etc). The autoloaders are
          # initially loaded in `Gruf::Cli::Executor` _directly_ before the gRPC services are loaded into the gRPC
          # server, to allow for loading services as late as possible in the execution chain.
          services.each { |s| server.handle(s) }
          server
        end
      end
    end

    ##
    # Start the gRPC server
    #
    # :nocov:
    def start!
      update_proc_title(:starting)

      server_thread = Thread.new do
        logger.info { "[gruf] Starting gruf server at #{@hostname}..." }
        server.run_till_terminated_or_interrupted(KILL_SIGNALS)
      end
      @started = true
      update_proc_title(:serving)
      server_thread.join
      @started = false

      update_proc_title(:stopped)
      logger.info { '[gruf] Goodbye!' }
    end
    # :nocov:

    ##
    # Add a gRPC service stub to be served by gruf
    #
    # @param [Class] klass
    # @raise [ServerAlreadyStartedError] if the server is already started
    #
    def add_service(klass)
      raise ServerAlreadyStartedError if @started

      @services << klass unless services.include?(klass)
    end

    ##
    # Add an interceptor to the server
    #
    # @param [Class] klass The Interceptor to add to the registry
    # @param [Hash] opts A hash of options for the interceptor
    # @raise [ServerAlreadyStartedError] if the server is already started
    #
    def add_interceptor(klass, opts = {})
      raise ServerAlreadyStartedError if @started

      @interceptors.use(klass, opts)
    end

    ##
    # Insert an interceptor before another in the currently registered order of execution
    #
    # @param [Class] before_class The interceptor that you want to add the new interceptor before
    # @param [Class] interceptor_class The Interceptor to add to the registry
    # @param [Hash] opts A hash of options for the interceptor
    #
    def insert_interceptor_before(before_class, interceptor_class, opts = {})
      raise ServerAlreadyStartedError if @started

      @interceptors.insert_before(before_class, interceptor_class, opts)
    end

    ##
    # Insert an interceptor after another in the currently registered order of execution
    #
    # @param [Class] after_class The interceptor that you want to add the new interceptor after
    # @param [Class] interceptor_class The Interceptor to add to the registry
    # @param [Hash] opts A hash of options for the interceptor
    #
    def insert_interceptor_after(after_class, interceptor_class, opts = {})
      raise ServerAlreadyStartedError if @started

      @interceptors.insert_after(after_class, interceptor_class, opts)
    end

    ##
    # Return the current list of added interceptor classes
    #
    # @return [Array<Class>]
    #
    def list_interceptors
      @interceptors.list
    end

    ##
    # Remove an interceptor from the server
    #
    # @param [Class] klass
    #
    def remove_interceptor(klass)
      raise ServerAlreadyStartedError if @started

      @interceptors.remove(klass)
    end

    ##
    # Clear the interceptor registry of interceptors
    #
    def clear_interceptors
      raise ServerAlreadyStartedError if @started

      @interceptors.clear
    end

    private

    ##
    # @return [Array<Class>]
    #
    def services
      @services ||= ::Gruf.services || (options.fetch(:services, nil) || [])
    end

    ##
    # @param [String]
    #
    def controllers_path
      options.fetch(:controllers_path, Gruf.controllers_path)
    end

    ##
    # Load the SSL/TLS credentials for this server
    #
    # @return [GRPC::Core::ServerCredentials|Symbol]
    #
    # :nocov:
    def ssl_credentials
      return :this_port_is_insecure unless options.fetch(:use_ssl, Gruf.use_ssl)

      private_key = File.read(options.fetch(:ssl_key_file, Gruf.ssl_key_file))
      cert_chain = File.read(options.fetch(:ssl_crt_file, Gruf.ssl_crt_file))
      certs = [nil, [{ private_key: private_key, cert_chain: cert_chain }], false]
      GRPC::Core::ServerCredentials.new(*certs)
    end
    # :nocov:

    ##
    # Updates proc name/title
    #
    # @param [Symbol] state
    #
    # :nocov:
    def update_proc_title(state)
      Process.setproctitle("gruf #{Gruf::VERSION} -- #{state}")
    end
    # :nocov:
    #

    ##
    # Handle thread-safe access to the server
    #
    def server_mutex(&block)
      @server_mutex ||= begin
        require 'monitor'
        Monitor.new
      end
      @server_mutex.synchronize(&block)
    end
  end
end