mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/grid/fs_bucket.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  module Grid

    # Represents a view of the GridFS in the database.
    #
    # @since 2.0.0
    class FSBucket
      extend Forwardable

      # The default root prefix.
      #
      # @since 2.0.0
      DEFAULT_ROOT = 'fs'.freeze

      # The specification for the chunks collection index.
      #
      # @since 2.0.0
      CHUNKS_INDEX = { :files_id => 1, :n => 1 }.freeze

      # The specification for the files collection index.
      #
      # @since 2.1.0
      FILES_INDEX = { filename: 1, uploadDate: 1 }.freeze

      # Create the GridFS.
      #
      # @example Create the GridFS.
      #   Grid::FSBucket.new(database)
      #
      # @param [ Database ] database The database the files reside in.
      # @param [ Hash ] options The GridFS options.
      #
      # @option options [ String ] :bucket_name The prefix for the files and chunks
      #   collections.
      # @option options [ Integer ] :chunk_size Override the default chunk
      #   size.
      # @option options [ String ] :fs_name The prefix for the files and chunks
      #   collections.
      # @option options [ Hash ] :read The read preference options. The hash
      #   may have the following items:
      #   - *:mode* -- read preference specified as a symbol; valid values are
      #     *:primary*, *:primary_preferred*, *:secondary*, *:secondary_preferred*
      #     and *:nearest*.
      #   - *:tag_sets* -- an array of hashes.
      #   - *:local_threshold*.
      # @option options [ Session ] :session The session to use.
      # @option options [ Hash ] :write Deprecated. Equivalent to :write_concern
      #   option.
      # @option options [ Hash ] :write_concern The write concern options.
      #   Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
      #
      # @since 2.0.0
      def initialize(database, options = {})
        @database = database
        @options = options.dup
=begin WriteConcern object support
        if @options[:write_concern].is_a?(WriteConcern::Base)
          # Cache the instance so that we do not needlessly reconstruct it.
          @write_concern = @options[:write_concern]
          @options[:write_concern] = @write_concern.options
        end
=end
        @options.freeze
        @chunks_collection = database[chunks_name]
        @files_collection = database[files_name]
      end

      # @return [ Collection ] chunks_collection The chunks collection.
      #
      # @since 2.0.0
      attr_reader :chunks_collection

      # @return [ Database ] database The database.
      #
      # @since 2.0.0
      attr_reader :database

      # @return [ Collection ] files_collection The files collection.
      #
      # @since 2.0.0
      attr_reader :files_collection

      # @return [ Hash ] options The FSBucket options.
      #
      # @since 2.1.0
      attr_reader :options

      # Get client from the database.
      #
      # @since 2.1.0
      def_delegators :database,
                     :client

      # Find files collection documents matching a given selector.
      #
      # @example Find files collection documents by a filename.
      #   fs.find(filename: 'file.txt')
      #
      # @param [ Hash ] selector The selector to use in the find.
      # @param [ Hash ] options The options for the find.
      #
      # @option options [ true, false ] :allow_disk_use Whether the server can
      #   write temporary data to disk while executing the find operation.
      # @option options [ Integer ] :batch_size The number of documents returned in each batch
      #   of results from MongoDB.
      # @option options [ Integer ] :limit The max number of docs to return from the query.
      # @option options [ true, false ] :no_cursor_timeout The server normally times out idle
      #   cursors after an inactivity period (10 minutes) to prevent excess memory use.
      #   Set this option to prevent that.
      # @option options [ Integer ] :skip The number of docs to skip before returning results.
      # @option options [ Hash ] :sort The key and direction pairs by which the result set
      #   will be sorted.
      #
      # @return [ CollectionView ] The collection view.
      #
      # @since 2.1.0
      def find(selector = nil, options = {})
        opts = options.merge(read: read_preference) if read_preference
        files_collection.find(selector, opts || options)
      end

      # Find a file in the GridFS.
      #
      # @example Find a file by its id.
      #   fs.find_one(_id: id)
      #
      # @example Find a file by its filename.
      #   fs.find_one(filename: 'test.txt')
      #
      # @param [ Hash ] selector The selector.
      #
      # @return [ Grid::File ] The file.
      #
      # @since 2.0.0
      #
      # @deprecated Please use #find instead with a limit of -1.
      #   Will be removed in version 3.0.
      def find_one(selector = nil)
        file_info = files_collection.find(selector).first
        return nil unless file_info
        chunks = chunks_collection.find(:files_id => file_info[:_id]).sort(:n => 1)
        Grid::File.new(chunks.to_a, Options::Mapper.transform(file_info, Grid::File::Info::MAPPINGS.invert))
      end

      # Insert a single file into the GridFS.
      #
      # @example Insert a single file.
      #   fs.insert_one(file)
      #
      # @param [ Grid::File ] file The file to insert.
      #
      # @return [ BSON::ObjectId ] The file id.
      #
      # @since 2.0.0
      #
      # @deprecated Please use #upload_from_stream or #open_upload_stream instead.
      #   Will be removed in version 3.0.
      def insert_one(file)
        @indexes ||= ensure_indexes!
        chunks_collection.insert_many(file.chunks)
        files_collection.insert_one(file.info)
        file.id
      end

      # Get the prefix for the GridFS
      #
      # @example Get the prefix.
      #   fs.prefix
      #
      # @return [ String ] The GridFS prefix.
      #
      # @since 2.0.0
      def prefix
        @options[:fs_name] || @options[:bucket_name] || DEFAULT_ROOT
      end

      # Remove a single file from the GridFS.
      #
      # @example Remove a file from the GridFS.
      #   fs.delete_one(file)
      #
      # @param [ Grid::File ] file The file to remove.
      #
      # @return [ Result ] The result of the remove.
      #
      # @since 2.0.0
      def delete_one(file)
        delete(file.id)
      end

      # Remove a single file, identified by its id from the GridFS.
      #
      # @example Remove a file from the GridFS.
      #   fs.delete(id)
      #
      # @param [ BSON::ObjectId, Object ] id The id of the file to remove.
      #
      # @return [ Result ] The result of the remove.
      #
      # @raise [ Error::FileNotFound ] If the file is not found.
      #
      # @since 2.1.0
      def delete(id)
        result = files_collection.find({ :_id => id }, @options).delete_one
        chunks_collection.find({ :files_id => id }, @options).delete_many
        raise Error::FileNotFound.new(id, :id) if result.n == 0
        result
      end

      # Opens a stream from which a file can be downloaded, specified by id.
      #
      # @example Open a stream from which a file can be downloaded.
      #   fs.open_download_stream(id)
      #
      # @param [ BSON::ObjectId, Object ] id The id of the file to read.
      # @param [ Hash ] options The options.
      #
      # @option options [ BSON::Document ] :file_info_doc For internal
      #   driver use only. A BSON document to use as file information.
      #
      # @return [ Stream::Read ] The stream to read from.
      #
      # @yieldparam [ Hash ] The read stream.
      #
      # @since 2.1.0
      def open_download_stream(id, options = nil)
        options = Utils.shallow_symbolize_keys(options || {})
        read_stream(id, **options).tap do |stream|
          if block_given?
            begin
              yield stream
            ensure
              stream.close
            end
          end
        end
      end

      # Downloads the contents of the file specified by id and writes them to
      # the destination io object.
      #
      # @example Download the file and write it to the io object.
      #   fs.download_to_stream(id, io)
      #
      # @param [ BSON::ObjectId, Object ] id The id of the file to read.
      # @param [ IO ] io The io object to write to.
      #
      # @since 2.1.0
      def download_to_stream(id, io)
        open_download_stream(id) do |stream|
          stream.each do |chunk|
            io << chunk
          end
        end
      end

      # Opens a stream from which the application can read the contents of the stored file
      # specified by filename and the revision in options.
      #
      # Revision numbers are defined as follows:
      # 0 = the original stored file
      # 1 = the first revision
      # 2 = the second revision
      # etc…
      # -2 = the second most recent revision
      # -1 = the most recent revision
      #
      # @example Open a stream to download the most recent revision.
      #   fs.open_download_stream_by_name('some-file.txt')
      #
      # # @example Open a stream to download the original file.
      #   fs.open_download_stream_by_name('some-file.txt', revision: 0)
      #
      # @example Open a stream to download the second revision of the stored file.
      #   fs.open_download_stream_by_name('some-file.txt', revision: 2)
      #
      # @param [ String ] filename The file's name.
      # @param [ Hash ] opts Options for the download.
      #
      # @option opts [ Integer ] :revision The revision number of the file to download.
      #   Defaults to -1, the most recent version.
      #
      # @return [ Stream::Read ] The stream to read from.
      #
      # @raise [ Error::FileNotFound ] If the file is not found.
      # @raise [ Error::InvalidFileRevision ] If the requested revision is not found for the file.
      #
      # @yieldparam [ Hash ] The read stream.
      #
      # @since 2.1.0
      def open_download_stream_by_name(filename, opts = {}, &block)
        revision = opts.fetch(:revision, -1)
        if revision < 0
          skip = revision.abs - 1
          sort = { 'uploadDate' => Mongo::Index::DESCENDING }
        else
          skip = revision
          sort = { 'uploadDate' => Mongo::Index::ASCENDING }
        end
        file_info_doc = files_collection.find({ filename: filename} ,
                                           sort: sort,
                                           skip: skip,
                                           limit: -1).first
        unless file_info_doc
          raise Error::FileNotFound.new(filename, :filename) unless opts[:revision]
          raise Error::InvalidFileRevision.new(filename, opts[:revision])
        end
        open_download_stream(file_info_doc[:_id], file_info_doc: file_info_doc, &block)
      end

      # Downloads the contents of the stored file specified by filename and by the
      # revision in options and writes the contents to the destination io object.
      #
      # Revision numbers are defined as follows:
      # 0 = the original stored file
      # 1 = the first revision
      # 2 = the second revision
      # etc…
      # -2 = the second most recent revision
      # -1 = the most recent revision
      #
      # @example Download the most recent revision.
      #   fs.download_to_stream_by_name('some-file.txt', io)
      #
      # # @example Download the original file.
      #   fs.download_to_stream_by_name('some-file.txt', io, revision: 0)
      #
      # @example Download the second revision of the stored file.
      #   fs.download_to_stream_by_name('some-file.txt', io, revision: 2)
      #
      # @param [ String ] filename The file's name.
      # @param [ IO ] io The io object to write to.
      # @param [ Hash ] opts Options for the download.
      #
      # @option opts [ Integer ] :revision The revision number of the file to download.
      #   Defaults to -1, the most recent version.
      #
      # @raise [ Error::FileNotFound ] If the file is not found.
      # @raise [ Error::InvalidFileRevision ] If the requested revision is not found for the file.
      #
      # @since 2.1.0
      def download_to_stream_by_name(filename, io, opts = {})
        download_to_stream(open_download_stream_by_name(filename, opts).file_id, io)
      end

      # Opens an upload stream to GridFS to which the contents of a file or
      # blob can be written.
      #
      # @param [ String ] filename The name of the file in GridFS.
      # @param [ Hash ] opts The options for the write stream.
      #
      # @option opts [ Object ] :file_id An optional unique file id.
      #   A BSON::ObjectId is automatically generated if a file id is not
      #   provided.
      # @option opts [ Integer ] :chunk_size Override the default chunk size.
      # @option opts [ Hash ] :metadata User data for the 'metadata' field of the files
      #   collection document.
      # @option opts [ String ] :content_type The content type of the file.
      #   Deprecated, please use the metadata document instead.
      # @option opts [ Array<String> ] :aliases A list of aliases.
      #   Deprecated, please use the metadata document instead.
      # @option options [ Hash ] :write Deprecated. Equivalent to :write_concern
      #   option.
      # @option options [ Hash ] :write_concern The write concern options.
      #   Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
      #
      # @return [ Stream::Write ] The write stream.
      #
      # @yieldparam [ Hash ] The write stream.
      #
      # @since 2.1.0
      def open_upload_stream(filename, opts = {})
        opts = Utils.shallow_symbolize_keys(opts)
        write_stream(filename, **opts).tap do |stream|
          if block_given?
            begin
              yield stream
            ensure
              stream.close
            end
          end
        end
      end

      # Uploads a user file to a GridFS bucket.
      # Reads the contents of the user file from the source stream and uploads it as chunks in the
      # chunks collection. After all the chunks have been uploaded, it creates a files collection
      # document for the filename in the files collection.
      #
      # @example Upload a file to the GridFS bucket.
      #   fs.upload_from_stream('a-file.txt', file)
      #
      # @param [ String ] filename The filename of the file to upload.
      # @param [ IO ] io The source io stream to upload from.
      # @param [ Hash ] opts The options for the write stream.
      #
      # @option opts [ Object ] :file_id An optional unique file id. An ObjectId is generated otherwise.
      # @option opts [ Integer ] :chunk_size Override the default chunk size.
      # @option opts [ Hash ] :metadata User data for the 'metadata' field of the files
      #   collection document.
      # @option opts [ String ] :content_type The content type of the file. Deprecated, please
      #   use the metadata document instead.
      # @option opts [ Array<String> ] :aliases A list of aliases. Deprecated, please use the
      #   metadata document instead.
      # @option options [ Hash ] :write Deprecated. Equivalent to :write_concern
      #   option.
      # @option options [ Hash ] :write_concern The write concern options.
      #   Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
      #
      # @return [ BSON::ObjectId ] The ObjectId file id.
      #
      # @since 2.1.0
      def upload_from_stream(filename, io, opts = {})
        open_upload_stream(filename, opts) do |stream|
          begin
            stream.write(io)
          # IOError and SystemCallError are for errors reading the io.
          # Error::SocketError and Error::SocketTimeoutError are for
          # writing to MongoDB.
          rescue IOError, SystemCallError, Error::SocketError, Error::SocketTimeoutError
            begin
              stream.abort
            rescue Error::OperationFailure
            end
            raise
          end
        end.file_id
      end

      # Get the read preference.
      #
      # @note This method always returns a BSON::Document instance, even though
      #   the FSBucket constructor specifies the type of :read as a Hash, not
      #   as a BSON::Document.
      #
      # @return [ BSON::Document ] The read preference.
      #   The document may have the following fields:
      #   - *:mode* -- read preference specified as a symbol; valid values are
      #     *:primary*, *:primary_preferred*, *:secondary*, *:secondary_preferred*
      #     and *:nearest*.
      #   - *:tag_sets* -- an array of hashes.
      #   - *:local_threshold*.
      def read_preference
        @read_preference ||= begin
          pref = options[:read] || database.read_preference
          if BSON::Document === pref
            pref
          else
            BSON::Document.new(pref)
          end
        end
      end

      # Get the write concern.
      #
      # @example Get the write concern.
      #   stream.write_concern
      #
      # @return [ Mongo::WriteConcern ] The write concern.
      #
      # @since 2.1.0
      def write_concern
        @write_concern ||= if wco = @options[:write_concern] || @options[:write]
          WriteConcern.get(wco)
        else
          database.write_concern
        end
      end

      # Drop the collections that implement this bucket.
      def drop
        files_collection.drop
        chunks_collection.drop
      end

      private

      # @param [ Hash ] opts The options.
      #
      # @option opts [ BSON::Document ] :file_info_doc For internal
      #   driver use only. A BSON document to use as file information.
      def read_stream(id, **opts)
        Stream.get(self, Stream::READ_MODE, { file_id: id }.update(options).update(opts))
      end

      def write_stream(filename, **opts)
        Stream.get(self, Stream::WRITE_MODE, { filename: filename }.update(options).update(opts))
      end

      def chunks_name
        "#{prefix}.#{Grid::File::Chunk::COLLECTION}"
      end

      def files_name
        "#{prefix}.#{Grid::File::Info::COLLECTION}"
      end

      def ensure_indexes!
        if files_collection.find({}, limit: 1, projection: { _id: 1 }).first.nil?
          create_index_if_missing!(files_collection, FSBucket::FILES_INDEX)
        end

        if chunks_collection.find({}, limit: 1, projection: { _id: 1 }).first.nil?
          create_index_if_missing!(chunks_collection, FSBucket::CHUNKS_INDEX, :unique => true)
        end
      end

      def create_index_if_missing!(collection, index_spec, options = {})
        indexes_view = collection.indexes
        begin
          if indexes_view.get(index_spec).nil?
            indexes_view.create_one(index_spec, options)
          end
        rescue Mongo::Error::OperationFailure => e
          # proceed with index creation if a NamespaceNotFound error is thrown
          if e.code == 26
            indexes_view.create_one(index_spec, options)
          else
            raise
          end
        end
      end
    end
  end
end