ManageIQ/manageiq-gems-pending

View on GitHub
lib/gems/pending/util/miq_file_storage.rb

Summary

Maintainability
A
25 mins
Test Coverage
A
100%
# This class is meant to be an abstract interface for defining a file_storage
# class.
#
# The storage class can either be of a type of "object storage", which includes:
# * protocols like FTP
# * document storage like s3 and OpenStack's Swift
#
# And mountable filesystems like:
# * NFS
# * SMB
#
# The class is meant to allow a shared interface for working with these
# different forms of file storage, while maintaining their differences in
# implementation where necessary.  Connection will be handled separately by the
# subclasses, but they must conform to the top level interface.
#

class MiqFileStorage
  # Raised when a URI's scheme does not map to any registered storage
  # interface class.  The message lists the schemes that are accepted.
  class InvalidSchemeError < ArgumentError
    # bad_scheme - the scheme that was rejected (may be nil)
    def initialize(bad_scheme = nil)
      message = error_message(bad_scheme)
      super(message)
    end

    # Builds the human readable message, including the currently known schemes
    # as reported by MiqFileStorage.storage_interface_classes.
    def error_message(bad_scheme)
      scheme_map    = ::MiqFileStorage.storage_interface_classes
      valid_schemes = scheme_map.keys.inspect
      "#{bad_scheme} is not a valid MiqFileStorage uri scheme. Accepted schemes are #{valid_schemes}"
    end
  end

  # Builds the storage interface instance matching opts[:uri].
  #
  # Without a block the instance (or nil, when no :uri is given) is returned.
  # With a block the instance is yielded and the block's value is returned.
  def self.with_interface_class(opts)
    interface = fetch_interface_class(opts)
    return interface unless block_given?

    yield interface
  end

  # Resolves opts[:uri] to a storage interface instance.
  #
  # Returns nil when opts has no :uri.  Otherwise the URI scheme is looked up
  # in storage_interface_classes and the matching class is asked to build an
  # instance via `new_with_opts(opts)`.
  #
  # Raises InvalidSchemeError when no class is registered for the scheme.
  def self.fetch_interface_class(opts)
    uri = opts[:uri]
    return nil unless uri

    require 'uri'
    # The URI is escaped first so that characters URI.split would reject do
    # not prevent the scheme from being extracted.
    scheme = URI.split(URI::DEFAULT_PARSER.escape(uri)).first
    klass  = storage_interface_classes[scheme]
    raise InvalidSchemeError, scheme if klass.nil?

    klass.new_with_opts(opts)
  end
  # Keep the factory helper internal; `with_interface_class` is the entry
  # point that calls it.
  private_class_method :fetch_interface_class

  # Returns a memoized Hash mapping each uri scheme to the Interface
  # descendant that declares it.  Descendants whose `uri_scheme` is nil are
  # left out.
  def self.storage_interface_classes
    # TODO: this method expects these classes to be eager loaded.
    # Once we eager load, we can remove these.
    ::MiqGlusterfsSession
    ::MiqLocalMountSession
    ::MiqNfsSession
    ::MiqSmbSession

    @storage_interface_classes ||= begin
      by_scheme = {}
      Interface.descendants.each do |klass|
        scheme = klass.uri_scheme
        by_scheme[scheme] = klass if scheme
      end
      by_scheme
    end
  end

  class Interface
    # Matches a byte amount such as "1024", "1.5k" or "200 M": a (possibly
    # fractional) number followed by an optional, case-insensitive K/M/G
    # qualifier.
    BYTE_HASH_MATCH = /^(?<BYTE_NUM>\d+(\.\d+)?)\s*(?<BYTE_QUALIFIER>K|M|G)?$/i

    # Multiplier (in bytes) for each lower-cased qualifier.
    BYTE_HASH = { "k" => 1024**1, "m" => 1024**2, "g" => 1024**3 }.freeze

    # Per-operation state, exposed read-only and cleared by #reset_vars:
    #
    #   remote_file_path - remote path/uri of the current upload or download
    #   byte_count       - split size (or magic-number read size) in bytes, or nil
    #   source_input     - local IO being read from during an upload
    #   input_writer     - write end of the FIFO created for block-form calls
    attr_reader :remote_file_path, :byte_count, :source_input, :input_writer

    # Factory hook that subclasses must override to build an instance from
    # the options hash.  The base implementation always raises
    # NotImplementedError.
    def self.new_with_opts(opts) # rubocop:disable Lint/UnusedMethodArgument
      message = "#{name}.new_with_opts is not defined"
      raise NotImplementedError, message
    end

    # URI scheme handled by this class.  The base class handles none, so it
    # returns nil; subclasses that should be registered return their scheme.
    def self.uri_scheme; end

    # :call-seq:
    #   add( remote_uri )                       { |input_writer| ... }
    #   add( remote_uri, byte_count )           { |input_writer| ... }
    #
    #   add( local_io, remote_uri )
    #   add( local_io, remote_uri, byte_count )
    #
    # Add a file to the destination URI.
    #
    # In the block form of the method, only the remote_uri is required, and it
    # is assumed the input will be generated in the executed block (most
    # likely by an external process) and written to a unix pipe.  The pipe is
    # created by this method, and its file location is passed in to the block
    # (as `input_stream` in the example below).
    #
    # In the non-block form, a source must be provided as the first argument
    # either as an IO object that can be read from, or a file path, and the
    # second argument is the remote_uri as in the block form.
    #
    # An additional argument in both forms as the last argument is `byte_count`
    # can also be included.  If passed, it will be assumed that the resulting
    # input will be split, and the naming for the splits will be:
    #
    #   - filename.00001
    #   - filename.00002
    #   ...
    #
    # Block form:
    #
    #   nfs_session.add("path/to/file", "200M") do |input_stream|
    #     `pg_dump -f #{input_stream} vmdb_production`
    #   end
    #
    # Non-block form:
    #
    #   nfs_session.add("path/to/local_file", "path/to/remote_file")
    #   nfs_session.add("path/to/local_file", "path/to/remote_file", "200M")
    #
    def add(*upload_args, &block)
      initialize_upload_vars(*upload_args)
      mkdir(File.dirname(@remote_file_path))

      # Only present in block form; nil otherwise.
      producer_thread = handle_io_block(&block)
      result = byte_count ? upload_splits : upload_single(@remote_file_path)

      # Joining re-raises anything that went wrong inside the block's thread,
      # so do it before handing back the result.
      producer_thread.join if producer_thread
      result
    ensure
      # Always clear the per-call state, even when the upload failed.
      reset_vars
    end
    alias upload add

    # Creates +dir+ on the remote side.  Must be provided by subclasses; the
    # base implementation always raises NotImplementedError.
    def mkdir(dir) # rubocop:disable Lint/UnusedMethodArgument
      message = "#{self.class}##{__callee__} is not defined"
      raise NotImplementedError, message
    end

    # :call-seq:
    #   download( local_io, remote_uri )
    #   download( nil,      remote_uri ) { |input_writer| ... }
    #
    # Download a file from a remote uri.
    #
    # In non-block form, the remote_uri is saved to the local_io.
    #
    # In block form, the local_io is omitted, and the data is written to a
    # named pipe (FIFO) whose path is assumed to be read by the block provided.
    def download(local_file, remote_file_uri, &block)
      @remote_file_path = remote_file_uri

      # Non-block form: stream straight into the caller's destination.
      return download_single(remote_file_uri, local_file) unless block

      # Block form: the block consumes from the FIFO in its own thread while
      # the remote data is written into the FIFO's write end.
      consumer_thread = handle_io_block(&block)
      download_single(remote_file_uri, input_writer)
      input_writer.close # signals EOF to whoever is reading the FIFO
      consumer_thread.join
    ensure
      reset_vars
    end

    # :call-seq:
    #   magic_number_for( remote_uri )
    #   magic_number_for( remote_uri, {:accepted => {:key => "magic_str", ...} } )
    #
    # Determine a magic number for a remote file.
    #
    # If no options[:accepted] is passed, then only the first 256 bytes of the
    # file are downloaded, and just that data is returned.
    #
    # If a hash of magic number keys and values for those magic numbers is
    # passed, then it will download the largest byte size for the magic number
    # values, and compare against the list, returning the first match.
    #
    # Example:
    #
    #   magics = { :pgdump => PostgresAdmin::PG_DUMP_MAGIC }
    #
    #   magic_number_for("example.org/my_dump.gz", :accepted => magics)
    #   #=> :pgdump
    #   magic_number_for("example.org/my_file.rb", :accepted => magics)
    #   #=> nil
    #
    # NOTE:  This is an extremely naive implementation for remote magic number
    # checking, and is only really meant for working with the known magics from
    # PostgresAdmin.  Many other use cases would need to be considered, since
    # magic numbers can also be checked against the tail of the file, and are
    # not limited to the first 256 bytes as has been arbitrarily decided on here.
    def magic_number_for(uri, options = {})
      require "stringio"

      # A nil/false :accepted is treated the same as not passing it at all.
      magics = options[:accepted]

      # Amount of bytes to download for checking magic: the longest accepted
      # magic value, or 256 when there is nothing to compare against.
      @byte_count = (magics || {}).values.map(&:length).max || 256
      uri_data_io = StringIO.new
      download_single(uri, uri_data_io)
      uri_data    = uri_data_io.string

      # No list of magics: hand back the raw bytes that were fetched.
      return uri_data unless magics

      # Compare in each magic's own encoding so the prefix check is byte-wise
      # compatible; return the key of the first match, or nil when none match.
      matched_key, _magic = magics.detect do |_, magic|
        uri_data.force_encoding(magic.encoding).start_with?(magic)
      end
      matched_key
    ensure
      reset_vars
    end

    private

    # NOTE:  Needs to be overwritten in the subclass!
    #
    # Classes that inherit from `MiqFileStorage::Interface` need to make sure
    # to create a method that overwrites this one to handle the specifics of
    # uploading for their particular ObjectStore protocol or MountSession.
    #
    # `dest_uri` is the current file that will be uploaded.  If file splitting
    # is occurring, this will update the filename passed into `.add` to include
    # a `.0000X` suffix, where the suffix is padded up to 5 digits in total.
    #
    # `#upload_single` doesn't need to worry about determining the file name
    # itself for splitting, but if any relative path munging is necessary, that
    # should be done here (see `MiqGenericMountSession#upload_single` for an
    # example)
    #
    # `source_input` is available as an attr_reader in this method, and will
    # always be a local IO object that is available for reading.
    #
    # `byte_count` is also an attr_reader that is available, and will either be
    # `nil` if no file splitting is occurring, or an integer representing the
    # maximum number of bytes to upload for this particular `dest_uri`.
    #
    #
    # Ideally, making use of `IO.copy_stream` will simplify this process
    # significantly, as you can pass it `source_input`, `dest_uri`, and
    # `byte_count` respectively, and it will automatically handle streaming the
    # data from one IO object to the other.  In mount based situations, where
    # `dest_uri` is a file path (in `MiqGenericMountSession#upload_single`,
    # this is converted to `relpath`), this does not need to be converted to a
    # `File` IO object as `IO.copy_stream` will do that for you.
    def upload_single(dest_uri) # rubocop:disable Lint/UnusedMethodArgument
      # Abstract hook: the base class has no transport of its own.
      message = "#{self.class}#upload_single is not defined"
      raise NotImplementedError, message
    end

    # Uploads `source_input` in pieces: while input remains, the next split
    # filename is computed and handed to #upload_single, then the running
    # position is advanced by `byte_count`.  Returns nil.
    def upload_splits
      @position = 0
      until source_input.eof?
        split_name = next_split_filename
        upload_single(split_name)
        @position += byte_count
      end
    end

    # Unpacks the positional arguments given to #add, working from the end:
    #
    #   * a trailing value that parses as a byte amount becomes @byte_count
    #   * the next value is the remote file path
    #   * a remaining value, if any, is the local source: an IO is used as-is,
    #     anything else is opened for reading as a file path
    def initialize_upload_vars(*upload_args)
      @byte_count = parse_byte_value(upload_args.last)
      upload_args.pop if @byte_count

      @remote_file_path = upload_args.pop
      return if upload_args.empty?

      source = upload_args.pop
      @source_input = if source.kind_of?(IO)
                        source
                      else
                        File.open(source, "r")
                      end
    end

    # Converts a byte amount such as 1024, "1.5k" or "200 M" into an Integer
    # number of bytes.  Returns nil when the value does not look like a byte
    # amount (see BYTE_HASH_MATCH).
    def parse_byte_value(bytes)
      parsed = BYTE_HASH_MATCH.match(bytes.to_s)
      return unless parsed

      qualifier  = parsed[:BYTE_QUALIFIER]
      multiplier = qualifier ? BYTE_HASH[qualifier.downcase] : 1
      (parsed[:BYTE_NUM].to_f * multiplier).to_i
    end

    # When a block is given, creates a FIFO, opens both of its ends
    # (@source_input for reading, @input_writer for writing) and runs the
    # block in a new thread with the FIFO's path.  Returns that thread, or nil
    # when no block is given.
    def handle_io_block
      return unless block_given?

      require "pathname"
      require "tmpdir"

      # create pathname, but don't create the file for it (next line)
      fifo_path = Pathname.new(Dir::Tmpname.create("") {})
      File.mkfifo(fifo_path)

      # The reader must be opened first: a non-blocking, write-only open of a
      # FIFO fails while nothing has it open for reading.
      @source_input = File.open(fifo_path.to_s, IO::RDONLY | IO::NONBLOCK)
      @input_writer = File.open(fifo_path.to_s, IO::WRONLY | IO::NONBLOCK)

      # Hold on to the writer locally: #reset_vars may already have set
      # @input_writer to nil by the time the thread reaches its `ensure`.
      writer = @input_writer

      Thread.new do
        begin
          yield fifo_path # send the path to the block to get executed
        ensure
          writer.close    # close the file so we know we hit EOF (for #add)
        end
      end
    end

    # Clears all per-operation state and releases the FIFO created by
    # #handle_io_block, if there is one.  Returns nil.
    #
    # This runs from `ensure` blocks, so it must not raise over a FIFO that is
    # already gone, and it must always leave the instance variables reset.
    def reset_vars
      if @input_writer
        fifo_path = @input_writer.path

        # @input_writer is only ever set together with @source_input in
        # #handle_io_block, so both descriptors are ours to close here.  A
        # caller supplied source (no @input_writer) is left untouched.
        [@source_input, @input_writer].each do |io|
          io.close unless io.nil? || io.closed?
        end

        File.delete(fifo_path) if File.exist?(fifo_path)
      end
      nil
    ensure
      @position, @byte_count, @remote_file_path, @source_input, @input_writer = nil
    end

    # Name of the split currently being uploaded: the remote file path plus a
    # 1-based, zero-padded (5 digit) index derived from the current position.
    def next_split_filename
      split_index = @position / byte_count + 1
      "#{remote_file_path}.#{format('%05d', split_index)}"
    end

    # Fetches +source+ into +destination+.  Must be provided by subclasses;
    # the base implementation always raises NotImplementedError.
    def download_single(source, destination) # rubocop:disable Lint/UnusedMethodArgument
      message = "#{self.class}#download_single is not defined"
      raise NotImplementedError, message
    end
  end
end