lib/moped/cluster.rb
# encoding: utf-8
require "moped/node"
module Moped
# The cluster represents a cluster of MongoDB server nodes, either a single
# node, a replica set, or a mongos server.
#
# @since 1.0.0
class Cluster
# The default number of seconds a node remains flagged as "down" before it
# becomes eligible to be refreshed again. Used when no :down_interval option
# is supplied.
#
# @since 2.0.0
DOWN_INTERVAL = 30
# The default number of seconds after which a node's information is
# considered stale and is refreshed. Used when no :refresh_interval option
# is supplied.
#
# @since 2.0.0
REFRESH_INTERVAL = 300
# The default time, in seconds, to wait before retrying an operation. Used
# when no :retry_interval option is supplied.
#
# @since 2.0.0
RETRY_INTERVAL = 0.25
# @!attribute options
#   @return [ Hash ] The options the cluster was initialized with.
# @!attribute peers
#   @return [ Array<Node> ] The peer nodes recorded while refreshing.
# @!attribute seeds
#   @return [ Array<Node> ] The seed nodes, plus any nodes added during a
#     refresh.
attr_reader :options, :peers, :seeds
# Register a credential for a database and apply the complete credential set
# to the cluster's seed nodes.
#
# @example Add a credential.
#   cluster.add_credential("admin", "user", "secret")
#
# @param db The database the credential is for; used as the lookup key.
# @param username The username stored for the database.
# @param password The password stored for the database.
#
# @return [ Boolean ] true
#
# @since 2.0.0
def add_credential(db, username, password)
  @credentials = {} unless @credentials
  @credentials.store(db, [ username, password ])
  apply_credentials
end
# Remove the credential stored for a database and re-apply the remaining
# credentials to the cluster's seed nodes. Does nothing when no credential
# has ever been added.
#
# @example Remove a credential.
#   cluster.delete_credential(database_name)
#
# @param db The database whose credential should be removed.
#
# @return [ Boolean ] true
#
# @since 2.0.0
def delete_credential(db)
  if @credentials
    @credentials.delete(db)
    apply_credentials
  else
    # Nothing was ever stored, so there is nothing to remove or re-apply.
    true
  end
end
# Disconnects every node currently returned by #nodes. This should only be
# used in cases where you know you're not going to use the cluster on the
# thread anymore and need to force the connections to close.
#
# @example Disconnect the cluster's nodes.
#   cluster.disconnect
#
# @return [ true ] Always true once every node has been asked to disconnect.
#
# @since 1.2.0
def disconnect
  nodes.each(&:disconnect)
  true
end
# Get the number of seconds a node stays flagged as down before it is
# retried. Reads the :down_interval option, falling back to DOWN_INTERVAL,
# and memoizes the result.
#
# @example Get the down interval, in seconds.
#   cluster.down_interval
#
# @return [ Integer ] The down interval.
#
# @since 1.2.7
def down_interval
  return @down_interval if @down_interval
  @down_interval = options[:down_interval] || DOWN_INTERVAL
end
# Initialize the new cluster, building one seed Node per host.
#
# @example Initialize the cluster.
#   Cluster.new([ "localhost:27017" ], down_interval: 20)
#
# @param [ Array ] hosts The hosts to create the seed nodes from. Each entry
#   is handed to Node.new together with the options.
# @param [ Hash ] options The cluster options.
#
# @option options :down_interval number of seconds to wait before attempting
#   to reconnect to a down node. (30)
# @option options :refresh_interval number of seconds to cache information
#   about a node. (300)
# @option options [ Integer ] :timeout The time in seconds to wait for an
#   operation to timeout. (5)
#
# @since 1.0.0
def initialize(hosts, options)
  @options = options
  @peers = []
  @seeds = hosts.map { |address| Node.new(address, options) }
end
# Provide a pretty string for cluster inspection, showing the class name,
# the object id and the seed nodes.
#
# @example Inspect the cluster.
#   cluster.inspect
#
# @return [ String ] A nicely formatted string.
#
# @since 1.0.0
def inspect
  class_name = self.class.name
  seed_list = seeds.inspect
  "#<#{class_name}:#{object_id} @seeds=#{seed_list}>"
end
# Get the number of times an operation should be retried before raising an
# error. Reads the :max_retries option, falling back to the number of seed
# nodes at the time of the first call, and memoizes the result.
#
# @example Get the maximum retries.
#   cluster.max_retries
#
# @return [ Integer ] The max retries.
#
# @since 1.2.7
def max_retries
  return @max_retries if @max_retries
  @max_retries = options[:max_retries] || seeds.size
end
# Returns the list of available nodes. Seeds that are refreshable (down
# nodes that are ready to be checked again, or nodes whose information is
# out of date) are refreshed first; every other seed is used as-is. Nodes
# that are flagged as down afterwards are left out of the result.
#
# @example Get the available nodes.
#   cluster.nodes
#
# @return [ Array<Node> ] the list of available nodes.
#
# @since 1.0.0
def nodes
  # Split the seeds into those that must be refreshed and those that can be
  # used without a refresh.
  stale, usable = seeds.partition { |node| refreshable?(node) }
  # Append whatever the refresh reports as reachable, then drop anything
  # that is still flagged as down.
  usable.concat(refresh(stale)).reject(&:down?)
end
# Refreshes information for each of the nodes provided. The node list
# defaults to the list of all known nodes.
#
# If a node is successfully refreshed, any newly discovered peers will also
# be refreshed. Nodes whose address is still unresolved after one refresh
# attempt, and nodes that raise a connection failure, are skipped.
#
# @example Refresh the nodes.
#   cluster.refresh
#
# @param [ Array<Node> ] nodes_to_refresh The nodes to refresh.
#
# @return [ Array<Node> ] the available nodes
#
# @since 1.0.0
def refresh(nodes_to_refresh = seeds)
  refreshed_nodes = []
  seen = {}
  # Set up a recursive lambda function for refreshing a node and its peers.
  refresh_node = ->(node) do
    # An unresolved address gets one refresh attempt up front; a connection
    # failure here simply leaves the node unresolved so it is skipped below.
    unless node.address.resolved
      begin
        node.refresh
      rescue Errors::ConnectionFailure
      end
    end
    unless seen[node] || !node.address.resolved
      seen[node] = true
      # Add the node to the global list of known nodes.
      seeds.push(node) unless seeds.include?(node)
      begin
        node.refresh
        # This node is good, so add it to the list of nodes to return.
        refreshed_nodes.push(node) unless refreshed_nodes.include?(node)
        # Now refresh any newly discovered peer nodes.
        refresh_peers(node, &refresh_node)
      rescue Errors::ConnectionFailure
        # We couldn't connect to the node.
      end
    end
  end
  # Iterate over a snapshot: with the default argument the list *is* seeds,
  # and the lambda appends discovered peers to seeds while we are walking it.
  nodes_to_refresh.dup.each(&refresh_node)
  refreshed_nodes
end
# Get the interval in which the node list should be refreshed. Reads the
# :refresh_interval option, falling back to REFRESH_INTERVAL, and memoizes
# the result.
#
# @example Get the refresh interval, in seconds.
#   cluster.refresh_interval
#
# @return [ Integer ] The refresh interval.
#
# @since 1.2.7
def refresh_interval
  return @refresh_interval if @refresh_interval
  @refresh_interval = options[:refresh_interval] || REFRESH_INTERVAL
end
# Get the operation retry interval - the time to wait before retrying a
# single operation. Reads the :retry_interval option, falling back to
# RETRY_INTERVAL, and memoizes the result.
#
# @example Get the retry interval, in seconds.
#   cluster.retry_interval
#
# @return [ Float, Integer ] The retry interval (the default is 0.25).
#
# @since 1.2.7
def retry_interval
  return @retry_interval if @retry_interval
  @retry_interval = options[:retry_interval] || RETRY_INTERVAL
end
# Yields the replica set's primary node to the provided block and returns
# the block's result. When no available node reports itself as primary, or
# the attempt fails with a connection failure or a replica set
# reconfiguration, a connection failure is raised instead.
#
# @example Yield the primary to the block.
#   cluster.with_primary do |node|
#     # ...
#   end
#
# @raise [ Errors::ConnectionFailure ] When no primary node can be used.
#
# @return [ Object ] The result of the yield.
#
# @since 1.0.0
def with_primary(&block)
  primary = nodes.find { |candidate| candidate.primary? }
  begin
    primary.ensure_primary { return yield(primary) } if primary
  rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
    # Swallowed on purpose: the generic failure below is raised instead.
  end
  raise Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}"
end
# Yields an available secondary node to the provided block and returns the
# block's result. If the block fails with a connection failure or a replica
# set reconfiguration, the next available secondary is tried. Once every
# secondary has been tried (or none is available) a connection failure is
# raised.
#
# @example Yield the secondary to the block.
#   cluster.with_secondary do |node|
#     # ...
#   end
#
# @raise [ Errors::ConnectionFailure ] When no secondary node can be used.
#
# @return [ Object ] The result of the yield.
#
# @since 1.0.0
def with_secondary(&block)
  available_secondary_nodes.each do |secondary|
    begin
      return yield(secondary)
    rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
      # This secondary failed; move on to the next candidate.
    end
  end
  raise Errors::ConnectionFailure, "Could not connect to a secondary node for replica set #{inspect}"
end
private
# Get the available nodes that report themselves as secondaries, in random
# order.
#
# @api private
#
# @example Get the available secondaries.
#   cluster.available_secondary_nodes
#
# @return [ Array<Node> ] The shuffled secondary nodes.
def available_secondary_nodes
  secondaries = nodes.select { |node| node.secondary? }
  secondaries.shuffle!
end
# Apply the stored credentials to every seed node. A node receives an empty
# hash when no credentials have been stored.
#
# @api private
#
# @example Apply the credentials.
#   cluster.apply_credentials
#
# @return [ Boolean ] True
#
# @since 2.0.0
def apply_credentials
  seeds.each { |seed| seed.credentials = (@credentials || {}) }
  true
end
# Get the boundary where a node that is down would need to be refreshed:
# the current time minus the down interval.
#
# @api private
#
# @example Get the down boundary.
#   cluster.down_boundary
#
# @return [ Time ] The down boundary.
#
# @since 2.0.0
def down_boundary
  Time.now - down_interval
end
# Get the standard refresh boundary to discover new nodes: the current time
# minus the refresh interval.
#
# @api private
#
# @example Get the refresh boundary.
#   cluster.refresh_boundary
#
# @return [ Time ] The refresh boundary.
#
# @since 2.0.0
def refresh_boundary
  Time.now - refresh_interval
end
# Is the provided node refreshable? Arbiters never are. A down node is
# refreshable once it has been down since before the down boundary; any
# other node is refreshable when it reports that it needs a refresh relative
# to the refresh boundary.
#
# @api private
#
# @example Is the node refreshable?
#   cluster.refreshable?(node)
#
# @param [ Node ] node The Node to check.
#
# @return [ true, false ] Whether the node should be refreshed.
#
# @since 2.0.0
def refreshable?(node)
  return false if node.arbiter?
  if node.down?
    node.down_at < down_boundary
  else
    node.needs_refresh?(refresh_boundary)
  end
end
# Creating a cloned cluster requires cloning all the seed nodes, so the copy
# gets its own duplicate of every seed.
#
# NOTE(review): only @seeds is replaced here, so the copy keeps referencing
# the same @peers array and options as the original - confirm this is
# intended.
#
# @api private
#
# @example Clone the cluster.
#   cluster.clone
#
# @return [ Cluster ] The cloned cluster.
#
# @since 1.0.0
def initialize_copy(_)
  @seeds = seeds.map { |seed| seed.dup }
end
# Refresh the peers based on the node's peers. Every peer with a resolved
# address is passed to the block unless it is already one of the seeds, and
# is then recorded in the cluster's peer list if it is not there yet. Peers
# whose address is unresolved are ignored.
#
# @api private
#
# @example Refresh the peers.
#   cluster.refresh_peers(node) { |peer| ... }
#
# @param [ Node ] node The node to refresh the peers for.
#
# @yield [ Node ] Each resolved peer that is not yet a seed.
#
# @since 1.0.0
def refresh_peers(node, &block)
  # The block parameter is named `peer` so it no longer shadows `node`.
  node.peers.each do |peer|
    next unless peer.address.resolved
    # The seed check must run before the block, which may add the peer to
    # the seeds itself.
    block.call(peer) unless seeds.include?(peer)
    peers.push(peer) unless peers.include?(peer)
  end
end
end
end