lib/gamtools/qc/fastqc.py
"""
=================
The qc.fastqc module
=================
The qc.fastqc module contains functions for parsing fastqc output files.
Code in this module was shamelessly stolen from
https://code.google.com/p/bioinformatics-misc/source/browse/trunk/fastqc_to_pgtable.py?spec=svn93&r=93
"""
import os
import numpy as np
import pandas as pd
def fastqc_data_file(input_fastq):
"""Given an input fastq file, return the name fastqc will use
for it's output file.
:param str input_fastq: Path to a fastq file.
:returns: Path to fastqc output files.
"""
base_folder = input_fastq.split('.')[0]
#base_folder = '.'.join(input_fastq.split('.')[:-1])
fastqc_folder = base_folder + '_fastqc'
return os.path.join(fastqc_folder, 'fastqc_data.txt')
def parse_module(fastqc_module):
"""
Parse a fastqc module from the table format to a line format (list).
Input is list containing the module. One list-item per line. E.g.:
fastqc_module= [
'>>Per base sequence quality pass',
'#Base Mean Median Lower Quartile Upper Quartile 10th Percentile 90th Percentile',
'1 36.34 38.0 35.0 39.0 31.0 40.0',
'2 35.64 38.0 35.0 39.0 28.0 40.0',
'3 35.50 38.0 34.0 39.0 28.0 40.0',
...
]
Return a list like this where each sublist after 1st is a column:
['pass', ['1', '2', '3', ...], ['36,34', '35.64', '35.50', ...], ['40.0', '40.0', '40.0', ...]]
"""
row_list = []
module_header = fastqc_module[0]
module_name = module_header.split('\t')[0]
# Line with module name >>Per base ... pass/warn/fail
row_list.append(module_header.split('\t')[1])
# Handle odd cases:
# Where no table is returned:
if len(fastqc_module) == 1 and module_name == '>>Overrepresented sequences':
return row_list + [[]] * 4
if len(fastqc_module) == 1 and module_name == '>>Kmer Content':
return row_list + [[]] * 5
# Table is not the secod row:
if module_name == '>>Sequence Duplication Levels':
tot_dupl = fastqc_module[1].split('\t')[1]
row_list.append(tot_dupl)
del fastqc_module[1]
# Conevrt table to list of lists:
tbl = []
for line in fastqc_module[2:]:
tbl.append(line.split('\t'))
# Put each column in a list:
nrows = len(tbl)
ncols = len(tbl[0])
for i in range(0, ncols):
col = []
for j in range(0, nrows):
col.append(tbl[j][i])
row_list.append(col)
return row_list
def is_mono_repeat(kmer):
"""
Return true if kmer represents a mono-nucleotide repeat (e.g. AAAA).
"""
return bool(len(set(kmer)) == 1)
def is_di_repeat(kmer):
"""
Return true if kmer represents a di-nucleotide repeat (e.g. ATATAT).
"""
if not len(set(kmer)) == 2:
return False
return bool(len(set(kmer[::2])) == 1 and len(set(kmer[1::2])) == 1)
def get_kmer_summary(module):
"""
Calculate the total number of kmers that are either mono- or di- nucleotide
repeats.
"""
kmer_data = parse_module(module)
kmers = kmer_data[1]
counts = list(map(float, kmer_data[3]))
summary_data = {'dinucleotide_repeats': 0,
'mononucleotide_repeats': 0}
for kmer, count in zip(kmers, counts):
if is_mono_repeat(kmer):
summary_data['mononucleotide_repeats'] += count
elif is_di_repeat(kmer):
summary_data['dinucleotide_repeats'] += count
return summary_data
def get_avg_qual(module):
"""
Get the average per-base-pair sequencing quality score.
"""
qual_data = parse_module(module)
qualities, counts = np.array(
list(map(int, qual_data[1]))), np.array(list(map(float, qual_data[2])))
avg = (qualities * counts).sum() / counts.sum()
return {'avg_quality': avg}
def get_sample(filename):
"""
Get the name of the input sample given the fastqc output file path.
"""
return os.path.basename(os.path.dirname(filename))[:-7]
def process_file(filename):
"""
Process a fastqc output file and calculate some summary statistics.
"""
fq_lines = open(filename).readlines()
fq_lines = [x.strip() for x in fq_lines]
fastqc_dict = {}
# Get start and end position of all modules:
mod_start = []
mod_end = []
for i, line in enumerate(fq_lines):
if line == '>>END_MODULE':
mod_end.append(i)
elif line.startswith('>>'):
mod_start.append(i)
else:
pass
# Start processing modules. First one (Basic statitics) is apart:
for start, end in zip(mod_start[1:], mod_end[1:]):
module = fq_lines[start:end]
module_name = module[0].split('\t')[0]
if module_name == '>>Kmer Content':
fastqc_dict.update(get_kmer_summary(module))
if module_name == '>>Per sequence quality scores':
fastqc_dict.update(get_avg_qual(module))
fastqc_dict['Sample'] = get_sample(filename)
return fastqc_dict
def get_quality_stats(input_fastqc_files):
"""
Iterate over a list of fastqc output files and generate a dataframe
containing summary statistics for each file.
"""
sample_qualities = []
for filename in input_fastqc_files:
sample_qualities.append(process_file(filename))
return pd.DataFrame(sample_qualities)
def write_quality_stats(input_files, output_file):
"""
Iterate over a list of fastqc output files and generate a dataframe
containing summary statistics for each file, then write the result
to disk.
"""
quality_df = get_quality_stats(input_files)
quality_df.to_csv(output_file, sep='\t', index=False)
def quality_qc_from_doit(dependencies, targets):
"""
Wrapper function to call write_quality_stats from argparse.
"""
assert len(targets) == 1
write_quality_stats(dependencies, targets[0])