lib/mongo/collection.rb
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'mongo/bulk_write'
require 'mongo/collection/view'
require 'mongo/collection/helpers'
require 'mongo/collection/queryable_encryption'
module Mongo
# Represents a collection in the database and operations that can directly be
# applied to one.
#
# @since 2.0.0
class Collection
extend Forwardable
include Retryable
include QueryableEncryption
include Helpers
# The capped option.
#
# @since 2.1.0
CAPPED = 'capped'.freeze
# The ns field constant.
#
# @since 2.1.0
NS = 'ns'.freeze
# @return [ Mongo::Database ] The database the collection resides in.
attr_reader :database
# @return [ String ] The name of the collection.
attr_reader :name
# @return [ Hash ] The collection options.
attr_reader :options
# Get client, cluster, read preference, write concern, and encrypted_fields_map from client.
def_delegators :database, :client, :cluster, :encrypted_fields_map
# Delegate to the cluster for the next primary.
def_delegators :cluster, :next_primary
# Options that can be updated on a new Collection instance via the #with method.
#
# @since 2.1.0
CHANGEABLE_OPTIONS = [ :read, :read_concern, :write, :write_concern ].freeze
# Options map to transform create collection options.
#
# @api private
CREATE_COLLECTION_OPTIONS = {
:time_series => :timeseries,
:expire_after => :expireAfterSeconds,
:clustered_index => :clusteredIndex,
:change_stream_pre_and_post_images => :changeStreamPreAndPostImages,
:encrypted_fields => :encryptedFields,
:validator => :validator,
:view_on => :viewOn
}
# Check if a collection is equal to another object. Will check the name and
# the database for equality.
#
# @example Check collection equality.
# collection == other
#
# @param [ Object ] other The object to check.
#
# @return [ true | false ] If the objects are equal.
#
# @since 2.0.0
def ==(other)
return false unless other.is_a?(Collection)
name == other.name && database == other.database && options == other.options
end
# Instantiate a new collection.
#
# @example Instantiate a new collection.
# Mongo::Collection.new(database, 'test')
#
# @param [ Mongo::Database ] database The collection's database.
# @param [ String, Symbol ] name The collection name.
# @param [ Hash ] options The collection options.
#
# @option opts [ true | false ] :capped Create a fixed-sized collection.
# @option opts [ Hash ] :change_stream_pre_and_post_images Used to enable
# pre- and post-images on the created collection.
# The hash may have the following items:
# - *:enabled* -- true or false.
# @option opts [ Hash ] :clustered_index Create a clustered index.
# This option specifies how this collection should be clustered on _id.
# The hash may have the following items:
# - *:key* -- The clustered index key field. Must be set to { _id: 1 }.
# - *:unique* -- Must be set to true. The collection will not accept
# inserted or updated documents where the clustered index key value
# matches an existing value in the index.
# - *:name* -- Optional. A name that uniquely identifies the clustered index.
# @option opts [ Hash ] :collation The collation to use.
# @option opts [ Hash ] :encrypted_fields Hash describing encrypted fields
# for queryable encryption.
# @option opts [ Integer ] :expire_after Number indicating
# after how many seconds old time-series data should be deleted.
# @option opts [ Integer ] :max The maximum number of documents in a
# capped collection. The size limit takes precedents over max.
# @option opts [ Array<Hash> ] :pipeline An array of pipeline stages.
# A view will be created by applying this pipeline to the view_on
# collection or view.
# @option options [ Hash ] :read_concern The read concern options hash,
# with the following optional keys:
# - *:level* -- the read preference level as a symbol; valid values
# are *:local*, *:majority*, and *:snapshot*
# @option options [ Hash ] :read The read preference options.
# The hash may have the following items:
# - *:mode* -- read preference specified as a symbol; valid values are
# *:primary*, *:primary_preferred*, *:secondary*, *:secondary_preferred*
# and *:nearest*.
# - *:tag_sets* -- an array of hashes.
# - *:local_threshold*.
# @option options [ Session ] :session The session to use for the operation.
# @option options [ Integer ] :size The size of the capped collection.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the database or the client.
# @option opts [ Hash ] :time_series Create a time-series collection.
# The hash may have the following items:
# - *:timeField* -- The name of the field which contains the date in each
# time series document.
# - *:metaField* -- The name of the field which contains metadata in each
# time series document.
# - *:granularity* -- Set the granularity to the value that is the closest
# match to the time span between consecutive incoming measurements.
# Possible values are "seconds" (default), "minutes", and "hours".
# @option opts [ Hash ] :validator Hash describing document validation
# options for the collection.
# @option opts [ String ] :view_on The name of the source collection or
# view from which to create a view.
# @option opts [ Hash ] :write Deprecated. Equivalent to :write_concern
# option.
# @option opts [ Hash ] :write_concern The write concern options.
# Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
#
# @since 2.0.0
def initialize(database, name, options = {})
raise Error::InvalidCollectionName.new unless name
if options[:write] && options[:write_concern] && options[:write] != options[:write_concern]
raise ArgumentError, "If :write and :write_concern are both given, they must be identical: #{options.inspect}"
end
@database = database
@name = name.to_s.freeze
@options = options.dup
@timeout_ms = options.delete(:timeout_ms)
=begin WriteConcern object support
if @options[:write_concern].is_a?(WriteConcern::Base)
# Cache the instance so that we do not needlessly reconstruct it.
@write_concern = @options[:write_concern]
@options[:write_concern] = @write_concern.options
end
=end
@options.freeze
end
# Get the effective read concern for this collection instance.
#
# If a read concern was provided in collection options, that read concern
# will be returned, otherwise the database's effective read concern will
# be returned.
#
# @example Get the read concern.
# collection.read_concern
#
# @return [ Hash ] The read concern.
#
# @since 2.2.0
def read_concern
options[:read_concern] || database.read_concern
end
# Get the server selector for this collection.
#
# @example Get the server selector.
# collection.server_selector
#
# @return [ Mongo::ServerSelector ] The server selector.
#
# @since 2.0.0
def server_selector
@server_selector ||= ServerSelector.get(read_preference || database.server_selector)
end
# Get the effective read preference for this collection.
#
# If a read preference was provided in collection options, that read
# preference will be returned, otherwise the database's effective read
# preference will be returned.
#
# @example Get the read preference.
# collection.read_preference
#
# @return [ Hash ] The read preference.
#
# @since 2.0.0
def read_preference
@read_preference ||= options[:read] || database.read_preference
end
# Get the effective write concern on this collection.
#
# If a write concern was provided in collection options, that write
# concern will be returned, otherwise the database's effective write
# concern will be returned.
#
# @example Get the write concern.
# collection.write_concern
#
# @return [ Mongo::WriteConcern ] The write concern.
#
# @since 2.0.0
def write_concern
@write_concern ||= WriteConcern.get(
options[:write_concern] || options[:write] || database.write_concern)
end
# Get the write concern to use for an operation on this collection,
# given a session.
#
# If the session is in a transaction and the collection
# has an unacknowledged write concern, remove the write
# concern's :w option. Otherwise, return the unmodified
# write concern.
#
# @return [ Mongo::WriteConcern ] The write concern.
#
# @api private
def write_concern_with_session(session)
wc = write_concern
if session && session.in_transaction?
if wc && !wc.acknowledged?
opts = wc.options.dup
opts.delete(:w)
return WriteConcern.get(opts)
end
end
wc
end
# Provides a new collection with either a new read preference, new read
# concern or new write concern merged over the existing read preference /
# read concern / write concern.
#
# @example Get a collection with a changed read preference.
# collection.with(read: { mode: :primary_preferred })
# @example Get a collection with a changed read concern.
# collection.with(read_concern: { level: :majority })
#
# @example Get a collection with a changed write concern.
# collection.with(write_concern: { w: 3 })
#
# @param [ Hash ] new_options The new options to use.
#
# @option new_options [ Hash ] :read The read preference options.
# The hash may have the following items:
# - *:mode* -- read preference specified as a symbol; valid values are
# *:primary*, *:primary_preferred*, *:secondary*, *:secondary_preferred*
# and *:nearest*.
# - *:tag_sets* -- an array of hashes.
# - *:local_threshold*.
# @option new_options [ Hash ] :read_concern The read concern options hash,
# with the following optional keys:
# - *:level* -- the read preference level as a symbol; valid values
# are *:local*, *:majority*, and *:snapshot*
# @option new_options [ Hash ] :write Deprecated. Equivalent to :write_concern
# option.
# @option new_options [ Hash ] :write_concern The write concern options.
# Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
#
# @return [ Mongo::Collection ] A new collection instance.
#
# @since 2.1.0
def with(new_options)
new_options.keys.each do |k|
raise Error::UnchangeableCollectionOption.new(k) unless CHANGEABLE_OPTIONS.include?(k)
end
options = @options.dup
if options[:write] && new_options[:write_concern]
options.delete(:write)
end
if options[:write_concern] && new_options[:write]
options.delete(:write_concern)
end
Collection.new(database, name, options.update(new_options))
end
# Is the collection capped?
#
# @example Is the collection capped?
# collection.capped?
#
# @return [ true | false ] If the collection is capped.
#
# @since 2.0.0
def capped?
database.list_collections(filter: { name: name })
.first
&.dig('options', CAPPED) || false
end
# Force the collection to be created in the database.
#
# @example Force the collection to be created.
# collection.create
#
# @param [ Hash ] opts The options for the create operation.
#
# @option opts [ true | false ] :capped Create a fixed-sized collection.
# @option opts [ Hash ] :change_stream_pre_and_post_images Used to enable
# pre- and post-images on the created collection.
# The hash may have the following items:
# - *:enabled* -- true or false.
# @option opts [ Hash ] :clustered_index Create a clustered index.
# This option specifies how this collection should be clustered on _id.
# The hash may have the following items:
# - *:key* -- The clustered index key field. Must be set to { _id: 1 }.
# - *:unique* -- Must be set to true. The collection will not accept
# inserted or updated documents where the clustered index key value
# matches an existing value in the index.
# - *:name* -- Optional. A name that uniquely identifies the clustered index.
# @option opts [ Hash ] :collation The collation to use when creating the
# collection. This option will not be sent to the server when calling
# collection methods.
# @option opts [ Hash ] :encrypted_fields Hash describing encrypted fields
# for queryable encryption.
# @option opts [ Integer ] :expire_after Number indicating
# after how many seconds old time-series data should be deleted.
# @option opts [ Integer ] :max The maximum number of documents in a
# capped collection. The size limit takes precedents over max.
# @option opts [ Array<Hash> ] :pipeline An array of pipeline stages.
# A view will be created by applying this pipeline to the view_on
# collection or view.
# @option opts [ Session ] :session The session to use for the operation.
# @option opts [ Integer ] :size The size of the capped collection.
# @option opts [ Hash ] :time_series Create a time-series collection.
# The hash may have the following items:
# - *:timeField* -- The name of the field which contains the date in each
# time series document.
# - *:metaField* -- The name of the field which contains metadata in each
# time series document.
# - *:granularity* -- Set the granularity to the value that is the closest
# match to the time span between consecutive incoming measurements.
# Possible values are "seconds" (default), "minutes", and "hours".
# @option opts [ Hash ] :validator Hash describing document validation
# options for the collection.
# @option opts [ String ] :view_on The name of the source collection or
# view from which to create a view.
# @option opts [ Hash ] :write Deprecated. Equivalent to :write_concern
# option.
# @option opts [ Hash ] :write_concern The write concern options.
# Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
#
# @return [ Result ] The result of the command.
#
# @since 2.0.0
def create(opts = {})
# Passing read options to create command causes it to break.
# Filter the read options out. Session is also excluded here as it gets
# used by the call to with_session and should not be part of the
# operation. If it gets passed to the operation it would fail BSON
# serialization.
# TODO put the list of read options in a class-level constant when
# we figure out what the full set of them is.
options = Hash[self.options.merge(opts).reject do |key, value|
%w(read read_preference read_concern session).include?(key.to_s)
end]
# Converting Ruby options to server style.
CREATE_COLLECTION_OPTIONS.each do |ruby_key, server_key|
if options.key?(ruby_key)
options[server_key] = options.delete(ruby_key)
end
end
operation = { :create => name }.merge(options)
operation.delete(:write)
operation.delete(:write_concern)
client.send(:with_session, opts) do |session|
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
self.write_concern
end
context = Operation::Context.new(
client: client,
session: session
)
maybe_create_qe_collections(opts[:encrypted_fields], client, session) do |encrypted_fields|
Operation::Create.new(
selector: operation,
db_name: database.name,
write_concern: write_concern,
session: session,
# Note that these are collection options, collation isn't
# taken from options passed to the create method.
collation: options[:collation] || options['collation'],
encrypted_fields: encrypted_fields,
validator: options[:validator],
).execute(
next_primary(nil, session),
context: context
)
end
end
end
# Drop the collection. Will also drop all indexes associated with the
# collection, as well as associated queryable encryption collections.
#
# @note An error returned if the collection doesn't exist is suppressed.
#
# @example Drop the collection.
# collection.drop
#
# @param [ Hash ] opts The options for the drop operation.
#
# @option opts [ Session ] :session The session to use for the operation.
# @option opts [ Hash ] :write_concern The write concern options.
# @option opts [ Hash | nil ] :encrypted_fields Encrypted fields hash that
# was provided to `create` collection helper.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @return [ Result ] The result of the command.
#
# @since 2.0.0
def drop(opts = {})
client.with_session(opts) do |session|
maybe_drop_emm_collections(opts[:encrypted_fields], client, session) do
temp_write_concern = write_concern
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
temp_write_concern
end
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
operation = Operation::Drop.new({
selector: { :drop => name },
db_name: database.name,
write_concern: write_concern,
session: session,
})
do_drop(operation, session, context)
end
end
end
# Find documents in the collection.
#
# @example Find documents in the collection by a selector.
# collection.find(name: 1)
#
# @example Get all documents in a collection.
# collection.find
#
# @param [ Hash ] filter The filter to use in the find.
# @param [ Hash ] options The options for the find.
#
# @option options [ true | false ] :allow_disk_use When set to true, the
# server can write temporary data to disk while executing the find
# operation. This option is only available on MongoDB server versions
# 4.4 and newer.
# @option options [ true | false ] :allow_partial_results Allows the query to get partial
# results if some shards are down.
# @option options [ Integer ] :batch_size The number of documents returned in each batch
# of results from MongoDB.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Object ] :comment A user-provided comment to attach to
# this command.
# @option options [ :tailable, :tailable_await ] :cursor_type The type of cursor to use.
# @option options [ Integer ] :limit The max number of docs to return from the query.
# @option options [ Integer ] :max_time_ms The maximum amount of time to
# allow the query to run, in milliseconds. This option is deprecated, use
# :timeout_ms instead.
# @option options [ Hash ] :modifiers A document containing meta-operators modifying the
# output or behavior of a query.
# @option options [ true | false ] :no_cursor_timeout The server normally times out idle
# cursors after an inactivity period (10 minutes) to prevent excess memory use.
# Set this option to prevent that.
# @option options [ true | false ] :oplog_replay For internal replication
# use only, applications should not set this option.
# @option options [ Hash ] :projection The fields to include or exclude from each doc
# in the result set.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :skip The number of docs to skip before returning results.
# @option options [ Hash ] :sort The key and direction pairs by which the result set
# will be sorted.
# @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
# :timeout_ms (whether it applies to the lifetime of the cursor, or per
# iteration).
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ CollectionView ] The collection view.
#
# @since 2.0.0
def find(filter = nil, options = {})
View.new(self, filter || {}, options)
end
# Perform an aggregation on the collection.
#
# @example Perform an aggregation.
# collection.aggregate([ { "$group" => { "_id" => "$city", "tpop" => { "$sum" => "$pop" }}} ])
#
# @param [ Array<Hash> ] pipeline The aggregation pipeline.
# @param [ Hash ] options The aggregation options.
#
# @option options [ true | false ] :allow_disk_use Set to true if disk
# usage is allowed during the aggregation.
# @option options [ Integer ] :batch_size The number of documents to return
# per batch.
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ String ] :hint The index to use for the aggregation.
# @option options [ Hash ] :let Mapping of variables to use in the pipeline.
# See the server documentation for details.
# @option options [ Integer ] :max_time_ms The maximum amount of time to
# allow the query to run, in milliseconds. This option is deprecated, use
# :timeout_ms instead.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @return [ View::Aggregation ] The aggregation object.
#
# @since 2.1.0
def aggregate(pipeline, options = {})
View.new(self, {}, options).aggregate(pipeline, options)
end
# As of version 3.6 of the MongoDB server, a ``$changeStream`` pipeline
# stage is supported in the aggregation framework. This stage allows users
# to request that notifications are sent for all changes to a particular
# collection.
#
# @example Get change notifications for a given collection.
# collection.watch([{ '$match' => { operationType: { '$in' => ['insert', 'replace'] } } }])
#
# @param [ Array<Hash> ] pipeline Optional additional filter operators.
# @param [ Hash ] options The change stream options.
#
# @option options [ String ] :full_document Allowed values: nil, 'default',
# 'updateLookup', 'whenAvailable', 'required'.
#
# The default is to not send a value (i.e. nil), which is equivalent to
# 'default'. By default, the change notification for partial updates will
# include a delta describing the changes to the document.
#
# When set to 'updateLookup', the change notification for partial updates
# will include both a delta describing the changes to the document as well
# as a copy of the entire document that was changed from some time after
# the change occurred.
#
# When set to 'whenAvailable', configures the change stream to return the
# post-image of the modified document for replace and update change events
# if the post-image for this event is available.
#
# When set to 'required', the same behavior as 'whenAvailable' except that
# an error is raised if the post-image is not available.
# @option options [ String ] :full_document_before_change Allowed values: nil,
# 'whenAvailable', 'required', 'off'.
#
# The default is to not send a value (i.e. nil), which is equivalent to 'off'.
#
# When set to 'whenAvailable', configures the change stream to return the
# pre-image of the modified document for replace, update, and delete change
# events if it is available.
#
# When set to 'required', the same behavior as 'whenAvailable' except that
# an error is raised if the pre-image is not available.
# @option options [ BSON::Document, Hash ] :resume_after Specifies the
# logical starting point for the new change stream.
# @option options [ Integer ] :max_await_time_ms The maximum amount of time
# for the server to wait on new documents to satisfy a change stream query.
# @option options [ Integer ] :batch_size The number of documents to return
# per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ BSON::Timestamp ] :start_at_operation_time Only return
# changes that occurred at or after the specified timestamp. Any command run
# against the server will return a cluster time that can be used here.
# Only recognized by server versions 4.0+.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Boolean ] :show_expanded_events Enables the server to
# send the 'expanded' list of change stream events. The list of additional
# events included with this flag set are: createIndexes, dropIndexes,
# modify, create, shardCollection, reshardCollection,
# refineCollectionShardKey.
# @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
# :timeout_ms (whether it applies to the lifetime of the cursor, or per
# iteration).
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @note A change stream only allows 'majority' read concern.
# @note This helper method is preferable to running a raw aggregation with
# a $changeStream stage, for the purpose of supporting resumability.
#
# @return [ ChangeStream ] The change stream object.
#
# @since 2.5.0
def watch(pipeline = [], options = {})
view_options = options.dup
view_options[:cursor_type] = :tailable_await if options[:max_await_time_ms]
View::ChangeStream.new(View.new(self, {}, view_options), pipeline, nil, options)
end
# Gets an estimated number of matching documents in the collection.
#
# @example Get the count.
# collection.count(name: 1)
#
# @param [ Hash ] filter A filter for matching documents.
# @param [ Hash ] options The count options.
#
# @option options [ Hash ] :hint The index to use.
# @option options [ Integer ] :limit The maximum number of documents to count.
# @option options [ Integer ] :max_time_ms The maximum amount of time to
# allow the query to run, in milliseconds. This option is deprecated, use
# :timeout_ms instead.
# @option options [ Integer ] :skip The number of documents to skip before counting.
# @option options [ Hash ] :read The read preference options.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @return [ Integer ] The document count.
#
# @since 2.1.0
#
# @deprecated Use #count_documents or estimated_document_count instead. However, note that the
# following operators will need to be substituted when switching to #count_documents:
# * $where should be replaced with $expr (only works on 3.6+)
# * $near should be replaced with $geoWithin with $center
# * $nearSphere should be replaced with $geoWithin with $centerSphere
def count(filter = nil, options = {})
View.new(self, filter || {}, options).count(options)
end
# Gets the number of documents matching the query. Unlike the deprecated
# #count method, this will return the exact number of documents matching
# the filter (or exact number of documents in the collection, if no filter
# is provided) rather than an estimate.
#
# Use #estimated_document_count to retrieve an estimate of the number
# of documents in the collection using the collection metadata.
#
# @param [ Hash ] filter A filter for matching documents.
# @param [ Hash ] options Options for the operation.
#
# @option options :skip [ Integer ] The number of documents to skip.
# @option options :hint [ Hash ] Override default index selection and force
# MongoDB to use a specific index for the query. Requires server version 3.6+.
# @option options :limit [ Integer ] Max number of docs to count.
# @option options :max_time_ms [ Integer ] The maximum amount of time to allow the
# command to run.
# @option options :read [ Hash ] The read preference options.
# @option options :collation [ Hash ] The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @return [ Integer ] The document count.
#
# @since 2.6.0
def count_documents(filter = {}, options = {})
View.new(self, filter, options).count_documents(options)
end
# Gets an estimate of the number of documents in the collection using the
# collection metadata.
#
# Use #count_documents to retrieve the exact number of documents in the
# collection, or to count documents matching a filter.
#
# @param [ Hash ] options Options for the operation.
#
# @option options :max_time_ms [ Integer ] The maximum amount of time to allow
# the command to run for on the server.
# @option options [ Hash ] :read The read preference options.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @return [ Integer ] The document count.
#
# @since 2.6.0
def estimated_document_count(options = {})
View.new(self, {}, options).estimated_document_count(options)
end
# Get a list of distinct values for a specific field.
#
# @example Get the distinct values.
# collection.distinct('name')
#
# @param [ Symbol, String ] field_name The name of the field.
# @param [ Hash ] filter The documents from which to retrieve the distinct values.
# @param [ Hash ] options The distinct command options.
#
# @option options [ Integer ] :max_time_ms The maximum amount of time to
# allow the query to run, in milliseconds. This option is deprecated, use
# :timeout_ms instead.
# @option options [ Hash ] :read The read preference options.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @return [ Array<Object> ] The list of distinct values.
#
# @since 2.1.0
def distinct(field_name, filter = nil, options = {})
View.new(self, filter || {}, options).distinct(field_name, options)
end
# Get a view of all indexes for this collection. Can be iterated or has
# more operations.
#
# @example Get the index view.
# collection.indexes
#
# @param [ Hash ] options Options for getting a list of all indexes.
#
# @option options [ Session ] :session The session to use.
#
# @return [ Index::View ] The index view.
#
# @since 2.0.0
def indexes(options = {})
Index::View.new(self, options)
end
# Get a view of all search indexes for this collection. Can be iterated or
# operated on directly. If id or name are given, the iterator will return
# only the indicated index. For all other operations, id and name are
# ignored.
#
# @note Only one of id or name may be given; it is an error to specify both,
# although both may be omitted safely.
#
# @param [ Hash ] options The options to use to configure the view.
#
# @option options [ String ] :id The id of the specific index to query (optional)
# @option options [ String ] :name The name of the specific index to query (optional)
# @option options [ Hash ] :aggregate The options hash to pass to the
# aggregate command (optional)
#
# @return [ SearchIndex::View ] The search index view.
#
# @since 2.0.0
def search_indexes(options = {})
SearchIndex::View.new(self, options)
end
# Get a pretty printed string inspection for the collection.
#
# @example Inspect the collection.
# collection.inspect
#
# @return [ String ] The collection inspection.
#
# @since 2.0.0
def inspect
"#<Mongo::Collection:0x#{object_id} namespace=#{namespace}>"
end
# Insert a single document into the collection.
#
# @example Insert a document into the collection.
# collection.insert_one({ name: 'test' })
#
# @param [ Hash ] document The document to insert.
# @param [ Hash ] opts The insert options.
#
# @option opts [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option opts [ Object ] :comment A user-provided comment to attach to
# this command.
# @option opts [ Session ] :session The session to use for the operation.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option opts [ Hash ] :write_concern The write concern options.
# Can be :w => Integer, :fsync => Boolean, :j => Boolean.
#
# @return [ Result ] The database response wrapper.
#
# @since 2.0.0
def insert_one(document, opts = {})
QueryCache.clear_namespace(namespace)
client.with_session(opts) do |session|
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
write_concern_with_session(session)
end
if document.nil?
raise ArgumentError, "Document to be inserted cannot be nil"
end
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
Operation::Insert.new(
:documents => [ document ],
:db_name => database.name,
:coll_name => name,
:write_concern => write_concern,
:bypass_document_validation => !!opts[:bypass_document_validation],
:options => opts,
:id_generator => client.options[:id_generator],
:session => session,
:txn_num => txn_num,
:comment => opts[:comment]
).execute_with_connection(connection, context: context)
end
end
end
# Insert the provided documents into the collection.
#
# @example Insert documents into the collection.
# collection.insert_many([{ name: 'test' }])
#
# @param [ Enumerable<Hash> ] documents The documents to insert.
# @param [ Hash ] options The insert options.
#
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Object ] :comment A user-provided comment to attach to
# this command.
# @option options [ true | false ] :ordered Whether the operations
# should be executed in order.
# @option options [ Session ] :session The session to use for the operation.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash ] :write_concern The write concern options.
# Can be :w => Integer, :fsync => Boolean, :j => Boolean.
#
# @return [ Result ] The database response wrapper.
#
# @since 2.0.0
def insert_many(documents, options = {})
QueryCache.clear_namespace(namespace)
inserts = documents.map{ |doc| { :insert_one => doc }}
bulk_write(inserts, options)
end
# Execute a batch of bulk write operations.
#
# @example Execute a bulk write.
# collection.bulk_write(operations, options)
#
# @param [ Enumerable<Hash> ] requests The bulk write requests.
# @param [ Hash ] options The options.
#
# @option options [ true | false ] :ordered Whether the operations
# should be executed in order.
# @option options [ Hash ] :write_concern The write concern options.
# Can be :w => Integer, :fsync => Boolean, :j => Boolean.
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Session ] :session The session to use for the set of operations.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ BulkWrite::Result ] The result of the operation.
#
# @since 2.0.0
def bulk_write(requests, options = {})
BulkWrite.new(self, requests, options).execute
end
# Remove a document from the collection.
#
# @example Remove a single document from the collection.
# collection.delete_one
#
# @param [ Hash ] filter The filter to use.
# @param [ Hash ] options The options.
#
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ Result ] The response from the database.
#
# @since 2.1.0
def delete_one(filter = nil, options = {})
find(filter, options).delete_one(options)
end
# Remove documents from the collection.
#
# @example Remove multiple documents from the collection.
# collection.delete_many
#
# @param [ Hash ] filter The filter to use.
# @param [ Hash ] options The options.
#
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ Result ] The response from the database.
#
# @since 2.1.0
def delete_many(filter = nil, options = {})
find(filter, options).delete_many(options)
end
# Execute a parallel scan on the collection view.
#
# Returns a list of up to cursor_count cursors that can be iterated concurrently.
# As long as the collection is not modified during scanning, each document appears once
# in one of the cursors' result sets.
#
# @example Execute a parallel collection scan.
# collection.parallel_scan(2)
#
# @param [ Integer ] cursor_count The max number of cursors to return.
# @param [ Hash ] options The parallel scan command options.
#
# @option options [ Integer ] :max_time_ms The maximum amount of time to
# allow the query to run, in milliseconds. This option is deprecated, use
# :timeout_ms instead.
# @option options [ Session ] :session The session to use.
# @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
# :timeout_ms (whether it applies to the lifetime of the cursor, or per
# iteration).
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @return [ Array<Cursor> ] An array of cursors.
#
# @since 2.1
def parallel_scan(cursor_count, options = {})
find({}, options).parallel_scan(cursor_count, options)
end
# Replaces a single document in the collection with the new document.
#
# @example Replace a single document.
# collection.replace_one({ name: 'test' }, { name: 'test1' })
#
# @param [ Hash ] filter The filter to use.
# @param [ Hash ] replacement The replacement document..
# @param [ Hash ] options The options.
#
# @option options [ true | false ] :upsert Whether to upsert if the
# document doesn't exist.
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ Result ] The response from the database.
#
# @since 2.1.0
def replace_one(filter, replacement, options = {})
find(filter, options).replace_one(replacement, options)
end
# Update documents in the collection.
#
# @example Update multiple documents in the collection.
# collection.update_many({ name: 'test'}, '$set' => { name: 'test1' })
#
# @param [ Hash ] filter The filter to use.
# @param [ Hash | Array<Hash> ] update The update document or pipeline.
# @param [ Hash ] options The options.
#
# @option options [ true | false ] :upsert Whether to upsert if the
# document doesn't exist.
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Array ] :array_filters A set of filters specifying to which array elements
# an update should apply.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ Result ] The response from the database.
#
# @since 2.1.0
def update_many(filter, update, options = {})
find(filter, options).update_many(update, options)
end
# Update a single document in the collection.
#
# @example Update a single document in the collection.
# collection.update_one({ name: 'test'}, '$set' => { name: 'test1'})
#
# @param [ Hash ] filter The filter to use.
# @param [ Hash | Array<Hash> ] update The update document or pipeline.
# @param [ Hash ] options The options.
#
# @option options [ true | false ] :upsert Whether to upsert if the
# document doesn't exist.
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Array ] :array_filters A set of filters specifying to which array elements
# an update should apply.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ Result ] The response from the database.
#
# @since 2.1.0
def update_one(filter, update, options = {})
find(filter, options).update_one(update, options)
end
# Finds a single document in the database via findAndModify and deletes
# it, returning the original document.
#
# @example Find one document and delete it.
# collection.find_one_and_delete(name: 'test')
#
# @param [ Hash ] filter The filter to use.
# @param [ Hash ] options The options.
#
# @option options [ Integer ] :max_time_ms The maximum amount of time to
# allow the query to run, in milliseconds. This option is deprecated, use
# :timeout_ms instead.
# @option options [ Hash ] :projection The fields to include or exclude in the returned doc.
# @option options [ Hash ] :sort The key and direction pairs by which the result set
# will be sorted.
# @option options [ Hash ] :write_concern The write concern options.
# Defaults to the collection's write concern.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ BSON::Document, nil ] The document, if found.
#
# @since 2.1.0
def find_one_and_delete(filter, options = {})
find(filter, options).find_one_and_delete(options)
end
# Finds a single document via findAndModify and updates it, returning the original doc unless
# otherwise specified.
#
# @example Find a document and update it, returning the original.
# collection.find_one_and_update({ name: 'test' }, { "$set" => { name: 'test1' }})
#
# @example Find a document and update it, returning the updated document.
# collection.find_one_and_update({ name: 'test' }, { "$set" => { name: 'test1' }}, :return_document => :after)
#
# @param [ Hash ] filter The filter to use.
# @param [ Hash | Array<Hash> ] update The update document or pipeline.
# @param [ Hash ] options The options.
#
# @option options [ Integer ] :max_time_ms The maximum amount of time to allow the command
# to run in milliseconds.
# @option options [ Hash ] :projection The fields to include or exclude in the returned doc.
# @option options [ Hash ] :sort The key and direction pairs by which the result set
# will be sorted.
# @option options [ Symbol ] :return_document Either :before or :after.
# @option options [ true | false ] :upsert Whether to upsert if the document doesn't exist.
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Hash ] :write_concern The write concern options.
# Defaults to the collection's write concern.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Array ] :array_filters A set of filters specifying to which array elements
# an update should apply.
# @option options [ Session ] :session The session to use.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
#
# @return [ BSON::Document ] The document.
#
# @since 2.1.0
def find_one_and_update(filter, update, options = {})
find(filter, options).find_one_and_update(update, options)
end
# Finds a single document and replaces it, returning the original doc unless
# otherwise specified.
#
# @example Find a document and replace it, returning the original.
# collection.find_one_and_replace({ name: 'test' }, { name: 'test1' })
#
# @example Find a document and replace it, returning the new document.
# collection.find_one_and_replace({ name: 'test' }, { name: 'test1' }, :return_document => :after)
#
# @param [ Hash ] filter The filter to use.
# @param [ BSON::Document ] replacement The replacement document.
# @param [ Hash ] options The options.
#
# @option options [ Integer ] :max_time_ms The maximum amount of time to allow the command
# to run in milliseconds.
# @option options [ Hash ] :projection The fields to include or exclude in the returned doc.
# @option options [ Hash ] :sort The key and direction pairs by which the result set
# will be sorted.
# @option options [ Symbol ] :return_document Either :before or :after.
# @option options [ true | false ] :upsert Whether to upsert if the document doesn't exist.
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Hash ] :write_concern The write concern options.
# Defaults to the collection's write concern.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must be a non-negative integer. An explicit value of 0 means infinite.
# The default value is unset which means the value is inherited from
# the collection or the database or the client.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
# @return [ BSON::Document ] The document.
#
# @since 2.1.0
def find_one_and_replace(filter, replacement, options = {})
find(filter, options).find_one_and_update(replacement, options)
end
# Get the fully qualified namespace of the collection.
#
# @example Get the fully qualified namespace.
# collection.namespace
#
# @return [ String ] The collection namespace.
#
# @since 2.0.0
def namespace
"#{database.name}.#{name}"
end
# Whether the collection is a system collection.
#
# @return [ Boolean ] Whether the system is a system collection.
#
# @api private
def system_collection?
name.start_with?('system.')
end
# @return [ Integer | nil ] Operation timeout that is for this database or
# for the corresponding client.
#
# @api private
def timeout_ms
@timeout_ms || database.timeout_ms
end
# @return [ Hash ] timeout_ms value set on the operation level (if any),
# and/or timeout_ms that is set on collection/database/client level (if any).
#
# @api private
def operation_timeouts(opts = {})
# TODO: We should re-evaluate if we need two timeouts separately.
{}.tap do |result|
if opts[:timeout_ms].nil?
result[:inherited_timeout_ms] = timeout_ms
else
result[:operation_timeout_ms] = opts.delete(:timeout_ms)
end
end
end
end
end