NationalGenomicsInfrastructure/ngi_pipeline

ngi_pipeline/engines/piper_ngi/launchers.py

"""The Piper automated launcher script."""
from __future__ import print_function

import glob
import os
import re
import shlex
import shutil
import subprocess
import time
import datetime


from ngi_pipeline.engines.utils import handle_sample_status, handle_libprep_status, handle_seqrun_status
from ngi_pipeline.utils.communication import mail_analysis
from ngi_pipeline.conductor.classes import NGIProject, NGIAnalysis
from ngi_pipeline.database.classes import CharonSession, CharonError
from ngi_pipeline.engines.piper_ngi import workflows
from ngi_pipeline.engines.piper_ngi.command_creation_config import build_piper_cl, \
                                                                   build_setup_xml
from ngi_pipeline.engines.piper_ngi.local_process_tracking import is_sample_analysis_running_local, \
                                                                  kill_running_sample_analysis, \
                                                                  record_process_sample
from ngi_pipeline.engines.piper_ngi.utils import check_for_preexisting_sample_runs, \
                                                 create_exit_code_file_path, \
                                                 create_log_file_path, \
                                                 create_sbatch_header, \
                                                 find_previous_genotype_analyses, \
                                                 find_previous_sample_analyses, \
                                                 get_valid_seqruns_for_sample, \
                                                 launch_piper_job, \
                                                 record_analysis_details, \
                                                 remove_previous_genotype_analyses, \
                                                 remove_previous_sample_analyses, \
                                                 rotate_previous_analysis

from ngi_pipeline.log.loggers import log_process_non_blocking, minimal_logger
from ngi_pipeline.utils.filesystem import load_modules, execute_command_line, \
                                          rotate_file, safe_makedir, \
                                          match_files_under_dir
from ngi_pipeline.utils.classes import with_ngi_config
from ngi_pipeline.utils.filesystem import fastq_files_under_dir, is_index_file
from ngi_pipeline.utils.slurm import get_slurm_job_status

LOG = minimal_logger(__name__)

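# High-level flow of analyze(): for each sample in the project, check Charon
# for its current analysis status, make sure no conflicting runs already exist,
# build the Piper setup.xml and command line, submit the work as an sbatch job
# (local execution is not implemented) and record the job in the local
# process-tracking database.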
@with_ngi_config
def analyze(analysis_object, level='sample', config=None, config_file_path=None):
    """Analyze data at the sample level.

    :param NGIAnalysis analysis_object: holds all the parameters for the analysis

    :raises ValueError: If exec_mode is an unsupported value
    """
    charon_session = CharonSession()
    for sample in analysis_object.project:
        try:
            charon_reported_status = charon_session.sample_get(analysis_object.project.project_id,
                                                               sample).get('analysis_status')
            # Check Charon to ensure this hasn't already been processed
            do_analyze = handle_sample_status(analysis_object, sample, charon_reported_status)
            if not do_analyze:
                continue
        except CharonError as e:
            LOG.error(e)
            continue
        if level == "sample":
            status_field = "alignment_status"
        elif level == "genotype":
            status_field = "genotype_status"
        else:
            LOG.warning('Unknown workflow level: "{}"'.format(level))
            status_field = "alignment_status" # Or should we abort?
        try:
            check_for_preexisting_sample_runs(analysis_object.project, sample, analysis_object.restart_running_jobs,
                                              analysis_object.restart_finished_jobs, status_field)
        except RuntimeError as e:
            raise RuntimeError('Aborting processing of project/sample "{}/{}": '
                               '{}'.format(analysis_object.project, sample, e))
        if analysis_object.exec_mode.lower() not in ("sbatch", "local"):
            raise ValueError('"exec_mode" param must be one of "sbatch" or "local" '
                             'value was "{}"'.format(analysis_object.exec_mode))
        if analysis_object.exec_mode == "local":
            modules_to_load = analysis_object.config.get("piper", {}).get("load_modules", [])
            load_modules(modules_to_load)
        for workflow_subtask in workflows.get_subtasks_for_level(level=level):
            if level == "genotype":
                genotype_status = None  # Some records in Charon may lack this field
                try:
                    charon_session = CharonSession()
                    genotype_status = charon_session.sample_get(projectid=analysis_object.project.project_id,
                                                                sampleid=sample.name).get("genotype_status")
                except CharonError as e:
                    LOG.error('Couldn\'t determine genotyping status for project/'
                              'sample "{}/{}"; skipping analysis.'.format(analysis_object.project, sample))
                    continue
                if find_previous_genotype_analyses(analysis_object.project, sample) or genotype_status == "DONE":
                    if not analysis_object.restart_finished_jobs:
                        LOG.info('Project/sample "{}/{}" has completed genotype '
                                 'analysis previously; skipping (use flag to force '
                                 'analysis)'.format(analysis_object.project, sample))
                        continue
            if analysis_object.restart_running_jobs:
                # Kill currently-running jobs if they exist
                kill_running_sample_analysis(workflow_subtask=workflow_subtask,
                                             project_id=analysis_object.project.project_id,
                                             sample_id=sample.name)
            # This checks the local jobs database
            if not is_sample_analysis_running_local(workflow_subtask=workflow_subtask,
                                                    project_id=analysis_object.project.project_id,
                                                    sample_id=sample.name):
                LOG.info('Launching "{}" analysis for sample "{}" in project '
                         '"{}"'.format(workflow_subtask, sample, analysis_object.project))
                try:
                    log_file_path = create_log_file_path(workflow_subtask=workflow_subtask,
                                                         project_base_path=analysis_object.project.base_path,
                                                         project_name=analysis_object.project.dirname,
                                                         project_id=analysis_object.project.project_id,
                                                         sample_id=sample.name)
                    rotate_file(log_file_path)
                    exit_code_path = create_exit_code_file_path(workflow_subtask=workflow_subtask,
                                                                project_base_path=analysis_object.project.base_path,
                                                                project_name=analysis_object.project.dirname,
                                                                project_id=analysis_object.project.project_id,
                                                                sample_id=sample.name)
                    if level == "sample":
                        remove_previous_sample_analyses(analysis_object.project, sample)
                        default_files_to_copy=None
                    elif level == "genotype":
                        remove_previous_genotype_analyses(analysis_object.project)
                        default_files_to_copy=None

                    # Update the project to keep only valid fastq files for setup.xml creation
                    if level == "genotype":
                        updated_project, default_files_to_copy = \
                                collect_files_for_sample_analysis(analysis_object.project,
                                                                  sample,
                                                                  restart_finished_jobs=True,
                                                                  status_field="genotype_status")
                    else:
                        updated_project, default_files_to_copy = \
                                collect_files_for_sample_analysis(analysis_object.project,
                                                                  sample,
                                                                  analysis_object.restart_finished_jobs,
                                                                  status_field="alignment_status")
                    setup_xml_cl, setup_xml_path = build_setup_xml(project=updated_project,
                                                                   sample=sample,
                                                                   workflow=workflow_subtask,
                                                                   local_scratch_mode=(analysis_object.exec_mode == "sbatch"),
                                                                   config=analysis_object.config)
                    piper_cl = build_piper_cl(project=analysis_object.project,
                                              workflow_name=workflow_subtask,
                                              setup_xml_path=setup_xml_path,
                                              exit_code_path=exit_code_path,
                                              config=analysis_object.config,
                                              exec_mode=analysis_object.exec_mode)
                    if analysis_object.exec_mode == "sbatch":
                        process_id = None
                        slurm_job_id = sbatch_piper_sample([setup_xml_cl, piper_cl],
                                                           workflow_subtask,
                                                           analysis_object.project, sample,
                                                           restart_finished_jobs=analysis_object.restart_finished_jobs,
                                                           files_to_copy=default_files_to_copy)
                        for x in range(10):
                            # Time delay to let sbatch get its act together
                            # (takes a few seconds to be visible with sacct)
                            try:
                                get_slurm_job_status(slurm_job_id)
                                break
                            except ValueError:
                                time.sleep(2)
                        else:
                            LOG.error('sbatch file for sample {}/{} did not '
                                      'queue properly! Job ID {} cannot be '
                                      'found.'.format(analysis_object.project, sample, slurm_job_id))
                    else: # "local"
                        raise NotImplementedError('Local execution not currently implemented. '
                                                  'I\'m sure Denis can help you with this.')
                        #slurm_job_id = None
                        #launch_piper_job(setup_xml_cl, project)
                        #process_handle = launch_piper_job(piper_cl, project)
                        #process_id = process_handle.pid
                    try:
                        record_process_sample(project=analysis_object.project,
                                              sample=sample,
                                              analysis_module_name="piper_ngi",
                                              slurm_job_id=slurm_job_id,
                                              process_id=process_id,
                                              workflow_subtask=workflow_subtask)
                    except RuntimeError as e:
                        LOG.error(e)
                        ## Question: should we just kill the run in this case or let it go?
                        continue
                except (NotImplementedError, RuntimeError, ValueError) as e:
                    error_msg = ('Processing project "{}" / sample "{}" / workflow "{}" '
                                 'failed: {}'.format(analysis_object.project, sample,
                                                     workflow_subtask,
                                                     e))
                    LOG.error(error_msg)


def collect_files_for_sample_analysis(project_obj, sample_obj, 
                                      restart_finished_jobs=False,
                                      status_field="alignment_status"):
    """This function finds all data files relating to a sample and
    follows a preset decision path to decide which of them to include in
    a sample-level analysis. This can include fastq files, bam files, and
    alignment-qc-level files.
    Doesn't modify existing project or sample objects; returns new copies.

    :param NGIProject project_obj: The NGIProject object to process
    :param NGISample sample_obj: The NGISample object to process
    :param bool restart_finished_jobs: Include jobs marked as "DONE" (default False)
    :param str status_field: Which Charon status field to check (alignment, genotype)

    :returns: A new NGIProject object and a list of pre-existing alignment/qc files to copy
    :rtype: (NGIProject, list)

    :raises ValueError: If there are no valid libpreps, seqruns, or fastq files
    """
    ### FASTQ
    # Access the filesystem to determine what fastq files are available
    # For each file, validate it.

    # This function queries Charon and finds all valid libpreps and seqruns,
    # i.e. libpreps for which 'qc' != "FAILED" and seqruns for which the given
    # status_field (e.g. 'alignment_status') != "DONE", unless restart_finished_jobs is set
    valid_libprep_seqruns = \
            get_valid_seqruns_for_sample(project_id=project_obj.project_id,
                                         sample_id=sample_obj.name,
                                         include_failed_libpreps=False,
                                         include_done_seqruns=restart_finished_jobs,
                                         status_field=status_field)
    if not valid_libprep_seqruns:
        raise ValueError('No valid libpreps/seqruns found for project/sample '
                         '"{}/{}"'.format(project_obj, sample_obj))

    # Now we find all fastq files that are available and validate them against
    # the group compiled in the previous step (get_valid_seqruns_for_sample)
    # We're going to recreate NGIProject/NGISample/NGILibraryPrep/NGISeqrun objects here
    sample_data_directory = os.path.join(project_obj.base_path, "DATA",
                                         project_obj.dirname, sample_obj.dirname)
    fastq_files_on_filesystem = fastq_files_under_dir(sample_data_directory, realpath=False)
    if not fastq_files_on_filesystem:
        raise ValueError('No valid fastq files found for project/sample '
                         '{}/{}'.format(project_obj, sample_obj))

    # Create a new NGIProject object (the old one could still be in use elsewhere)
    proj_obj = NGIProject(project_obj.name, project_obj.dirname,
                          project_obj.project_id, project_obj.base_path)
    sample_obj = proj_obj.add_sample(sample_obj.name, sample_obj.dirname)
    # filter out index files from the analysis
    for fastq_path in [f for f in fastq_files_on_filesystem if not is_index_file(f)]:
        base_path, fastq = os.path.split(fastq_path)
        if not fastq:
            base_path, fastq = os.path.split(base_path) # Handles trailing slash
        base_path, fs_seqrun_name = os.path.split(base_path)
        base_path, fs_libprep_name = os.path.split(base_path)
        if fs_libprep_name not in valid_libprep_seqruns.keys():
            # Invalid library prep, skip this fastq file
            continue
        elif fs_seqrun_name not in valid_libprep_seqruns.get(fs_libprep_name, []):
            continue
        else:
            libprep_obj = sample_obj.add_libprep(name=fs_libprep_name, dirname=fs_libprep_name)
            seqrun_obj = libprep_obj.add_seqrun(name=fs_seqrun_name, dirname=fs_seqrun_name)
            seqrun_obj.add_fastq_files(fastq)

    ### EXISTING DATA
    # If we still have data here at this point, we'll copy it over. If we had
    # decided to scrap it, it would have been deleted already.
    files_to_copy = find_previous_sample_analyses(proj_obj, sample_obj)

    return (proj_obj, files_to_copy)

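# The sbatch script assembled in sbatch_piper_sample() stages fastq files (and
# any pre-existing analysis files) to node-local scratch ($SNIC_TMP), runs the
# Piper command lines, md5sums the key outputs, rsyncs the results back to
# permanent storage and finally writes an exit-code file recording the outcome.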
@with_ngi_config
def sbatch_piper_sample(command_line_list, workflow_name, project, sample,
                        libprep=None, restart_finished_jobs=False, files_to_copy=None,
                        config=None, config_file_path=None):
    """sbatch a piper sample-level workflow.

    :param list command_line_list: The list of command lines to execute (in order)
    :param str workflow_name: The name of the workflow to execute
    :param NGIProject project: The NGIProject
    :param NGISample sample: The NGISample
    :param dict config: The parsed configuration file (optional)
    :param str config_file_path: The path to the configuration file (optional)
    """
    job_identifier = "{}-{}-{}".format(project.project_id, sample, workflow_name)
    # Paths to the various data directories
    project_dirname = project.dirname
    perm_analysis_dir = os.path.join(project.base_path, "ANALYSIS", project_dirname, "piper_ngi", "")
    scratch_analysis_dir = os.path.join("$SNIC_TMP/ANALYSIS/", project_dirname, "piper_ngi", "")
    # Ensure that the analysis dir exists
    safe_makedir(perm_analysis_dir)
    try:
        slurm_project_id = config["environment"]["project_id"]
    except KeyError:
        raise RuntimeError('No SLURM project id specified in configuration file '
                           'for job "{}"'.format(job_identifier))
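    # Fall back to defaults when the config does not specify SLURM resources:
    # the "core" queue, 16 cores and a 4-day walltime.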
    slurm_queue = config.get("slurm", {}).get("queue") or "core"
    num_cores = config.get("slurm", {}).get("cores") or 16
    slurm_time = config.get("piper", {}).get("job_walltime", {}).get(workflow_name) or "4-00:00:00"
    slurm_out_log = os.path.join(perm_analysis_dir, "logs", "{}_sbatch.out".format(job_identifier))
    slurm_err_log = os.path.join(perm_analysis_dir, "logs", "{}_sbatch.err".format(job_identifier))
    for log_file in slurm_out_log, slurm_err_log:
        rotate_file(log_file)
    sbatch_text = create_sbatch_header(slurm_project_id=slurm_project_id,
                                       slurm_queue=slurm_queue,
                                       num_cores=num_cores,
                                       slurm_time=slurm_time,
                                       job_name="piper_{}".format(job_identifier),
                                       slurm_out_log=slurm_out_log,
                                       slurm_err_log=slurm_err_log)
    sbatch_text_list = sbatch_text.split("\n")
    sbatch_extra_params = config.get("slurm", {}).get("extra_params", {})
    for param, value in sbatch_extra_params.items():
        sbatch_text_list.append("#SBATCH {} {}\n\n".format(param, value))
    modules_to_load = config.get("piper", {}).get("load_modules", [])
    if modules_to_load:
        sbatch_text_list.append("\n# Load required modules for Piper")
        for module_name in modules_to_load:
            sbatch_text_list.append("module load {}".format(module_name))

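    # If the caller did not pass pre-existing analysis files, collect them here
    # (this also rebuilds the project object with only the valid fastq files).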
    if not files_to_copy:
        project, files_to_copy = \
            collect_files_for_sample_analysis(project, sample, restart_finished_jobs)

    # Fastq files to copy
    fastq_src_dst_list = []
    directories_to_create = set()
    for libprep in sample:
        for seqrun in libprep:
            project_specific_path = os.path.join(project.dirname,
                                                 sample.dirname,
                                                 libprep.dirname,
                                                 seqrun.dirname)
            directories_to_create.add(os.path.join("$SNIC_TMP/DATA/",
                                                    project_specific_path))
            for fastq in seqrun.fastq_files:
                src_file = os.path.join(project.base_path, "DATA",
                                        project_specific_path, fastq)
                dst_file = os.path.join("$SNIC_TMP/DATA/",
                                        project_specific_path,
                                        fastq)
                fastq_src_dst_list.append([src_file, dst_file])

    sbatch_text_list.append("echo -ne '\\n\\nCopying fastq files at '")
    sbatch_text_list.append("date")
    if fastq_src_dst_list:
        for directory in directories_to_create:
            sbatch_text_list.append("mkdir -p {}".format(directory))
        for src_file, dst_file in fastq_src_dst_list:
            sbatch_text_list.append("rsync -rptoDLv {} {}".format(src_file, dst_file))
    else:
        raise ValueError(('No valid fastq files available to process for '
                          'project/sample {}/{}'.format(project, sample)))

    # Pre-existing analysis files
    if files_to_copy:
        sbatch_text_list.append("echo -ne '\\n\\nCopying pre-existing analysis files at '")
        sbatch_text_list.append("date")

        sbatch_text_list.append("if [ ! -d {output directory} ]; then")
        sbatch_text_list.append("mkdir {output directory} ")
        sbatch_text_list.append("fi")
        sbatch_text_list.append(("rsync -rptoDLv {input_files} "
                                 "{output_directory}/").format(input_files=" ".join(files_to_copy),
                                                               output_directory=scratch_analysis_dir))
        # Delete pre-existing analysis files after copy
        sbatch_text_list.append("echo -ne '\\n\\nDeleting pre-existing analysis files at '")
        sbatch_text_list.append("date")
        sbatch_text_list.append("rm -rf {input_files}".format(input_files=" ".join(files_to_copy)))

    sbatch_text_list.append("echo -ne '\\n\\nExecuting command lines at '")
    sbatch_text_list.append("date")
    sbatch_text_list.append("# Run the actual commands")
    for command_line in command_line_list:
        sbatch_text_list.append(command_line)


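    # The exit-code file written at the end of the sbatch script encodes the
    # outcome: 0 = Piper and rsync both succeeded, 1 = Piper failed,
    # 2 = rsync of the results back to permanent storage failed.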
    piper_status_file = create_exit_code_file_path(workflow_subtask=workflow_name,
                                                   project_base_path=project.base_path,
                                                   project_name=project.dirname,
                                                   project_id=project.project_id,
                                                   sample_id=sample.name)
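    # PIPER_RETURN_CODE captures the exit status of the last Piper command line run above.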
    sbatch_text_list.append("\nPIPER_RETURN_CODE=$?")

    # Pre-calculate md5sums of the key output files (run as background jobs; 'wait' below)
    sbatch_text_list.append('MD5FILES="$SNIC_TMP/ANALYSIS/{}/piper_ngi/05_processed_alignments/*{}*.bam'.format(project.project_id, sample.name))
    sbatch_text_list.append('$SNIC_TMP/ANALYSIS/{}/piper_ngi/05_processed_alignments/*.table'.format(project.project_id))
    sbatch_text_list.append('$SNIC_TMP/ANALYSIS/{}/piper_ngi/07_variant_calls/*{}*.genomic.vcf.gz'.format(project.project_id, sample.name))
    sbatch_text_list.append('$SNIC_TMP/ANALYSIS/{}/piper_ngi/07_variant_calls/*{}*.annotated.vcf.gz"'.format(project.project_id, sample.name))
    sbatch_text_list.append('for f in $MD5FILES')
    sbatch_text_list.append('do')
    sbatch_text_list.append("    md5sum $f | awk '{printf $1}' > $f.md5 &")
    sbatch_text_list.append('done')
    sbatch_text_list.append('wait')
    
    # Copy the resulting analysis files back to permanent storage
    sbatch_text_list.append("echo -ne '\\n\\nCopying back the resulting analysis files at '")
    sbatch_text_list.append("date")
    sbatch_text_list.append("mkdir -p {}".format(perm_analysis_dir))
    sbatch_text_list.append("rsync -rptoDLv {}/ {}/".format(scratch_analysis_dir, perm_analysis_dir))
    sbatch_text_list.append("\nRSYNC_RETURN_CODE=$?")

    # Record job completion status
    sbatch_text_list.append("if [[ $RSYNC_RETURN_CODE == 0 ]]")
    sbatch_text_list.append("then")
    sbatch_text_list.append("  if [[ $PIPER_RETURN_CODE == 0 ]]")
    sbatch_text_list.append("  then")
    sbatch_text_list.append("    echo '0'> {}".format(piper_status_file))
    sbatch_text_list.append("  else")
    sbatch_text_list.append("    echo '1'> {}".format(piper_status_file))
    sbatch_text_list.append("  fi")
    sbatch_text_list.append("else")
    sbatch_text_list.append("  echo '2'> {}".format(piper_status_file))
    sbatch_text_list.append("fi")

    # Write the sbatch file
    sbatch_dir = os.path.join(perm_analysis_dir, "sbatch")
    safe_makedir(sbatch_dir)
    sbatch_outfile = os.path.join(sbatch_dir, "{}.sbatch".format(job_identifier))
    rotate_file(sbatch_outfile)
    with open(sbatch_outfile, 'w') as f:
        f.write("\n".join(sbatch_text_list))
    LOG.info("Queueing sbatch file {} for job {}".format(sbatch_outfile, job_identifier))
    # Queue the sbatch file
    p_handle = execute_command_line("sbatch {}".format(sbatch_outfile),
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
    p_out, p_err = p_handle.communicate()
    try:
        slurm_job_id = re.match(r'Submitted batch job (\d+)', p_out).groups()[0]
    except AttributeError:
        raise RuntimeError('Could not submit sbatch job for workflow "{}": '
                           '{}'.format(job_identifier, p_err))
    # Detail which seqruns we've started analyzing so we can update statuses later
    record_analysis_details(project, job_identifier)
    return int(slurm_job_id)