bio-miga/miga

View on GitHub
lib/miga/cli/action/doctor/distances.rb

Summary

Maintainability
C
1 day
Test Coverage

module MiGA::Cli::Action::Doctor::Distances
  ##
  # Check the integrity of the databases of every dataset in the project
  # loaded through MiGA::Cli +cli+.
  #
  # Datasets are split round-robin among +cli[:threads]+ forked processes.
  # For each database file reported by +each_database_file+, the file is
  # handed to +check_sqlite3_database+; whenever that helper yields (going by
  # the message below, presumably when the database is malformed), the
  # database file is deleted together with the +:done+ and main files of the
  # result it belongs to, if that result is registered.
  #
  # Only the first process reports progress. The method waits for all the
  # children before returning; their exit statuses are not inspected.
  def check_db(cli)
    cli.say 'Checking integrity of databases'
    project = cli.load_project
    total = project.dataset_names.size
    cli[:threads].times.map do |worker|
      Process.fork do
        seen = 0
        project.each_dataset do |ds|
          seen += 1
          cli.advance('Datasets:', seen, total, false) if worker.zero?
          # Round-robin: each worker only handles its own share of datasets
          next if seen % cli[:threads] != worker

          each_database_file(ds) do |db_file, metric, result, _rank|
            check_sqlite3_database(db_file, metric) do
              cli.say(
                "  > Removing malformed database from #{ds.name}:#{result}   "
              )
              File.unlink(db_file)
              registered = ds.result(result)
              next unless registered

              # Drop the result files too (only those actually present)
              [registered.path(:done), registered.path].each do |path|
                File.unlink(path) if File.exist?(path)
              end
            end
          end
        end
      end
    end
    Process.waitall
    cli.say
  end

  ##
  # Check that the distances between reference datasets of the project loaded
  # through MiGA::Cli +cli+ are registered in both directions, and save the
  # values found to be missing.
  #
  # Works in three steps:
  # 1. +partial_bidir_tmp+ dumps the per-dataset reports to a temporary
  #    directory.
  # 2. +merge_bidir_tmp+ merges those dumps into a single hash. The temporary
  #    directory is removed afterwards, even if the merge raises, so a failed
  #    run does not leave it behind in the project folder.
  # 3. +save_bidirectional+ is called for every reference dataset with the
  #    merged hash, distributed through +MiGA::Parallel.distribute+ using
  #    +cli[:threads]+.
  #
  # Any exception raised by the helpers is propagated to the caller.
  def check_bidir(cli)
    cli.say 'Checking if reference distances are bidirectional'
    project = cli.load_project
    ref_ds = project.each_dataset.select(&:ref?)

    # Read and merge data
    tmp = partial_bidir_tmp(project, ref_ds)
    dist =
      begin
        merge_bidir_tmp(tmp)
      ensure
        # Always clean up, including when the merge fails
        FileUtils.rm_rf(tmp)
      end

    # Write missing values (threaded)
    MiGA::Parallel.distribute(ref_ds, cli[:threads]) do |ds, idx, thr|
      # Only the first worker reports progress
      cli.advance('Datasets:', idx + 1, ref_ds.size, false) if thr == 0
      save_bidirectional(ds, dist)
    end
    cli.say
  end

  ##
  # Check the ANI and AAI distance tables of the project loaded through
  # MiGA::Cli +cli+ for consistency.
  #
  # For each metric whose +<metric>_distances+ result exists in the project,
  # the table is evaluated with +check_dist_eval+, which returns two values:
  # the second one is handed to +check_dist_fix+ and the first one to
  # +check_dist_recompute+. Metrics without a registered result are skipped.
  def check_dist(cli)
    project = cli.load_project
    %i[ani aai].each do |metric|
      table = project.result("#{metric}_distances")
      next if table.nil?

      cli.say "Checking #{metric} table for consistent datasets"
      inconsistent, fixable = check_dist_eval(cli, project, table)
      check_dist_fix(cli, project, fixable)
      check_dist_recompute(cli, table, inconsistent)
    end
  end

  #---- Auxiliary functions -----

  ##
  # Make a temporary directory holding partial bidirectionality reports (one
  # file per thread, named +<thread>.json+) in a custom multi-JSON format.
  # Requires a MiGA::Project +project+ and the collection of reference
  # datasets +ref_ds+ (must respond to +size+ and +each_with_index+). Returns
  # the path to the directory created (+doctor-bidirectional.tmp+ inside the
  # project path). Used by +check_bidir+.
  #
  # Each report has the following layout:
  #
  #   # aai
  #   <dataset name> <JSON row>
  #   # ani
  #   <dataset name> <JSON row>
  #   # end
  #
  # where each row is whatever +read_bidirectional+ returns for that dataset
  # and metric; empty rows are not written. The final <tt># end</tt> line
  # marks the report as complete.
  #
  # Datasets are assigned round-robin to the +cli[:threads]+ workers started
  # by +MiGA::Parallel.process+. Relies on a +cli+ method being available in
  # the object including this module.
  def partial_bidir_tmp(project, ref_ds)
    n = ref_ds.size

    # Read data first (threaded)
    tmp = File.join(project.path, 'doctor-bidirectional.tmp')
    FileUtils.mkdir_p(tmp)
    MiGA::Parallel.process(cli[:threads]) do |thr|
      # The block form closes the report even if reading a dataset fails; an
      # interrupted report is then recognizable by its missing '# end' line
      File.open(File.join(tmp, "#{thr}.json"), 'w') do |fh|
        %i[aai ani].each do |metric|
          fh.puts "# #{metric}"
          ref_ds.each_with_index do |ds, idx|
            # Round-robin: skip the datasets assigned to other workers
            next unless idx % cli[:threads] == thr

            cli.advance('Reading:', idx + 1, n, false) if thr == 0
            row = read_bidirectional(ds, metric)
            fh.puts "#{ds.name} #{JSON.generate(row)}" unless row.empty?
          end
        end
        fh.puts '# end'
        fh.flush # necessary for large threaded runs
      end

      # Only the first worker reports progress; close its progress line
      if thr == 0
        cli.advance('Reading:', n, n, false)
        cli.say
      end
    end

    tmp
  end

  ##
  # Read the partial reports of bidirectionality located in the directory
  # +tmp+ (files +0.json+ to +N-1.json+, with N being +cli[:threads]+) and
  # return a two-deep hash with the final missingness report: the first key is
  # the metric (+:aai+ or +:ani+) and the second key is the dataset name,
  # pointing to a hash of subject name => value. Used by +check_bidir+.
  #
  # Lines starting with <tt>#</tt> switch the current metric, and every other
  # line is <tt><dataset name> <JSON row></tt>. A query/subject pair is
  # recorded the first time it is seen and removed again when the reverse
  # pair shows up, so only pairs seen in a single direction remain.
  #
  # Raises a RuntimeError if a line cannot be split in two fields, if a data
  # line appears under an unknown metric, or if a report does not finish with
  # the <tt># end</tt> marker. Relies on a +cli+ method being available in the
  # object including this module.
  def merge_bidir_tmp(tmp)
    merged = { aai: {}, ani: {} }
    n_threads = cli[:threads]
    n_threads.times do |thr|
      cli.advance('Merging:', thr + 1, n_threads, false)
      path = File.join(tmp, "#{thr}.json")
      File.open(path, 'r') do |fh|
        section = nil
        fh.each_line do |line|
          name, payload = line.chomp.split(' ', 2)
          raise "Unexpected format in #{path}:#{fh.lineno}" unless payload

          # Header line: switch to the announced metric (or the end marker)
          if name == '#'
            section = payload.to_sym
            next
          end

          table = merged[section]
          raise "Unrecognized metric: #{section}" unless table

          JSON.parse(payload).each do |subject, value|
            table[name] ||= {}
            reverse = table[subject]
            if reverse&.include?(name)
              reverse.delete(name) # Already bidirectional
            else
              table[name][subject] = value
            end
          end
        end
        raise "Incomplete thread dump: #{path}" unless section == :end
      end
    end
    cli.say

    merged
  end
end