NationalGenomicsInfrastructure/ngi_pipeline

View on GitHub
ngi_pipeline/conductor/launchers.py

Summary

Maintainability
B
5 hrs
Test Coverage
#!/usr/bin/env python

from __future__ import print_function

import importlib

from ngi_pipeline.conductor.classes import NGIProject, NGIAnalysis, get_engine_for_bp, load_engine_module
from ngi_pipeline.database.classes import CharonSession, CharonError
from ngi_pipeline.log.loggers import minimal_logger
from ngi_pipeline.utils.classes import with_ngi_config
from ngi_pipeline.utils.communication import mail_analysis


LOG = minimal_logger(__name__)

@with_ngi_config
def launch_analysis(projects_to_analyze, restart_failed_jobs=False,
                    restart_finished_jobs=False, restart_running_jobs=False,
                    no_qc=False, exec_mode="sbatch",
                    quiet=False, manual=False,
                    config=None, config_file_path=None):
    """Launch the appropriate analysis for each fastq file in the project.

    :param list projects_to_analyze: The list of projects (Project objects) to analyze
    :param dict config: The parsed NGI configuration file; optional/has default.
    :param str config_file_path: The path to the NGI configuration file; optional/has default.
    """
    charon_session = CharonSession()
    for project in projects_to_analyze:
        try:
            analysis=NGIAnalysis(project=project, restart_failed_jobs=restart_failed_jobs,
                        restart_finished_jobs=restart_finished_jobs,
                        restart_running_jobs=restart_running_jobs,
                        no_qc=no_qc,
                        exec_mode=exec_mode, quiet=quiet, manual=manual,
                        config=config, config_file_path=config_file_path,
                        log=LOG)
        except (RuntimeError, CharonError) as e: # BPA missing from Charon?
            LOG.error('Skipping project "{}" because of error: {}'.format(project, e))
            continue
        #update charon with the current analysis status
        analysis.engine.local_process_tracking.update_charon_with_local_jobs_status(config=config)
        try:
            project_status = charon_session.project_get(project.project_id)['status']
        except CharonError as e:
            LOG.error('Project {} could not be processed: {}'.format(project, e))
            continue
        if not project_status == "OPEN":
            error_text = ('Data found on filesystem for project "{}" but Charon '
                          'reports its status is not OPEN ("{}"). Not launching '
                          'analysis for this project.'.format(project, project_status))
            LOG.error(error_text)
            if not config.get('quiet'):
                mail_analysis(project_name=project.name, level="ERROR", info_text=error_text)
            continue
        if not no_qc:
            try:
                qc_analysis_module = load_engine_module("qc", config)
            except RuntimeError as e:
                LOG.error("Could not launch qc analysis: {}".format(e))
        for sample in project:
            # Launch QC analysis
            if not no_qc:
                try:
                    LOG.info('Attempting to launch sample QC analysis '
                             'for project "{}" / sample "{}" / engine '
                             '"{}"'.format(project, sample, qc_analysis_module.__name__))
                    qc_analysis_module.analyze(project=project,
                                               sample=sample,
                                               config=config)
                except Exception as e:
                    error_text = ('Cannot process project "{}" / sample "{}" / '
                                  'engine "{}" : {}'.format(project, sample,
                                                            qc_analysis_module.__name__,
                                                            e))
                    LOG.error(error_text)
                    if not config.get("quiet"):
                        mail_analysis(project_name=project.name, sample_name=sample.name,
                                      engine_name=qc_analysis_module.__name__,
                                      level="ERROR", info_text=e)
            # Launch actual best-practice analysis
        analysis.engine.analyze(analysis)