fog/fog-azure-rm

View on GitHub
lib/fog/azurerm/requests/storage/multipart_save_block_blob.rb

Summary

Maintainability
A
3 hrs
Test Coverage
F
25%
module Fog
  module Storage
    class AzureRM
      # This class provides the actual implementation for service calls.
      class Real
        # This class is used to store chunk data for block blob before uploading.
        class BlockChunk
          attr_reader :id # For debug
          attr_reader :block_id
          attr_reader :data

          def initialize(id, block_id, data)
            @id = id
            @block_id = block_id
            @data = data
          end
        end

        # This class is a stream to read chunk data.
        class BlockFileStream
          attr_reader :blocks

          def initialize(body)
            if body.respond_to?(:read)
              if body.respond_to?(:rewind)
                begin
                  body.rewind
                rescue => ex
                  Fog::Logger.debug "multipart_save_block_blob - body responds to :rewind but throws an exception when calling :rewind: #{ex.inspect}"
                end
              end
              @stream = body
            else
              @stream = StringIO.new(body)
            end
            @mutex = Mutex.new
            @blocks = []
          end

          def read(size)
            block_id = Base64.strict_encode64(random_string(32))
            data = nil
            id = 0
            @mutex.synchronize do
              data = @stream.read(size)
              return nil if data.nil?
              @blocks << [block_id]
              id = @blocks.size
            end
            BlockChunk.new(id, block_id, data)
          end
        end

        def multipart_save_block_blob(container_name, blob_name, body, options)
          threads_num = options.delete(:worker_thread_num)
          threads_num = UPLOAD_BLOB_WORKER_THREAD_COUNT if threads_num.nil? || !threads_num.is_a?(Integer) || threads_num < 1

          begin
            # Initiate the upload
            Fog::Logger.debug "Creating the block blob #{container_name}/#{blob_name}. options: #{options}"
            content_md5 = options.delete(:content_md5)
            create_block_blob(container_name, blob_name, nil, options)

            # Uploading parts
            Fog::Logger.debug "Starting to upload parts for the block blob #{container_name}/#{blob_name}."
            iostream = BlockFileStream.new(body)

            threads = []
            threads_num.times do |id|
              thread = Thread.new do
                Fog::Logger.debug "Created upload thread #{id}."
                while (chunk = iostream.read(MAXIMUM_CHUNK_SIZE))
                  Fog::Logger.debug "Upload thread #{id} is uploading #{chunk.id}, size: #{chunk.data.size}, options: #{options}."
                  put_blob_block(container_name, blob_name, chunk.block_id, chunk.data, options)
                end
                Fog::Logger.debug "Upload thread #{id} finished."
              end
              thread.abort_on_exception = true
              threads << thread
            end

            threads.each(&:join)
            # Complete the upload
            options[:content_md5] = content_md5 unless content_md5.nil?
            Fog::Logger.debug "Commiting the block blob #{container_name}/#{blob_name}. options: #{options}"
            commit_blob_blocks(container_name, blob_name, iostream.blocks, options)
          rescue
            # Abort the upload & reraise
            begin
              delete_blob(container_name, blob_name)
            rescue => ex
              Fog::Logger.debug "Cannot delete the blob: #{container_name}/#{blob_name} after multipart_save_block_blob failed. #{ex.inspect}"
            end
            raise
          end

          Fog::Logger.debug "Successfully save the block blob: #{container_name}/#{blob_name}."
          true
        end
      end

      # This class provides the mock implementation for unit tests.
      class Mock
        def multipart_save_block_blob(*)
          true
        end
      end
    end
  end
end