lib/mongo/server/monitor/connection.rb
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2015-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Server
class Monitor
# This class models the monitor connections and their behavior.
#
# @since 2.0.0
# @api private
class Connection < Server::ConnectionCommon
include Loggable
# Creates a new connection object to the specified target address
# with the specified options.
#
# The constructor does not perform any I/O (and thus does not create
# sockets nor handshakes); call connect! method on the connection
# object to create the network connection.
#
# @note Monitoring connections do not authenticate.
#
# @param [ Mongo::Address ] address The address the connection is for.
# @param [ Hash ] options The connection options.
#
# @option options [ Mongo::Server::Monitor::AppMetadata ] :app_metadata
# Metadata to use for handshake. Required: ArgumentError is raised if
# it is missing or nil. Although a Mongo::Server::AppMetadata instance
# will also work, monitoring connections are meant to use
# Mongo::Server::Monitor::AppMetadata instances in order to omit
# performing SCRAM negotiation with the server, as monitoring
# sockets do not authenticate.
# @option options [ Array<String> ] :compressors A list of potential
# compressors to use, in order of preference. The driver chooses the
# first compressor that is also supported by the server. Currently the
# driver only supports 'zstd', 'snappy' and 'zlib'.
# @option options [ Float ] :connect_timeout The timeout, in seconds,
# to use for network operations. This timeout is used for all
# socket operations rather than connect calls only, contrary to
# what the name implies.
#
# @since 2.0.0
def initialize(address, options = {})
  @address = address
  # Keep a private frozen copy so later changes to the caller's hash
  # cannot affect this connection.
  @options = options.dup.freeze
  @app_metadata = options[:app_metadata]
  # The handshake is built from the app metadata, so refuse to construct
  # a connection without it.
  raise ArgumentError, 'App metadata is required' unless @app_metadata

  @socket = nil
  @pid = Process.pid
  @compressor = nil
  @hello_ok = false
end
# @return [ Hash ] options The passed in options, held as a frozen
#   shallow copy that is made when the connection is constructed.
attr_reader :options
# @return [ Mongo::Address ] address The address to connect to.
attr_reader :address
# Returns the monitoring socket timeout.
#
# Note that monitoring connections use the connect timeout value as
# the socket timeout value. See the Server Discovery and Monitoring
# specification for details.
#
# @return [ Float ] The socket timeout in seconds.
#
# @since 2.4.3
def socket_timeout
  configured = options[:connect_timeout]
  return configured if configured

  # No (or a nil/false) :connect_timeout was supplied: use the default.
  Server::CONNECT_TIMEOUT
end
# @return [ Integer | nil ] server_connection_id The server connection id,
#   read from the 'connectionId' field of the handshake reply. It is not
#   assigned by the constructor; handshake! sets it.
attr_reader :server_connection_id
# Sends a message and returns the result.
#
# @param [ Protocol::Message ] message The message to send.
#
# @return [ Protocol::Message ] The result.
def dispatch(message)
  # Serialize once, then hand the raw bytes to the byte-level path.
  payload = message.serialize.to_s
  dispatch_bytes(payload)
end
# Sends a preserialized message and returns the result.
#
# @param [ String ] bytes The serialized message to send.
#
# @option opts [ Numeric ] :read_socket_timeout The timeout to use for
# each read operation.
#
# @return [ Protocol::Message ] The result.
def dispatch_bytes(bytes, **opts)
  write_bytes(bytes)
  # Only :read_socket_timeout is forwarded; it reaches the reader under
  # the name :socket_timeout (nil when the caller did not supply it).
  read_timeout = opts[:read_socket_timeout]
  read_response(socket_timeout: read_timeout)
end
# Writes preserialized bytes to the monitoring socket.
#
# @param [ String ] bytes The serialized message to send.
#
# @return [ Object ] Whatever the socket's write call returns.
#
# @raise [ ArgumentError ] If the connection is not connected.
def write_bytes(bytes)
  raise ArgumentError, "Trying to dispatch on an unconnected connection #{self}" unless connected?

  add_server_connection_id do
    add_server_diagnostics { socket.write(bytes) }
  end
end
# Reads a response from the monitoring socket and deserializes it.
#
# @option opts [ Numeric ] :socket_timeout The timeout to use for
#   each read operation.
#
# @return [ Protocol::Message ] The deserialized response.
#
# @raise [ ArgumentError ] If the connection is not connected.
def read_response(**opts)
  raise ArgumentError, "Trying to read on an unconnected connection #{self}" unless connected?

  add_server_connection_id do
    add_server_diagnostics do
      # The options are passed through to the deserializer unchanged.
      size_limit = Protocol::Message::MAX_MESSAGE_SIZE
      Protocol::Message.deserialize(socket, size_limit, nil, **opts)
    end
  end
end
# Establishes a network connection to the target address.
#
# If the connection is already established, this method raises ArgumentError.
#
# @example Connect to the host.
# connection.connect!
#
# @note This method mutates the connection class by setting a socket if
# one previously did not exist.
#
# @return [ true ] If the connection succeeded.
#
# @since 2.0.0
def connect!
  raise ArgumentError, 'Monitoring connection already connected' if @socket

  @socket = add_server_diagnostics do
    # Mark the socket as a monitoring socket and record which address it
    # belongs to, on top of the TLS options.
    socket_options = ssl_options.merge(connection_address: address, monitor: true)
    address.socket(socket_timeout, socket_options)
  end
  true
end
# Disconnect the connection.
#
# @example Disconnect from the host.
# connection.disconnect!
#
# @note This method mutates the connection by setting the socket to nil
# if the closing succeeded.
#
# @note This method accepts an options argument for compatibility with
# Server::Connections. However, all options are ignored.
#
# @return [ true ] If the disconnect succeeded.
#
# @since 2.0.0
def disconnect!(options = nil)
  return true unless socket

  begin
    socket.close
  rescue StandardError
    # Best effort: a failure to close must not prevent the connection
    # from being marked as disconnected.
    nil
  end
  @socket = nil
  true
end
# Send handshake command to connected host and validate the response.
#
# @return [BSON::Document] Handshake response from server
#
# @raise [Mongo::Error] If handshake failed.
def handshake!
  hello_doc = handshake_document(@app_metadata, server_api: options[:server_api])
  bytes = handshake_command(hello_doc).serialize.to_s
  response = Operation::Result.new(dispatch_bytes(bytes))
  response.validate!
  server_reply = response.documents.first
  # Record what the reply says about compression, hello support and the
  # server-side connection id.
  set_compressor!(server_reply)
  set_hello_ok!(server_reply)
  @server_connection_id = server_reply['connectionId']
  server_reply
rescue => error
  # Log the failure, then let the original exception propagate.
  Utils.warn_bg_exception(
    "Failed to handshake with #{address}",
    error,
    logger: options[:logger],
    log_prefix: options[:log_prefix],
    bg_error_backtrace: options[:bg_error_backtrace]
  )
  raise
end
# Build a document that should be used for connection check.
#
# @return [BSON::Document] Document that should be sent to a server
# for connection check.
#
# @api private
def check_document
  # Metadata-level server API settings win over the connection options.
  server_api = @app_metadata.server_api || options[:server_api]

  base =
    if server_api
      HELLO_DOC.merge(Utils.transform_server_api(server_api))
    elsif hello_ok?
      HELLO_DOC
    else
      LEGACY_HELLO_DOC
    end

  # compressors must be set to maintain correct compression status
  # in the server description. See RUBY-2427
  compressors = options[:compressors]
  compressors ? base.merge(compression: compressors) : base
end
private
# Runs the given block and, if it raises a Mongo::Error, attaches the
# server connection id (when one is known) to the error as a note before
# re-raising it. Other exceptions pass through untouched.
#
# @return [ Object ] The block's return value.
def add_server_connection_id
  yield
rescue Mongo::Error => error
  conn_id = server_connection_id
  error.add_note("sconn:#{conn_id}") if conn_id
  raise error
end
# Update @hello_ok flag according to server reply to legacy hello
# command. The flag will be set to true if connected server supports
# hello command, otherwise the flag will be set to false.
#
# @param [ BSON::Document ] reply Server reply to legacy hello command.
def set_hello_ok!(reply)
  # Collapse whatever the server sent into a strict boolean.
  @hello_ok = reply[:helloOk] ? true : false
end
# Reports the current value of the @hello_ok flag. The flag starts out
# false and is updated by set_hello_ok! from the server reply.
#
# @return [ true | false ] Whether the flag is currently set.
def hello_ok?
@hello_ok
end
end
end
end
end