pombo-lab/gamtools

View on GitHub
lib/gamtools/pipeline.py

Summary

Maintainability
F
3 days
Test Coverage
"""
===================
The pipeline module
===================

The pipeline module handles the :docs:`tools/process_nps` command.

The process_nps command uses doit_ as a task engine, to determine
what actions (or tasks) need to be done and in what order. Since
the pipeline is part of a larger command line program, it needs some
way to take arguments/parameters outside of the normal doit framework.
This functionality is provided by wrapit_ - which is an extension of
doit that allows additional parameters to be passed to tasks.

:class:`InputFileMappingTasks` is a class that saves the input
parameters and uses them to create tasks. Any method of the class
that begins with `task_` is considered a task and will be passed
to doit.

.. _doit: http://pydoit.org/

"""

import os
import sys
import subprocess
import re
import itertools
import inspect

from distutils.version import LooseVersion #pylint: disable=no-name-in-module,import-error
from wrapit.api import run

from . import call_windows, segregation, qc, select_samples, cosegregation, utils


def swap_extension(input_path, extension):
    """Replace the file extension of a file path.

    :param str input_path: Input file path
    :param str extension: New extension to use
    :returns: Input file path with the extension replaced.
    :rtype: str
    """

    output_dir = os.path.dirname(input_path)
    basename, old_ext = os.path.splitext(os.path.basename(input_path))
    if old_ext == '.gz':
        basename, old_ext = os.path.splitext(basename)
    return os.path.join(output_dir, basename + extension)


def get_middle_value(my_list):
    """Return the middle value from a list after sorting.

    :param list my_list: List of sortable values"""

    return sorted(my_list)[len(my_list) // 2]


def pretty_resolution(window_size):
    """Convert an integer resolution in base-pairs to a nicely formatted string.

    :param int window_size: Integer resolution in base pairs.
    :returns: Formatted resolution."""

    return utils.format_genomic_distance(window_size, precision=0)


def bp_coverage_path(base_folder, window_size):
    """Get the path to a bp coverage table given a base folder and resolution.

    :param str base_folder: Path to the folder containing the bp coverage table.
    :param int window_size: Resolution in base pairs
    :returns: Path to bp coverage table
    """

    return os.path.join(
        base_folder,
        'bp_coverage_at_{0}.table'.format(
            pretty_resolution(window_size)))


def individual_bp_coverage_path(window_size, bam_path):
    """Get the path to a bp coverage table for one specific bam

    :param int window_size: Resolution in base pairs
    :param str bam_path: Path to input bam for bp coverage calculation
    :returns: Path to bp coverage table
    """

    new_suffix = '.bp_at_{}.bed'.format(
        pretty_resolution(window_size))
    return swap_extension(bam_path, new_suffix)


def segregation_path(base_folder, window_size):
    """Get the path to a segregation table given a base folder and resolution.

    :param str base_folder: Path to the folder containing the segregation table.
    :param int window_size: Resolution in base pairs
    :returns: Path to segregation table
    """

    return os.path.join(
        base_folder,
        'segregation_at_{0}.table'.format(
            pretty_resolution(window_size)))


def get_samtools_version():
    """Get the version number for the installed version of samtools"""

    try:

        with subprocess.Popen('samtools',
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE) as proc:

            output, err = proc.communicate() #pylint: disable=unused-variable
            proc.wait()

    except OSError:
        sys.exit('samtools is either not installed or not present in $PATH. '
                 'Please install samtools to continue.')

    vnum_regexp = b'^Version:\s(?P<version>(\d+\.){1,2}(\d+))'

    regexp_matches = re.search(vnum_regexp, err, re.MULTILINE)

    return regexp_matches.group('version').decode('UTF-8')


def get_samtools_sort_actions():
    """Return the appropriate doit function for using the installed version of samtools sort"""

    samtools_vnum = get_samtools_version()
    samtools_vnum = LooseVersion(samtools_vnum)

    if samtools_vnum < LooseVersion('0.1.19'):
        samtools_actions = ['samtools sort %(dependencies)s %(targets)s.tmp',
                            'mv %(targets)s.tmp.bam %(targets)s']

    elif samtools_vnum < LooseVersion('1.0'):
        samtools_actions = ['samtools sort -f %(dependencies)s %(targets)s']

    else:
        samtools_actions = [
            'samtools sort -o %(targets)s -T %(targets)s.tmp %(dependencies)s']

    return samtools_actions


class InputFileMappingTasks():
    """Class for generating doit tasks from command-line arguments.

    GAMtools "process_nps" command generates a set of doit tasks at
    runtime based on a set of parameters passed via the command-line.
    These parameters are handled by argparse (see main.py for the
    parser definitions), yielding an object called args. Tasks
    need access to args at runtime, so they must be generated
    via a class, such that args is always accessible via self.args.

    :param args: An argparse Namespace object containing parameters \
            passed via the command line
    """

    def __init__(self, args):
        self.args = args

    def create_doit_tasks(self):
        """Generator function that yields doit tasks."""

        tasks = []
        task_generators = []

        for possible_task in dir(self):

            # Only attributes starting with task_ are task generators
            if possible_task.startswith('task_'):

                # Call each function starting with task_
                task_obj = getattr(self, possible_task)()

                # We cannot mix generators and task dictionaries
                # in the same iterable
                if inspect.isgenerator(task_obj):
                    task_generators.append(task_obj)
                else:
                    tasks.append(task_obj)

        # Aggregate task dictionaries into a generator and add them
        # to task_generators
        task_generators.append((t for t in tasks))

        # Turn the list of generators into a single iterable
        all_tasks = itertools.chain.from_iterable(task_generators)

        for task in all_tasks:
            yield task

    def task_map_fastq(self):
        """Task to map fastq files using bowtie."""

        for input_fastq in self.args.input_fastqs:
            yield {
                "name": input_fastq,
                "basename": 'Mapping fastq',
                "actions": ['bowtie2 -x genome -U %(dependencies)s | sed \'/XS : /d\' | '
                            'samtools view -q %(minimum_mapq)s -F 4 -bS - > %(targets)s'],
                "targets": [swap_extension(input_fastq, ".bam")],
                "file_dep": [input_fastq],
            }

    def task_sort_bam(self):
        """Task to sort bam files"""

        sort_actions = get_samtools_sort_actions()

        for input_fastq in self.args.input_fastqs:
            yield {
                "name": input_fastq,
                "basename": 'Sorting bam',
                "actions": sort_actions,
                "targets": [swap_extension(input_fastq, ".sorted.bam")],
                "file_dep": [swap_extension(input_fastq, ".bam")],
            }

    def task_remove_duplicates(self):
        """Task to remove PCR duplicates from sorted bam files"""

        for input_fastq in self.args.input_fastqs:
            yield {
                "name": input_fastq,
                "basename": 'Removing duplications',
                "actions": ['samtools rmdup -s %(dependencies)s %(targets)s', ],
                "targets": [swap_extension(input_fastq, ".rmdup.bam")],
                "file_dep": [swap_extension(input_fastq, ".sorted.bam")],
            }

    def task_index_bam(self):
        """Task to generate indexes for sorted bam files"""

        for input_fastq in self.args.input_fastqs:
            yield {
                "name": input_fastq,
                "basename": 'Indexing bam',
                "actions": ['samtools index %(dependencies)s', ],
                "targets": [swap_extension(input_fastq, ".rmdup.bam.bai")],
                "file_dep": [swap_extension(input_fastq, ".rmdup.bam")],
            }

    def task_make_bedgraph(self):
        """Task to create bedgraphs from indexed and sorted bam files"""

        for input_fastq in self.args.input_fastqs:
            yield {
                "name": input_fastq,
                "basename": 'Converting bam to bedgraph',
                "actions": ['genomeCoverageBed -bg -ibam %(dependencies)s '
                            '-g %(genome_file)s > %(targets)s',
                            'if [ ! -s "%(targets)s" ]; '
                            'then echo "Creating empty bedgraph %(targets)s";'
                            'create_empty_bedgraph %(genome_file)s %(targets)s; fi'],
                "targets": [swap_extension(input_fastq, ".bedgraph")],
                "file_dep": [swap_extension(input_fastq, ".rmdup.bam")],
            }

    def task_make_bigwig(self):
        """Task to create bigwigs from bedgraphs"""

        for input_fastq in self.args.input_fastqs:
            yield {
                "name": input_fastq,
                "basename": 'Converting bedgraph to bigwig',
                "actions": ['bedGraphToBigWig %(dependencies)s %(genome_file)s %(targets)s'],
                "targets": [swap_extension(input_fastq, ".bw")],
                "file_dep": [swap_extension(input_fastq, ".bedgraph")],
            }

    def task_make_output_dir(self):
        """Task to create a directory"""

        yield {
            "basename": 'Creating output directory',
            "actions": ['mkdir -p %(targets)s', ],
            "targets": [self.args.output_dir],
            "uptodate": [True],
        }

    def task_get_bp_coverage(self):
        """Task to get read coverage per window for a range of resolutions"""

        for input_fastq in self.args.input_fastqs:
            for window_size in self.args.window_sizes:
                input_bam = swap_extension(input_fastq, ".rmdup.bam")
                output_cov = individual_bp_coverage_path(window_size, input_bam)
                yield {
                    'basename': 'Getting bp coverage',
                    'name': output_cov,
                    'actions': ['echo "chrom start stop %(dependencies)s" > %(targets)s',
                                'bash -i -c "bedtools coverage -b %(dependencies)s '
                                '-a <(bedtools makewindows -w {0} -g %(genome_file)s) '
                                '| cut -d \'\t\' -f 1,2,3,7 '
                                '>> %(targets)s"' .format(window_size)],
                    'targets': [output_cov],
                    'file_dep': [input_bam],
                    'task_dep': ['Indexing bam', 'Creating output directory'],
                }


    def task_merge_bp_coverage(self):
        """Task to merge bp coverage files for individual bams into a bp coverage table"""

        input_bamfiles = [swap_extension(input_fastq, ".rmdup.bam")
                          for input_fastq in self.args.input_fastqs]

        for window_size in self.args.window_sizes:

            input_cov_files = [individual_bp_coverage_path(window_size, input_bam)
                               for input_bam in input_bamfiles]

            yield {
                'basename': 'Merging bp coverage',
                'name': '{0} windows'.format(pretty_resolution(window_size)),
                'actions': [call_windows.merge_reads_coverage],
                'targets': [bp_coverage_path(self.args.output_dir, window_size)],
                'file_dep': input_cov_files,
            }


    def task_get_segregation(self):
        """Task to call positive windows from coverage tables for a range of resolutions"""

        for window_size in self.args.window_sizes:

            input_coverage_file = bp_coverage_path(
                self.args.output_dir, window_size)
            output_segregation_file = segregation_path(
                self.args.output_dir, window_size)

            task = {
                'basename': 'Calling positive windows',
                'name': pretty_resolution(window_size),
                'targets': [output_segregation_file],
                'file_dep': [input_coverage_file],
                'actions': [],
            }

            if self.args.fitting_folder:
                full_fitting_dir = os.path.join(
                    self.args.output_dir, self.args.fitting_folder, '{0}'.format(
                        pretty_resolution(window_size)))
                task['actions'].append(
                    'mkdir -pv {0}'.format(full_fitting_dir))
            else:
                full_fitting_dir = None

            task['actions'].append((call_windows.threshold_file, (
                input_coverage_file,
                output_segregation_file,
                full_fitting_dir,
                self.args.details_file,
                self.args.fitting_function,
                self.args.min_read_threshold)))

            yield task

    def task_get_segregation_bed(self):
        """Task to get bed files of positive windows for each NP at each resolution"""

        for window_size in self.args.window_sizes:

            input_segregation = segregation_path(
                self.args.output_dir, window_size)

            for input_fastq in self.args.input_fastqs:

                output_bed = swap_extension(
                    input_fastq, ".segregation_{0}.bed".format(
                        pretty_resolution(window_size)))

                yield {
                    'basename': 'Getting segregation bed',
                    'name': '{0}, {1}'.format(pretty_resolution(window_size), input_fastq),
                    'actions': [(segregation.sample_segregation_to_bed,
                                 (input_segregation,
                                  swap_extension(input_fastq, '.rmdup.bam'),
                                  output_bed + '.unsorted')),
                                'sort -k1,1 -k2,2n %(targets)s.unsorted > %(targets)s',
                                'rm %(targets)s.unsorted', ],
                    'targets': [output_bed],
                    'file_dep': [segregation_path(self.args.output_dir, window_size)],
                }

    def task_get_segregation_bigbed(self):
        """Task to get bigbed files of positive windows for each NP at each resolution"""

        for window_size in self.args.window_sizes:
            for input_fastq in self.args.input_fastqs:

                yield {
                    'basename': 'Getting segregation bigBed',
                    'name': '{0}, {1}'.format(pretty_resolution(window_size), input_fastq),
                    # If the input bed is empty, bedToBigBed will fail. We can force it not
                    # to return an error code, but we must touch the target first to ensure
                    # it gets created.
                    'actions': ['touch %(targets)s',
                                'bedToBigBed %(dependencies)s %(genome_file)s %(targets)s || '
                                'true', ],
                    'targets': [swap_extension(input_fastq,
                                               ".segregation_{0}.bb".format(
                                                   pretty_resolution(window_size)))],
                    'file_dep': [swap_extension(input_fastq,
                                                ".segregation_{0}.bed".format(
                                                    pretty_resolution(window_size)))],
                }

    def task_run_fastqc(self):
        """Task to run fastqc for each input fastq"""

        for input_fastq in self.args.input_fastqs:
            yield {
                "name": input_fastq,
                "basename": 'Running fastqc',
                "actions": ['fastqc -q --extract %(dependencies)s'],
                "targets": [qc.fastqc.fastqc_data_file(input_fastq)],
                "file_dep": [input_fastq],
            }

    def task_run_fastq_screen(self):
        """Task to run fastq_screen for each input fastq"""

        for input_fastq in self.args.input_fastqs:
            yield {
                "name": input_fastq,
                "basename": 'Running fastq_screen',
                "actions": ['fastq_screen --force --quiet %(dependencies)s'],
                "targets": [qc.screen.screen_out_path(input_fastq)],
                "file_dep": [input_fastq],
            }

    def task_extract_contamination(self):
        """Task to summarize results of fastq_screen runs"""

        fastq_screen_files = [qc.screen.screen_out_path(
            input_fastq) for input_fastq in self.args.input_fastqs]
        return {
            'basename': 'Getting contamination stats',
            'actions': [
                qc.screen.contamination_from_doit],
            'targets': [
                os.path.join(
                    self.args.output_dir,
                    'contamination_stats.txt')],
            'file_dep': fastq_screen_files,
        }

    def task_extract_quality(self):
        """Task to summarize results of fastqc runs"""

        fastqc_files = [qc.fastqc.fastqc_data_file(
            input_fastq) for input_fastq in self.args.input_fastqs]
        return {
            'basename': 'Getting quality stats',
            'actions': [
                qc.fastqc.quality_qc_from_doit],
            'targets': [
                os.path.join(
                    self.args.output_dir,
                    'quality_stats.txt')],
            'file_dep': fastqc_files,
        }

    def task_mapping_stats(self):
        """Task to count numbers of sequenced/mapped/duplicated reads"""

        return {
            'basename': 'Getting mapping stats',
            'actions': ['%(mapping_stats_script)s %(dependencies)s > %(targets)s'],
            'targets': [
                os.path.join(
                    self.args.output_dir,
                    'mapping_stats.txt')],
            'file_dep': self.args.input_fastqs,
        }

    def task_segregation_stats(self):
        """Task to calculate summary statistics for positive window calling"""

        if self.args.qc_window_size is None:
            self.args.qc_window_size = get_middle_value(self.args.window_sizes)

        input_segregation = segregation_path(
            self.args.output_dir, self.args.qc_window_size)

        return {
            'basename': 'Getting segregation stats',
            'actions': [
                qc.segregation.get_segregation_stats_doit],
            'targets': [
                os.path.join(
                    self.args.output_dir,
                    'segregation_stats.txt')],
            'file_dep': [input_segregation],
        }

    def task_merge_stats(self):
        """Task to make a single QC statistics table with one row per NP"""

        files_to_merge = [
            os.path.join(
                self.args.output_dir,
                sf) for sf in self.args.default_stats] + self.args.additional_qc_files

        return {
            'basename': 'Merging stats files',
            'actions': [
                qc.merge.merge_stats_from_doit],
            'targets': [
                os.path.join(
                    self.args.output_dir,
                    'merged_stats.txt')],
            'file_dep': files_to_merge}

    def task_copy_qc_parameters(self):
        """Task to make a qc_parameters.cfg file if it doesn't exist"""

        return {
            'basename': 'Creating QC parameters file with default values',
            'actions': ['cp %(example_parameters_file)s %(targets)s'],
            'targets': [
                os.path.join(
                    self.args.output_dir,
                    'qc_parameters.cfg')],
            'uptodate': [True],
        }

    def task_samples_passing_qc(self):
        """Task to identify NPs passing/failing QC"""

        return {
            'basename': 'Finding samples that pass QC',
            'actions': [qc.pass_qc.samples_passing_qc_from_doit],
            'targets': [os.path.join(self.args.output_dir, 'samples_passing_qc.txt')],
            'file_dep': [
                os.path.join(self.args.output_dir, 'qc_parameters.cfg'),
                os.path.join(self.args.output_dir, 'merged_stats.txt')], }

    def task_filter_segregation(self):
        """Task to exclude NPs failing QC from segregation tables"""

        passqc_file = os.path.join(
            self.args.output_dir,
            'samples_passing_qc.txt')

        for window_size in self.args.window_sizes:

            segregation_file = segregation_path(
                self.args.output_dir, window_size)

            yield {
                'basename': 'Filtering samples based on QC values',
                'name': pretty_resolution(window_size),
                'targets': [swap_extension(segregation_file, '.passed_qc.table')],
                'file_dep': [passqc_file, segregation_file],
                'actions': [select_samples.select_samples_from_doit]
            }

    def task_do_qc(self):
        """Empty target to force all QC tasks to run"""

        return {'basename': 'do_qc',
                'actions': None,
                'task_dep': ['Filtering samples based on QC values']}

    def task_linkage_matrices(self):
        """Task to calculate linkage matrices at specified resolutions"""

        # TODO: Allow specification of which chromosomes to generate matrices
        # for
        chroms = ['chr{0}'.format(c) for c in range(1, 20)]

        for window_size in self.args.matrix_sizes:

            for chrom in chroms:

                segregation_file = segregation_path(
                    self.args.output_dir, window_size)
                if 'do_qc' in self.args.to_run:
                    segregation_file = swap_extension(
                        segregation_file, '.passed_qc.table')

                matrix_base = '{seg_file}.dprime.{chrom}.txt.gz'.format(
                    seg_file=os.path.basename(segregation_file),
                    chrom=chrom)
                matrix_out = os.path.join(
                    self.args.output_dir, 'dprime_matrices', '{}'.format(
                        pretty_resolution(window_size)), matrix_base)

                yield {'basename': 'Calculating linkage matrix',
                       'name': '{0} at {1}'.format(chrom, pretty_resolution(window_size)),
                       'targets': [matrix_out],
                       'file_dep': [segregation_file],
                       'actions': ['mkdir -p $(dirname %(targets)s)',
                                   (cosegregation.matrix_from_doit,
                                    (matrix_out, segregation_file, [chrom]))], }

def check_resolution_consistency(args):
    """
    Ensure that matrix_sizes and qc_window_size are subsets of window_sizes.
    """

    all_window_sizes = list(args.window_sizes)
    all_window_sizes.extend(args.matrix_sizes)
    if args.qc_window_size is not None:
        all_window_sizes.append(args.qc_window_size)

    args.window_sizes = sorted(list(set(all_window_sizes)))

def process_nps_from_args(args):
    """Wrapper function to call doit from argparse"""

    check_resolution_consistency(args)

    if args.matrix_sizes:
        args.to_run.append('Calculating linkage matrix')

    task_dict = {
        'input_file_tasks': InputFileMappingTasks(args)
    }
    run(task_dict, args, args.to_run)