mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/server/push_monitor.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Server

    # A monitor utilizing server-pushed hello requests.
    #
    # When a Monitor handshakes with a 4.4+ server, it creates an instance
    # of PushMonitor. PushMonitor subsequently executes server-pushed hello
    # (i.e. awaited & exhausted hello) to receive topology changes from the
    # server as quickly as possible. The Monitor still monitors the server
    # for round-trip time calculations and to perform immediate checks as
    # requested by the application.
    #
    # @api private
    class PushMonitor
      extend Forwardable
      include BackgroundThread

      def initialize(monitor, topology_version, monitoring, **options)
        if topology_version.nil?
          raise ArgumentError, 'Topology version must be provided but it was nil'
        end
        unless options[:app_metadata]
          raise ArgumentError, 'App metadata is required'
        end
        unless options[:check_document]
          raise ArgumentError, 'Check document is required'
        end
        @app_metadata = options[:app_metadata]
        @check_document = options[:check_document]
        @monitor = monitor
        @topology_version = topology_version
        @monitoring = monitoring
        @options = options
        @lock = Mutex.new
      end

      # @return [ Monitor ] The monitor to which this push monitor is attached.
      attr_reader :monitor

      # @return [ TopologyVersion ] Most recently received topology version.
      attr_reader :topology_version

      # @return [ Monitoring ] monitoring The monitoring.
      attr_reader :monitoring

      # @return [ Hash ] Push monitor options.
      attr_reader :options

      # @return [ Server ] The server that is being monitored.
      def_delegator :monitor, :server

      def start!
        @lock.synchronize do
          super
        end
      end

      def stop!
        @lock.synchronize do
          @stop_requested = true
          if @connection
            # Interrupt any in-progress exhausted hello reads by
            # disconnecting the connection.
            @connection.send(:socket).close rescue nil
          end
        end
        super.tap do
          @lock.synchronize do
            if @connection
              @connection.disconnect!
              @connection = nil
            end
          end
        end
      end

      def do_work
        @lock.synchronize do
          return if @stop_requested
        end

        result = monitoring.publish_heartbeat(server, awaited: true) do
          check
        end
        new_description = monitor.run_sdam_flow(result, awaited: true)
        # When hello fails due to a fail point, the response does not
        # include topology version. In this case we need to keep our existing
        # topology version so that we can resume monitoring.
        # The spec does not appear to directly address this case but
        # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#streaming-ismaster
        # says that topologyVersion should only be updated from successful
        # hello responses.
        if new_description.topology_version
          @topology_version = new_description.topology_version
        end
      rescue IOError, SocketError, SystemCallError, Mongo::Error => exc
        stop_requested = @lock.synchronize { @stop_requested }
        if stop_requested
          # Ignore the exception, see RUBY-2771.
          return
        end

        msg = "Error running awaited hello on #{server.address}"
        Utils.warn_bg_exception(msg, exc,
          logger: options[:logger],
          log_prefix: options[:log_prefix],
          bg_error_backtrace: options[:bg_error_backtrace],
        )

        # If a request failed on a connection, stop push monitoring.
        # In case the server is dead we don't want to have two connections
        # trying to connect unsuccessfully at the same time.
        stop!

        # Request an immediate check on the monitor to get reinstated as
        # soon as possible in case the server is actually alive.
        server.scan_semaphore.signal
      end

      def check
        @lock.synchronize do
          if @connection && @connection.pid != Process.pid
            log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}")
            @connection.disconnect!
            @connection = nil
          end
        end

        @lock.synchronize do
          unless @connection
            @server_pushing = false
            connection = PushMonitor::Connection.new(server.address, options)
            connection.connect!
            @connection = connection
          end
        end

        resp_msg = begin
          unless @server_pushing
            write_check_command
          end
          read_response
        rescue Mongo::Error
          @lock.synchronize do
            @connection.disconnect!
            @connection = nil
          end
          raise
        end
        @server_pushing = resp_msg.flags.include?(:more_to_come)
        result = Operation::Result.new(resp_msg)
        result.validate!
        result.documents.first
      end

      def write_check_command
        document = @check_document.merge(
          topologyVersion: topology_version.to_doc,
          maxAwaitTimeMS: monitor.heartbeat_interval * 1000,
        )
        command = Protocol::Msg.new(
          [:exhaust_allowed], {}, document.merge({'$db' => Database::ADMIN})
        )
        @lock.synchronize { @connection }.write_bytes(command.serialize.to_s)
      end

      def read_response
        if timeout = options[:connect_timeout]
          if timeout < 0
            raise Mongo::SocketTimeoutError, "Requested to read with a negative timeout: #{}"
          elsif timeout > 0
            timeout += options[:heartbeat_frequency] || Monitor::DEFAULT_HEARTBEAT_INTERVAL
          end
        end
        # We set the timeout twice: once passed into read_socket which applies
        # to each individual read operation, and again around the entire read.
        Timeout.timeout(timeout, Error::SocketTimeoutError, "Failed to read an awaited hello response in #{timeout} seconds") do
          @lock.synchronize { @connection }.read_response(socket_timeout: timeout)
        end
      end

      def to_s
        "#<#{self.class.name}:#{object_id} #{server.address}>"
      end

    end
  end
end

require 'mongo/server/push_monitor/connection'