openSNP/snpr

View on GitHub
app/workers/parsing.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# frozen_string_literal: true
class Parsing
  include Sidekiq::Worker
  sidekiq_options queue: :user_snps, retry: 5, unique: true

  attr_reader :genotype, :temp_table_name, :normalized_csv, :stats, :start_time

  def perform(genotype_id)
    @stats = {}
    @start_time = Time.current
    @genotype = Genotype.find(genotype_id)
    logger.info("Started parsing #{genotype.filetype} genotype with id #{genotype_id}")
    stats[:filetype] = genotype.filetype
    stats[:genotype_id] = genotype.id
    @temp_table_name = "user_snps_temp_#{genotype.id}"

    send_logged(:normalize_csv)
    ActiveRecord::Base.transaction do
      send_logged(:create_temp_table)
      send_logged(:copy_csv_into_temp_table)
      send_logged(:insert_into_snps)
      send_logged(:insert_into_user_snps)
    end
    send_logged(:notify_user)

    stats[:duration] = "#{(Time.current - start_time).round(3)}s"
    logger.info("Finished parsing: #{stats.to_a.map { |s| s.join('=') }.join(', ')}")
  rescue => e
    logger.error("Failed with #{e.class}: #{e.message}")
    raise
  end

  def create_temp_table
    execute(<<-SQL)
      CREATE TEMPORARY TABLE #{temp_table_name} (
        snp_name varchar(32),
        chromosome varchar(32),
        position varchar(32),
        local_genotype char(2)
      ) ON COMMIT DROP
    SQL
  end

  def normalize_csv
    rows = File.readlines(genotype.genotype.path)
      .reject { |line| line.start_with?('#') } # Skip comments
    stats[:rows_without_comments] = rows.length
    csv = send(:"parse_#{genotype.filetype.gsub('-', '_').downcase}", rows)
    known_chromosomes = ['MT', 'X', 'Y', (1..22).map(&:to_s)].flatten
    csv.select! do |row|
      # snp name
      row[0].present? &&
      # chromosome
      known_chromosomes.include?(row[1]) &&
      # position
      row[2].to_i >= 1 && row[2].to_i <= 249_250_621 &&
      # local genotype
      row[3].is_a?(String) && (1..2).include?(row[3].length)
    end
    @normalized_csv = csv.map { |row| row.join(',') }.join("\n")
    stats[:rows_after_parsing] = csv.length
  end

  def copy_csv_into_temp_table
    sql = <<-SQL
      COPY #{temp_table_name} (
        snp_name,
        chromosome,
        position,
        local_genotype
      )
      FROM STDIN
      WITH (FORMAT CSV, HEADER FALSE, DELIMITER ',')
    SQL

    raw = connection.raw_connection
    raw.copy_data(sql) do
      raw.put_copy_data(normalized_csv)
    end
  end

  def insert_into_snps
    time = Time.now.utc.iso8601

    snps = execute(<<-SQL)
      select
        #{temp_table_name}.snp_name as name,
        #{temp_table_name}.chromosome,
        #{temp_table_name}.position,
        1 as user_snps_count
      from #{temp_table_name}
      left join snps on #{temp_table_name}.snp_name = snps.name
      where snps.name is null
    SQL
    Snp.create!(snps.to_a)
  end

  def insert_into_user_snps
    execute("SELECT upsert_user_snps(#{genotype.id})")
  end

  def parse_genes_for_good(rows)
    rows.map do |row|
      fields = row.strip.split("\t")
      [
        fields[0],
        fields[1],
        fields[2],
        fields[3].to_s.rstrip
      ]
    end
  end

  def parse_23andme(rows)
    rows.map do |row|
      fields = row.strip.split("\t")
      [
        fields[0],
        fields[1],
        fields[2],
        fields[3].to_s.rstrip
      ]
    end
  end

  def parse_23andme_exome_vcf(rows)
    # Rules:
    # Skip lines with IndelType in them
    # Skip lines were SNP name is '.', these are non-standard SNPs
    rows.map do |row|
      next if row.include? 'IndelType'
      fields = row.strip.split("\t")
      next if fields[2] == '.'
      major_allele = fields[3] # C
      minor_allele = fields[4] # A
      trans_dict = {"0" => major_allele, "1" => minor_allele}
      names = fields[-1].split(":")[0].split("/") # ["0", "1"], meaning A/C
      alleles = names.map{ |a| trans_dict[a]}.sort.join # becomes AC
      [
        fields[2],
        fields[0],
        fields[1],
        alleles
      ]
    end.compact # because the above next introduces nil.
    # Slower alternative is to use reject first, but then we'll iterate > 2 times
  end

  def parse_decodeme(rows)
    rows.shift if rows.first.start_with?('Name')
    rows.map do |row|
      fields = row.strip.split(',')
      [
        fields[0],
        fields[2],
        fields[3],
        fields[5]
      ]
    end
  end

  def parse_ancestry(rows)
    rows.shift if rows.first.start_with?('rsid')
    rows.map do |row|
      fields = row.strip.split("\t")
      [
        fields[0],
        fields[1],
        fields[2],
        "#{fields[3]}#{fields[4]}"
      ]
    end
  end

  def parse_ftdna_illumina(rows)
    rows.shift if rows.first.start_with?('RSID')
    rows.map do |row|
      fields = row.strip.split(',')
      [
        fields[0].to_s.gsub('"', ''),
        fields[1].to_s.gsub('"', ''),
        fields[2].to_s.gsub('"', ''),
        fields[3].to_s.gsub('"', '')
      ]
    end
  end

  def parse_iyg(rows)
    db_snp_names = {
      "MT-T3027C" => "rs199838004", "MT-T4336C" => "rs41456348",
      "MT-G4580A" => "rs28357975", "MT-T5004C" => "rs41419549",
      "MT-C5178a" => "rs28357984", "MT-A5390G" => "rs41333444",
      "MT-C6371T" => "rs41366755", "MT-G8697A" => "rs28358886",
      "MT-G9477A" => "rs2853825", "MT-G10310A" => "rs41467651",
      "MT-A10550G" => "rs28358280", "MT-C10873T" => "rs2857284",
      "MT-C11332T" => "rs55714831", "MT-A11947G" => "rs28359168",
      "MT-A12308G" => "rs2853498", "MT-A12612G" => "rs28359172",
      "MT-T14318C" => "rs28357675", "MT-T14766C" => "rs3135031",
      "MT-T14783C" => "rs28357680"
    }
    rows.map do |row|
      snp_name, local_genotype = row.strip.split("\t")
      if snp_name.start_with?('MT')
        position = snp_name[/[0-9]+/]
        chromosome = 'MT'
      else
        position = chromosome = '1'
      end
      [
        db_snp_names.fetch(snp_name, snp_name),
        chromosome,
        position,
        local_genotype.strip
      ]
    end
  end

  def notify_user
    UserMailer.finished_parsing(genotype.id, stats).deliver_later
  end

  def execute(sql)
    connection.execute(sql)
  end

  def connection
    ActiveRecord::Base.connection
  end

  def send_logged(method)
    start_time = Time.now
    ret = send(method)
    took = Time.now - start_time
    logger.info("calling of method `#{method}` took #{took} s")
    ret
  end
end