mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/server/connection.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Server

    # This class models the socket connections for servers and their behavior.
    #
    # @since 2.0.0
    class Connection < ConnectionBase
      include Monitoring::Publishable
      include Retryable
      include Id
      extend Forwardable

      # The ping command.
      #
      # @since 2.1.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING = { :ping => 1 }.freeze

      # The ping command for an OP_MSG (server versions >= 3.6).
      #
      # @since 2.5.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_OP_MSG = { :ping => 1, '$db' => Database::ADMIN }.freeze

      # Ping message.
      #
      # @since 2.1.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_MESSAGE = Protocol::Query.new(Database::ADMIN, Database::COMMAND, PING, :limit => -1)

      # Ping message as an OP_MSG (server versions >= 3.6).
      #
      # @since 2.5.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_OP_MSG_MESSAGE = Protocol::Msg.new([], {}, PING_OP_MSG)

      # The ping message as raw bytes.
      #
      # @since 2.1.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_BYTES = PING_MESSAGE.serialize.to_s.freeze

      # The ping OP_MSG message as raw bytes (server versions >= 3.6).
      #
      # @since 2.5.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_OP_MSG_BYTES = PING_OP_MSG_MESSAGE.serialize.to_s.freeze

      # Creates a new connection object to the specified target address
      # with the specified options.
      #
      # The constructor does not perform any I/O (and thus does not create
      # sockets, handshakes nor authenticates); call connect! method on the
      # connection object to create the network connection.
      #
      # @api private
      #
      # @example Create the connection.
      #   Connection.new(server)
      #
      # @note Connection must never be directly instantiated outside of a
      #   Server.
      #
      # @param [ Mongo::Server ] server The server the connection is for.
      # @param [ Hash ] options The connection options.
      #
      # @option options :pipe [ IO ] The file descriptor for the read end of the
      #   pipe to listen on during the select system call when reading from the
      #   socket.
      # @option options [ Integer ] :generation The generation of this
      #   connection. The generation should only be specified in this option
      #   when not in load-balancing mode, and it should be the generation
      #   of the connection pool when the connection is created. In
      #   load-balancing mode, the generation is set on the connection
      #   after the handshake completes.
      # @option options [ Hash ] :server_api The requested server API version.
      #   This hash can have the following items:
      #   - *:version* -- string
      #   - *:strict* -- boolean
      #   - *:deprecation_errors* -- boolean
      #
      # @since 2.0.0
      def initialize(server, options = {})
        if server.load_balancer? && options[:generation]
          raise ArgumentError, "Generation cannot be set when server is a load balancer"
        end

        @id = server.next_connection_id
        @global_id = self.class.next_id
        @monitoring = server.monitoring
        @options = options.freeze
        @server = server
        @socket = nil
        @last_checkin = nil
        @auth_mechanism = nil
        @pid = Process.pid
        @pinned = false

        publish_cmap_event(
          Monitoring::Event::Cmap::ConnectionCreated.new(address, id)
        )
      end

      # @return [ Time ] The last time the connection was checked back into a pool.
      #
      # @since 2.5.0
      attr_reader :last_checkin

      # @return [ Integer ] The ID for the connection. This will be unique
      # across connections to the same server object.
      #
      # @since 2.9.0
      attr_reader :id

      # @return [ Integer ] The global ID for the connection. This will be unique
      # across all connections.
      attr_reader :global_id

      # The connection pool from which this connection was created.
      # May be nil.
      #
      # @api private
      def connection_pool
        options[:connection_pool]
      end

      # Whether the connection was connected and was not interrupted, closed,
      # or had an error raised.
      #
      # @return [ true | false ] if the connection was connected.
      def connected?
        !closed? && !error? && !interrupted? && !!@socket
      end

      # Whether the connection was closed.
      #
      # Closed connections should no longer be used. Instead obtain a new
      # connection from the connection pool.
      #
      # @return [ true | false ] Whether connection was closed.
      #
      # @since 2.9.0
      def closed?
        !!@closed
      end

      # Whether the connection was interrupted.
      #
      # Interrupted connections were already removed from the pool and should
      # not be checked back into the pool.
      #
      # @return [ true | false ] Whether connection was interrupted.
      def interrupted?
        !!@interrupted
      end

      # Mark the connection as interrupted.
      def interrupted!
        @interrupted = true
      end

      # @api private
      def error?
        !!@error
      end

      # Whether the connection is used by a transaction or cursor operations.
      #
      # Pinned connections should not be disconnected and removed from a
      # connection pool if they are idle or stale.
      #
      # # @return [ true | false ] Whether connection is pinned.
      #
      # @api private
      def pinned?
        @pinned
      end

      # Mark the connection as pinned.
      #
      # @api private
      def pin
        @pinned = true
      end

      # Mark the connection as not pinned.
      #
      # @api private
      def unpin
        @pinned = false
      end

      # Establishes a network connection to the target address.
      #
      # If the connection is already established, this method does nothing.
      #
      # @example Connect to the host.
      #   connection.connect!
      #
      # @note This method mutates the connection object by setting a socket if
      #   one previously did not exist.
      #
      # @return [ true ] If the connection succeeded.
      #
      # @since 2.0.0
      def connect!
        raise_if_closed!

        unless @socket
          @socket = create_socket
          @description, @compressor = do_connect

          if server.load_balancer?
            if Lint.enabled?
              unless service_id
                raise Error::InternalDriverError, "The connection is to a load balancer and it must have service_id set here, but does not"
              end
            end
            @generation = connection_pool.generation_manager.generation(service_id: service_id)
          end

          publish_cmap_event(
            Monitoring::Event::Cmap::ConnectionReady.new(address, id)
          )

          @close_event_published = false
        end
        true
      end

      # Creates the socket. The method is separate from do_connect, so that
      # pending connections can be closed if they are interrupted during hello.
      #
      #
      # @return [ Socket ] The created socket.
      private def create_socket
        add_server_diagnostics do
          address.socket(socket_timeout, ssl_options.merge(
            connection_address: address, connection_generation: generation, pipe: options[:pipe]))
        end
      end

      # Separate method to permit easier mocking in the test suite.
      #
      # @return [ Array<Server::Description, String | Symbol> ] A server
      #   description instance from the hello response of the returned socket
      #   and the compressor to use.
      private def do_connect
        raise_if_closed!
        begin
          pending_connection = PendingConnection.new(
            socket, @server, monitoring, options.merge(id: id))
          pending_connection.handshake_and_authenticate!
        rescue Exception
          socket&.close
          @socket = nil
          raise
        end

        [pending_connection.description, pending_connection.compressor]
      end

      # Disconnect the connection.
      #
      # @note Once a connection is disconnected, it should no longer be used.
      #   A new connection should be obtained from the connection pool which
      #   will either return a ready connection or create a new connection.
      #   If linting is enabled, reusing a disconnected connection will raise
      #   Error::LintError. If linting is not enabled, a warning will be logged.
      #
      # @note This method mutates the connection object by setting the socket
      #   to nil if the closing succeeded.
      #
      # @option options [ Symbol ] :reason The reason why the connection is
      #   being closed.
      # @option options [ true | false ] :interrupted Whether or not the
      #   connection was interrupted.
      #
      # @return [ true ] If the disconnect succeeded.
      #
      # @since 2.0.0
      def disconnect!(options = nil)
        # Note: @closed may be true here but we also may have a socket.
        # Check the socket and not @closed flag.
        @auth_mechanism = nil
        @last_checkin = nil
        if socket
          socket.close rescue nil
          @socket = nil
        end
        @closed = true
        interrupted! if options && options[:interrupted]

        # To satisfy CMAP spec tests, publish close events even if the
        # socket was never connected (and thus the ready event was never
        # published). But track whether we published close event and do not
        # publish it multiple times, unless the socket was reconnected -
        # in that case publish the close event once per socket close.
        unless @close_event_published
          reason = options && options[:reason]
          publish_cmap_event(
            Monitoring::Event::Cmap::ConnectionClosed.new(
              address,
              id,
              reason,
            ),
          )
          @close_event_published = true
        end

        true
      end

      # Ping the connection to see if the server is responding to commands.
      # This is non-blocking on the server side.
      #
      # @example Ping the connection.
      #   connection.ping
      #
      # @note This uses a pre-serialized ping message for optimization.
      #
      # @return [ true, false ] If the server is accepting connections.
      #
      # @since 2.1.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      def ping
        bytes = features.op_msg_enabled? ? PING_OP_MSG_BYTES : PING_BYTES
        ensure_connected do |socket|
          reply = add_server_diagnostics do
            socket.write(bytes)
            Protocol::Message.deserialize(socket, max_message_size)
          end
          reply.documents[0][Operation::Result::OK] == 1
        end
      end

      # Get the timeout to execute an operation on a socket.
      #
      # @return [ Float ] The operation timeout in seconds.
      #
      # @since 2.0.0
      def socket_timeout
        @timeout ||= options[:socket_timeout]
      end
      # @deprecated Please use :socket_timeout instead. Will be removed in 3.0.0
      alias :timeout :socket_timeout

      # Record the last checkin time.
      #
      # @example Record the checkin time on this connection.
      #   connection.record_checkin!
      #
      # @return [ self ]
      #
      # @since 2.5.0
      def record_checkin!
        @last_checkin = Time.now
        self
      end

      private

      def deliver(message, client, options = {})
        handle_errors do
          super
        end
      end

      def handle_errors
        begin
          yield
        rescue Error::SocketError => e
          @error = e
          @server.unknown!(
            generation: e.generation,
            # or description.service_id?
            service_id: e.service_id,
            stop_push_monitor: true,
          )
          raise
        rescue Error::SocketTimeoutError => e
          @error = e
          raise
        end
      end

      def raise_if_closed!
        if error?
          raise Error::ConnectionPerished, "Connection #{generation}:#{id} for #{address.seed} is perished. Reconnecting closed or errored connections is no longer supported"
        end

        if closed?
          raise Error::ConnectionPerished, "Connection #{generation}:#{id} for #{address.seed} is closed. Reconnecting closed or errored connections is no longer supported"
        end
      end
    end
  end
end