mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/grid/stream/write.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  module Grid
    class FSBucket

      module Stream
        # A stream that writes files to the FSBucket.
        #
        # @since 2.1.0
        class Write

          # @return [ FSBucket ] fs The fs bucket to which this stream writes.
          #
          # @since 2.1.0
          attr_reader :fs

          # @return [ Object ] file_id The id of the file being uploaded.
          #
          # @since 2.1.0
          attr_reader :file_id

          # @return [ String ] filename The name of the file being uploaded.
          #
          # @since 2.1.0
          attr_reader :filename

          # @return [ Hash ] options The write stream options.
          #
          # @since 2.1.0
          attr_reader :options

          # Create a stream for writing files to the FSBucket.
          #
          # @example Create the stream.
          #   Stream::Write.new(fs, options)
          #
          # @param [ FSBucket ] fs The GridFS bucket object.
          # @param [ Hash ] options The write stream options.
          #
          # @option options [ Object ] :file_id The file id. An ObjectId
          #   is generated if the file id is not provided.
          # @option opts [ Integer ] :chunk_size Override the default chunk size.
          # @option opts [ Hash ] :metadata User data for the 'metadata' field of the files collection document.
          # @option opts [ String ] :content_type The content type of the file.
          #   Deprecated, please use the metadata document instead.
          # @option opts [ Array<String> ] :aliases A list of aliases.
          #   Deprecated, please use the metadata document instead.
          # @option options [ Hash ] :write Deprecated. Equivalent to :write_concern
          #   option.
          # @option options [ Hash ] :write_concern The write concern options.
          #   Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
          #
          # @since 2.1.0
          def initialize(fs, options)
            @fs = fs
            @length = 0
            @n = 0
            @file_id = options[:file_id] || BSON::ObjectId.new
            @options = options.dup
=begin WriteConcern object support
            if @options[:write_concern].is_a?(WriteConcern::Base)
              # Cache the instance so that we do not needlessly reconstruct it.
              @write_concern = @options[:write_concern]
              @options[:write_concern] = @write_concern.options
            end
=end
            @options.freeze
            @filename = @options[:filename]
            @open = true
          end

          # Write to the GridFS bucket from the source stream or a string.
          #
          # @example Write to GridFS.
          #   stream.write(io)
          #
          # @param [ String | IO ] io The string or IO object to upload from.
          #
          # @return [ Stream::Write ] self The write stream itself.
          #
          # @since 2.1.0
          def write(io)
            ensure_open!
            @indexes ||= ensure_indexes!
            @length += if io.respond_to?(:bytesize)
              # String objects
              io.bytesize
            else
              # IO objects
              io.size
            end
            chunks = File::Chunk.split(io, file_info, @n)
            @n += chunks.size
            chunks_collection.insert_many(chunks) unless chunks.empty?
            self
          end

          # Close the write stream.
          #
          # @example Close the stream.
          #   stream.close
          #
          # @return [ BSON::ObjectId, Object ] The file id.
          #
          # @raise [ Error::ClosedStream ] If the stream is already closed.
          #
          # @since 2.1.0
          def close
            ensure_open!
            update_length
            files_collection.insert_one(file_info, @options)
            @open = false
            file_id
          end

          # Get the write concern used when uploading.
          #
          # @example Get the write concern.
          #   stream.write_concern
          #
          # @return [ Mongo::WriteConcern ] The write concern.
          #
          # @since 2.1.0
          def write_concern
            @write_concern ||= if wco = @options[:write_concern] || @options[:write]
              WriteConcern.get(wco)
            else
              fs.write_concern
            end
          end

          # Is the stream closed.
          #
          # @example Is the stream closed.
          #   stream.closed?
          #
          # @return [ true, false ] Whether the stream is closed.
          #
          # @since 2.1.0
          def closed?
            !@open
          end

          # Abort the upload by deleting all chunks already inserted.
          #
          # @example Abort the write operation.
          #   stream.abort
          #
          # @return [ true ] True if the operation was aborted and the stream is closed.
          #
          # @since 2.1.0
          def abort
            fs.chunks_collection.find({ :files_id => file_id }, @options).delete_many
            (@open = false) || true
          end

          private

          def chunks_collection
            with_write_concern(fs.chunks_collection)
          end

          def files_collection
            with_write_concern(fs.files_collection)
          end

          def with_write_concern(collection)
            if write_concern.nil? || (collection.write_concern &&
              collection.write_concern.options == write_concern.options)
            then
              collection
            else
              collection.with(write: write_concern.options)
            end
          end

          def update_length
            file_info.document[:length] = @length
          end

          def file_info
            doc = { length: @length, _id: file_id, filename: filename }
            @file_info ||= File::Info.new(options.merge(doc))
          end

          def ensure_indexes!
            fs.send(:ensure_indexes!)
          end

          def ensure_open!
            raise Error::ClosedStream.new if closed?
          end
        end
      end
    end
  end
end