# mongodb/mongo-ruby-driver
#
# View on GitHub
# lib/mongo/database.rb
#
# Summary
#
# Maintainability
# A
# 0 mins
# Test Coverage
# frozen_string_literal: true
# encoding: utf-8

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/database/view'

module Mongo

  # Represents a database on the db server and operations that can execute on
  # it at this level.
  #
  # @since 2.0.0
  class Database
    # Forwardable supplies the def_delegators macro used further down.
    extend Forwardable
    # NOTE(review): presumably the source of read_with_retry used by
    # #read_command -- confirm against the Retryable module.
    include Retryable

    # The admin database name.
    #
    # @since 2.0.0
    ADMIN = 'admin'.freeze

    # The "collection" that database commands operate against.
    #
    # @since 2.0.0
    COMMAND = '$cmd'.freeze

    # The default database options: a frozen options object whose :database
    # entry is the admin database name.
    #
    # @since 2.0.0
    DEFAULT_OPTIONS = Options::Redacted.new(:database => ADMIN).freeze

    # Database name field constant.
    #
    # @since 2.1.0
    # @deprecated
    NAME = 'name'.freeze

    # Databases constant.
    #
    # @since 2.1.0
    DATABASES = 'databases'.freeze

    # The name of the collection that holds all the collection names.
    #
    # @since 2.0.0
    NAMESPACES = 'system.namespaces'.freeze

    # @return [ Client ] client The database client.
    attr_reader :client

    # @return [ String ] name The name of the database (stored frozen).
    attr_reader :name

    # @return [ Hash ] options The options (stored frozen).
    attr_reader :options

    # Delegate cluster, read preference, server selector, read concern, write
    # concern and the encrypted fields map to the client.
    def_delegators :@client,
                   :cluster,
                   :read_preference,
                   :server_selector,
                   :read_concern,
                   :write_concern,
                   :encrypted_fields_map

    # @return [ Mongo::Server ] Get the primary server from the cluster.
    def_delegators :cluster,
                   :next_primary

    # Check equality of the database object against another. Will simply check
    # if the names are the same.
    #
    # @example Check database equality.
    #   database == other
    #
    # @param [ Object ] other The object to check against.
    #
    # @return [ true, false ] If the objects are equal.
    #
    # @since 2.0.0
    def ==(other)
      # Anything that is not a Database is unequal; otherwise the names decide.
      other.is_a?(Database) && name == other.name
    end

    # Get a collection in this database by the provided name.
    #
    # @example Get a collection.
    #   database[:users]
    #
    # @param [ String, Symbol ] collection_name The name of the collection.
    # @param [ Hash ] options The options to the collection.
    #
    # @return [ Mongo::Collection ] The collection object.
    #
    # @since 2.0.0
    def [](collection_name, options = {})
      # Without :server_api the options are handed to the collection untouched.
      return Collection.new(self, collection_name, options) unless options[:server_api]

      raise ArgumentError,
            'The :server_api option cannot be specified for collection objects. It can only be specified on Client level'
    end
    alias_method :collection, :[]

    # Get all the names of the non-system collections in the database.
    #
    # @note The set of returned collection names depends on the version of
    #   MongoDB server that fulfills the request.
    #
    # @param [ Hash ] options
    #
    # @option options [ Hash ] :filter A filter on the collections returned.
    # @option options [ true, false ] :authorized_collections A flag, when
    #   set to true and used with name_only: true, that allows a user without the
    #   required privilege to run the command when access control is enforced.
    # @option options [ Object ] :comment A user-provided
    #   comment to attach to this command.
    #
    #   See https://mongodb.com/docs/manual/reference/command/listCollections/
    #   for more information and usage.
    #
    # @return [ Array<String> ] Names of the collections.
    #
    # @since 2.0.0
    def collection_names(options = {})
      # A new database view is created for every call; the options are
      # forwarded to it unchanged.
      database_view = View.new(self)
      database_view.collection_names(options)
    end

    # Get info on all the non-system collections in the database.
    #
    # @note The set of collections returned, and the schema of the
    #   information hash per collection, depends on the MongoDB server
    #   version that fulfills the request.
    #
    # @param [ Hash ] options
    #
    # @option options [ Hash ] :filter A filter on the collections returned.
    # @option options [ true, false ] :name_only Indicates whether command
    #   should return just collection/view names and type or return both the
    #   name and other information
    # @option options [ true, false ] :authorized_collections A flag, when
    #   set to true and used with name_only: true, that allows a user without the
    #   required privilege to run the command when access control is enforced.
    # @option options [ Object ] :comment A user-provided
    #   comment to attach to this command.
    #
    #   See https://mongodb.com/docs/manual/reference/command/listCollections/
    #   for more information and usage.
    #
    # @return [ Array<Hash> ] Array of information hashes, one for each
    #   collection in the database.
    #
    # @since 2.0.5
    def list_collections(options = {})
      # A new database view is created for every call; the options are
      # forwarded to it unchanged.
      database_view = View.new(self)
      database_view.list_collections(options)
    end

    # Get all the non-system collections that belong to this database.
    #
    # @note The set of returned collections depends on the version of
    #   MongoDB server that fulfills the request.
    #
    # @param [ Hash ] options
    #
    # @option options [ Hash ] :filter A filter on the collections returned.
    # @option options [ true, false ] :authorized_collections A flag, when
    #   set to true and used with name_only: true, that allows a user without the
    #   required privilege to run the command when access control is enforced.
    # @option options [ Object ] :comment A user-provided
    #   comment to attach to this command.
    #
    #   See https://mongodb.com/docs/manual/reference/command/listCollections/
    #   for more information and usage.
    #
    # @return [ Array<Mongo::Collection> ] The collections.
    #
    # @since 2.0.0
    def collections(options = {})
      # The options only influence which names are listed; each collection
      # object is then built with default options.
      collection_names(options).map do |collection_name|
        collection(collection_name)
      end
    end

    # Execute a command on the database.
    #
    # @example Execute a command.
    #   database.command(:hello => 1)
    #
    # @param [ Hash ] operation The command to execute.
    # @param [ Hash ] opts The command options.
    #
    # @option opts [ Hash ] :read The read preference for this command.
    # @option opts [ Session ] :session The session to use for this command.
    # @option opts [ Hash ] :execution_options Options to pass to the code that
    #   executes this command. This is an internal option and is subject to
    #   change.
    #   - :deserialize_as_bson [ Boolean ] Whether to deserialize the response
    #     to this command using BSON types instead of native Ruby types wherever
    #     possible.
    #
    # @return [ Mongo::Operation::Result ] The result of the command execution.
    def command(operation, opts = {})
      # Work on a copy so that removing :execution_options leaves the caller's
      # hash untouched.
      opts = opts.dup
      execution_opts = opts.delete(:execution_options) || {}

      # A session that is inside a transaction supplies the read preference
      # first; otherwise fall back to the :read option, then to primary.
      given_session = opts[:session]
      read_pref = given_session.txn_read_preference if given_session && given_session.in_transaction?
      read_pref ||= opts[:read] || ServerSelector::PRIMARY
      Lint.validate_underscore_read_preference(read_pref)
      selector = ServerSelector.get(read_pref)

      client.send(:with_session, opts) do |session|
        server = selector.select_server(cluster, nil, session)
        op = Operation::Command.new(
          selector: operation,
          db_name: name,
          read: selector,
          session: session
        )
        context = Operation::Context.new(client: client, session: session)

        op.execute(server, context: context, options: execution_opts)
      end
    end

    # Execute a read command on the database, retrying the read if necessary.
    #
    # @param [ Hash ] operation The command to execute.
    # @param [ Hash ] opts The command options.
    #
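    # @option opts [ Hash ] :read The read preference for this command.
    # @option opts [ Session ] :session The session to use for this command.
    # @option opts [ Object ] :comment A user-provided comment to attach to
    #   this command.
    #
    # @return [ Mongo::Operation::Result ] The result of the command execution.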
    # @api private
    def read_command(operation, opts = {})
      # A session that is inside a transaction supplies the read preference
      # first; otherwise fall back to the :read option, then to primary.
      given_session = opts[:session]
      read_pref = given_session.txn_read_preference if given_session && given_session.in_transaction?
      read_pref ||= opts[:read] || ServerSelector::PRIMARY
      Lint.validate_underscore_read_preference(read_pref)
      preference = ServerSelector.get(read_pref)

      client.send(:with_session, opts) do |session|
        read_with_retry(session, preference) do |server|
          # The command document is duplicated on every invocation of this
          # block, so each execution receives its own copy.
          op = Operation::Command.new(
            selector: operation.dup,
            db_name: name,
            read: preference,
            session: session,
            comment: opts[:comment]
          )
          context = Operation::Context.new(client: client, session: session)

          op.execute(server, context: context)
        end
      end
    end

    # Drop the database and all its associated information.
    #
    # @example Drop the database.
    #   database.drop
    #
    # @param [ Hash ] options The options for the operation.
    #
    # @option options [ Session ] :session The session to use for the operation.
    # @option options [ Hash ] :write_concern The write concern options.
    #
    # @return [ Result ] The result of the command.
    #
    # @since 2.0.0
    def drop(options = {})
      selector = { dropDatabase: 1 }
      client.send(:with_session, options) do |session|
        # An explicit :write_concern option takes precedence over the write
        # concern delegated from the client.
        requested = options[:write_concern]
        concern = requested ? WriteConcern.get(requested) : write_concern
        spec = {
          selector: selector,
          db_name: name,
          write_concern: concern,
          session: session
        }
        op = Operation::DropDatabase.new(spec)
        primary = next_primary(nil, session)
        context = Operation::Context.new(client: client, session: session)

        op.execute(primary, context: context)
      end
    end

    # Instantiate a new database object.
    #
    # @example Instantiate the database.
    #   Mongo::Database.new(client, :test)
    #
    # @param [ Mongo::Client ] client The driver client.
    # @param [ String, Symbol ] name The name of the database.
    # @param [ Hash ] options The options.
    #
    # @raise [ Mongo::Error::InvalidDatabaseName ] If the name is nil.
    #
    # @since 2.0.0
    def initialize(client, name, options = {})
      # A nil (or false) name is rejected outright.
      raise Error::InvalidDatabaseName unless name
      # The stricter type check only runs when lint mode is enabled.
      if Lint.enabled? && !(name.is_a?(String) || name.is_a?(Symbol))
        raise "Database name must be a string or a symbol: #{name}"
      end

      @client = client
      # String#to_s returns the receiver itself, so freezing its result would
      # freeze the caller's own string. Freeze private copies instead so that
      # constructing a database never makes the caller's arguments immutable.
      @name = name.to_s.dup.freeze
      @options = options.dup.freeze
    end

    # Get a pretty printed string inspection for the database.
    #
    # @example Inspect the database.
    #   database.inspect
    #
    # @return [ String ] The database inspection.
    #
    # @since 2.0.0
    def inspect
      # NOTE: the object id is rendered in decimal although it follows a "0x"
      # prefix; the output is kept as is for compatibility.
      format('#<Mongo::Database:0x%s name=%s>', object_id, name)
    end

    # Get the Grid "filesystem" for this database.
    #
    # @param [ Hash ] options The GridFS options.
    #
    # @option options [ String ] :bucket_name The prefix for the files and chunks
    #   collections.
    # @option options [ Integer ] :chunk_size Override the default chunk
    #   size.
    # @option options [ String ] :fs_name The prefix for the files and chunks
    #   collections.
    # @option options [ String ] :read The read preference.
    # @option options [ Session ] :session The session to use.
    # @option options [ Hash ] :write Deprecated. Equivalent to :write_concern
    #   option.
    # @option options [ Hash ] :write_concern The write concern options.
    #   Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
    #
    # @return [ Grid::FSBucket ] The GridFS for the database.
    #
    # @since 2.0.0
    def fs(options = {})
      # Builds a new bucket on every call, handing over this database and the
      # options unchanged.
      Grid::FSBucket.new(self, options)
    end

    # Get the user view for this database.
    #
    # @example Get the user view.
    #   database.users
    #
    # @return [ Auth::User::View ] The user view.
    #
    # @since 2.0.0
    def users
      # Builds a new user view bound to this database on every call.
      Auth::User::View.new(self)
    end

    # Perform an aggregation on the database.
    #
    # @example Perform an aggregation.
    #   database.aggregate([ { "$listLocalSessions" => {} } ])
    #
    # @param [ Array<Hash> ] pipeline The aggregation pipeline.
    # @param [ Hash ] options The aggregation options.
    #
    # @option options [ true, false ] :allow_disk_use Set to true if disk
    #   usage is allowed during the aggregation.
    # @option options [ Integer ] :batch_size The number of documents to return
    #   per batch.
    # @option options [ true, false ] :bypass_document_validation Whether or
    #   not to skip document level validation.
    # @option options [ Hash ] :collation The collation to use.
    # @option options [ Object ] :comment A user-provided
    #   comment to attach to this command.
    # @option options [ String ] :hint The index to use for the aggregation.
    # @option options [ Integer ] :max_time_ms The maximum amount of time in
    #   milliseconds to allow the aggregation to run.
    # @option options [ true, false ] :use_cursor Indicates whether the command
    #   will request that the server provide results using a cursor. Note that
    #   as of server version 3.6, aggregations always provide results using a
    #   cursor and this option is therefore not valid.
    # @option options [ Session ] :session The session to use.
    #
    # @return [ Collection::View::Aggregation ] The aggregation object.
    #
    # @since 2.10.0
    def aggregate(pipeline, options = {})
      # A new database view is created for every call; the pipeline and
      # options are forwarded to it unchanged.
      database_view = View.new(self)
      database_view.aggregate(pipeline, options)
    end

    # As of version 3.6 of the MongoDB server, a ``$changeStream`` pipeline stage is supported
    # in the aggregation framework. As of version 4.0, this stage allows users to request that
    # notifications are sent for all changes that occur in the client's database.
    #
    # @example Get change notifications for a given database.
    #   database.watch([{ '$match' => { operationType: { '$in' => ['insert', 'replace'] } } }])
    #
    # @param [ Array<Hash> ] pipeline Optional additional filter operators.
    # @param [ Hash ] options The change stream options.
    #
    # @option options [ String ] :full_document Allowed values: nil, 'default',
    #   'updateLookup', 'whenAvailable', 'required'.
    #
    #   The default is to not send a value (i.e. nil), which is equivalent to
    #   'default'. By default, the change notification for partial updates will
    #   include a delta describing the changes to the document.
    #
    #   When set to 'updateLookup', the change notification for partial updates
    #   will include both a delta describing the changes to the document as well
    #   as a copy of the entire document that was changed from some time after
    #   the change occurred.
    #
    #   When set to 'whenAvailable', configures the change stream to return the
    #   post-image of the modified document for replace and update change events
    #   if the post-image for this event is available.
    #
    #   When set to 'required', the same behavior as 'whenAvailable' except that
    #   an error is raised if the post-image is not available.
    # @option options [ String ] :full_document_before_change Allowed values: nil,
    #   'whenAvailable', 'required', 'off'.
    #
    #   The default is to not send a value (i.e. nil), which is equivalent to 'off'.
    #
    #   When set to 'whenAvailable', configures the change stream to return the
    #   pre-image of the modified document for replace, update, and delete change
    #   events if it is available.
    #
    #   When set to 'required', the same behavior as 'whenAvailable' except that
    #   an error is raised if the pre-image is not available.
    # @option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point
    #   for the new change stream.
    # @option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to
    #   wait on new documents to satisfy a change stream query.
    # @option options [ Integer ] :batch_size The number of documents to return per batch.
    # @option options [ BSON::Document, Hash ] :collation The collation to use.
    # @option options [ Session ] :session The session to use.
    # @option options [ BSON::Timestamp ] :start_at_operation_time Only return
    #   changes that occurred after the specified timestamp. Any command run
    #   against the server will return a cluster time that can be used here.
    #   Only recognized by server versions 4.0+.
    # @option options [ Object ] :comment A user-provided
    #   comment to attach to this command.
    # @option options [ Boolean ] :show_expanded_events Enables the server to
    #   send the 'expanded' list of change stream events. The list of additional
    #   events included with this flag set are: createIndexes, dropIndexes,
    #   modify, create, shardCollection, reshardCollection,
    #   refineCollectionShardKey.
    #
    # @note A change stream only allows 'majority' read concern.
    # @note This helper method is preferable to running a raw aggregation with a $changeStream
    #   stage, for the purpose of supporting resumability.
    #
    # @return [ ChangeStream ] The change stream object.
    #
    # @since 2.6.0
    def watch(pipeline = [], options = {})
      # The view works on its own copy of the options, so setting :await_data
      # never touches the hash passed in by the caller.
      view_options = options.dup
      view_options[:await_data] = true if options[:max_await_time_ms]

      # The view is built on the "$cmd.aggregate" collection of this database.
      aggregate_collection = collection("#{COMMAND}.aggregate")
      view = Mongo::Collection::View.new(aggregate_collection, {}, view_options)

      # The change stream receives the caller's original options, not the copy.
      Mongo::Collection::View::ChangeStream.new(
        view,
        pipeline,
        Mongo::Collection::View::ChangeStream::DATABASE,
        options
      )
    end

    # Create a database for the provided client, for use when we don't want the
    # client's original database instance to be the same.
    #
    # @api private
    #
    # @example Create a database for the client.
    #   Database.create(client)
    #
    # @param [ Client ] client The client to create on.
    #
    # @return [ Database ] The database.
    #
    # @since 2.0.0
    def self.create(client)
      # Build the database from the client's own options, install it as the
      # client's @database, and return it.
      Database.new(client, client.options[:database], client.options).tap do |database|
        client.instance_variable_set(:@database, database)
      end
    end
  end
end