NationalGenomicsInfrastructure/ngi_pipeline

View on GitHub
ngi_pipeline/conductor/flowcell.py

Summary

Maintainability
F
4 days
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-


from __future__ import print_function

import glob
import os
import re
import sys

from ngi_pipeline.conductor.classes import NGIProject
from ngi_pipeline.database.classes import CharonSession, CharonError
from ngi_pipeline.database.communicate import get_project_id_from_name
from ngi_pipeline.log.loggers import minimal_logger
from ngi_pipeline.utils.classes import with_ngi_config
from ngi_pipeline.utils.communication import mail_analysis
from ngi_pipeline.utils.filesystem import do_symlink, locate_flowcell, safe_makedir
from ngi_pipeline.utils.parsers import determine_library_prep_from_fcid, \
                                       get_sample_numbers_from_samplesheet
from six.moves import filter

LOG = minimal_logger(__name__)

UPPSALA_PROJECT_RE = re.compile(r'(\w{2}-\d{4}|\w{2}-?\d{2,3})')
STHLM_PROJECT_RE = re.compile(r'[A-z]+[_.][A-z0-9]+_\d{2}_\d{2}')
STHLM_X_PROJECT_RE = re.compile(r'[A-z]+_[A-z0-9]+_\d{2}_\d{2}')

@with_ngi_config
def organize_projects_from_flowcell(demux_fcid_dirs, restrict_to_projects=None,
                                    restrict_to_samples=None,
                                    fallback_libprep=None, quiet=False,
                                    create_files=True,
                                    config=None, config_file_path=None):
    """Sort demultiplexed Illumina flowcells into projects and return a list of them,
    creating the project/sample/libprep/seqrun dir tree on disk via symlinks.

    :param list demux_fcid_dirs: The CASAVA-produced demux directory/directories.
    :param list restrict_to_projects: A list of projects; analysis will be
                                      restricted to these. Optional.
    :param list restrict_to_samples: A list of samples; analysis will be
                                     restricted to these. Optional.
    :param str fallback_libprep: If libprep cannot be determined, use this value if supplied (default None)
    :param bool quiet: Don't send notification emails
    :param bool create_files: Alter the filesystem (as opposed to just parsing flowcells) (default True)
    :param dict config: The parsed NGI configuration file; optional.
    :param str config_file_path: The path to the NGI configuration file; optional.

    :returns: A list of NGIProject objects.
    :rtype: list
    :raises RuntimeError: If no (valid) projects are found in the flowcell dirs
    """
    if not restrict_to_projects: restrict_to_projects = []
    if not restrict_to_samples: restrict_to_samples = []
    demux_fcid_dirs_set = set(demux_fcid_dirs)
    # Sort/copy each raw demux FC into project/sample/fcid format -- "analysis-ready"
    projects_to_analyze = dict()
    for demux_fcid_dir in demux_fcid_dirs_set:
        try:
            # Get the full path to the flowcell if it was passed in as just a name
            demux_fcid_dir = locate_flowcell(demux_fcid_dir)
        except ValueError as e:
            # Flowcell path couldn't be found/doesn't exist; skip it
            LOG.error('Skipping flowcell "{}": {}'.format(demux_fcid_dir, e))
            continue
        # These will be a bunch of Project objects each containing Samples, FCIDs, lists of fastq files
        projects_to_analyze = \
                setup_analysis_directory_structure(fc_dir=demux_fcid_dir,
                                                   projects_to_analyze=projects_to_analyze,
                                                   restrict_to_projects=restrict_to_projects,
                                                   restrict_to_samples=restrict_to_samples,
                                                   create_files=create_files,
                                                   fallback_libprep=fallback_libprep,
                                                   config=config,
                                                   quiet=quiet)
    if not projects_to_analyze:
        if restrict_to_projects:
            error_message = ("No projects found to process: the specified flowcells "
                             "({fcid_dirs}) do not contain the specified project(s) "
                             "({restrict_to_projects}) or there was an error "
                             "gathering required information.").format(
                                    fcid_dirs=",".join(demux_fcid_dirs_set),
                                    restrict_to_projects=",".join(restrict_to_projects))
        else:
            error_message = ("No projects found to process in flowcells {} "
                             "or there was an error gathering required "
                             "information.".format(",".join(demux_fcid_dirs_set)))
        raise RuntimeError(error_message)
    else:
        projects_to_analyze = list(projects_to_analyze.values())
    return projects_to_analyze


def match_fastq_sample_number_to_samplesheet(fastq_file, samplesheet_sample_numbers, project_id=None):
    """
    Given the sample numbers deduced from the samplesheet and a fastq file name, will return the corresponsing
    samplesheet entry, taking sample name, lane number and sample number into account. If project_id is specified,
    this will also be taken into account.

    :param fastq_file: fastq file name
    :param samplesheet_sample_numbers: list of samplesheet entries with deduced sample numbers
    :param project_id: project id. If not specified, this field will not be compared
    :return: the samplesheet entry corresponding to the fastq file as a list, following the format returned by
    ngi_pipeline.utils.parsers.get_sample_numbers_from_samplesheet
    """
    def _is_a_match(ss_sample, prjid, sname, lnum, snum):
        return all([
            ss_sample[0] == snum,
            ss_sample[5] == int(lnum),
            ss_sample[2] == sname,
            (prjid is None or ss_sample[1] == prjid)])

    try:
        samnple_name, sample_num, lane_num = re.match(
            r'([\w-]+)_(S\d+)_L(\d{3})_\w+',
            os.path.basename(fastq_file)).groups()
        return list(filter(
            lambda s: _is_a_match(s, project_id, samnple_name, lane_num, sample_num),
            samplesheet_sample_numbers))[0]
    except AttributeError:
        # the sample and lane numbers could not be identified in fastq file name
        pass
    except IndexError:
        # the sample number and lane number could not be matched to any samplesheet_sample_number
        pass
    except TypeError:
        # the list of samplesheet entries was None
        pass
    return None


@with_ngi_config
def setup_analysis_directory_structure(fc_dir, projects_to_analyze,
                                       restrict_to_projects=None, restrict_to_samples=None,
                                       create_files=True,
                                       fallback_libprep=None,
                                       quiet=False,
                                       config=None, config_file_path=None):
    """
    Copy and sort files from their CASAVA-demultiplexed flowcell structure
    into their respective project/sample/libPrep/FCIDs. This collects samples
    split across multiple flowcells.

    :param str fc_dir: The directory created by CASAVA for this flowcell.
    :param dict config: The parsed configuration file.
    :param set projects_to_analyze: A dict (of Project objects, or empty)
    :param bool create_files: Alter the filesystem (as opposed to just parsing flowcells) (default True)
    :param str fallback_libprep: If libprep cannot be determined, use this value if supplied (default None)
    :param list restrict_to_projects: Specific projects within the flowcell to process exclusively
    :param list restrict_to_samples: Specific samples within the flowcell to process exclusively

    :returns: A list of NGIProject objects that need to be run through the analysis pipeline
    :rtype: list

    :raises KeyError: If a required configuration key is not available.
    """
    LOG.info("Setting up analysis for demultiplexed data in source folder \"{}\"".format(fc_dir))
    if not restrict_to_projects: restrict_to_projects = []
    if not restrict_to_samples: restrict_to_samples = []
    config["quiet"] = quiet # Hack because I enter here from a script sometimes
    #Checks flowcell path to establish which group owns it
    pattern=".+({}|{})\/.+".format(config["analysis"]["sthlm_root"], config["analysis"]["upps_root"])
    matches=re.match(pattern, fc_dir)
    if matches:
        flowcell_uppnexid=matches.group(1)
    else:
        LOG.error("cannot guess which project (sthlm/uppsala) the flowcell {} belongs to".format(fc_dir))
        raise RuntimeError

    analysis_top_dir = os.path.abspath(os.path.join(config["analysis"]["base_root"],flowcell_uppnexid,config["analysis"]["top_dir"]))
    try:
        safe_makedir(analysis_top_dir)
    except OSError as e:
        LOG.error('Error: Analysis top directory {} does not exist and could not '
                  'be created.'.format(analysis_top_dir))
    fc_dir = fc_dir if os.path.isabs(fc_dir) else os.path.join(analysis_top_dir, fc_dir)
    if not os.path.exists(fc_dir):
        LOG.error("Error: Flowcell directory {} does not exist".format(fc_dir))
        return []
    # Map the directory structure for this flowcell
    try:
        fc_dir_structure = parse_flowcell(fc_dir)
    except (OSError, ValueError) as e:
        LOG.error("Error when processing flowcell dir \"{}\": {}".format(fc_dir, e))
        return []
    fc_full_id = fc_dir_structure['fc_full_id']
    if not fc_dir_structure.get('projects'):
        LOG.warning("No projects found in specified flowcell directory \"{}\"".format(fc_dir))

    # Iterate over the projects in the flowcell directory
    for project in fc_dir_structure.get('projects', []):
        project_name = project['project_name']
        project_original_name = project['project_original_name']
        samplesheet_path = fc_dir_structure.get("samplesheet_path")

        # parse the samplesheet and get the expected sample numbers assigned by bcl2fastq
        samplesheet_sample_numbers = get_sample_numbers_from_samplesheet(
            samplesheet_path) if samplesheet_path else None

        try:
            # Maps e.g. "Y.Mom_14_01" to "P123"
            project_id = get_project_id_from_name(project_name)
        except (CharonError, RuntimeError, ValueError) as e:
            LOG.warning('Could not retrieve project id from Charon (record missing?). '
                     'Using project name ("{}") as project id '
                     '(error: {})'.format(project_name, e))
            project_id = project_name
        # If specific projects are specified, skip those that do not match
        if restrict_to_projects and project_name not in restrict_to_projects and \
                                    project_id not in restrict_to_projects:
            LOG.debug("Skipping project {} (not in restrict_to_projects)".format(project_name))
            continue
        LOG.info("Setting up project {}".format(project.get("project_name")))
        # Create a project directory if it doesn't already exist, including
        # intervening "DATA" directory
        project_dir = os.path.join(analysis_top_dir, "DATA", project_id)
        project_sl_dir = os.path.join(analysis_top_dir, "DATA", project_name)
        project_analysis_dir = os.path.join(analysis_top_dir, "ANALYSIS", project_id)
        project_analysis_sl_dir = os.path.join(analysis_top_dir, "ANALYSIS", project_name)
        if create_files:
            safe_makedir(project_dir, 0o2770)
            safe_makedir(project_analysis_dir, 0o2770)
            if not project_dir == project_sl_dir and \
               not os.path.exists(project_sl_dir):
                os.symlink(project_dir, project_sl_dir)
            if not project_analysis_dir == project_analysis_sl_dir and \
               not os.path.exists(project_analysis_sl_dir):
                os.symlink(project_analysis_dir, project_analysis_sl_dir)
        try:
            project_obj = projects_to_analyze[project_dir]
        except KeyError:
            project_obj = NGIProject(name=project_name, dirname=project_id,
                                     project_id=project_id,
                                     base_path=analysis_top_dir)
            projects_to_analyze[project_dir] = project_obj
        # Iterate over the samples in the project
        for sample in project.get('samples', []):
            sample_name = sample['sample_name']
            # If specific samples are specified, skip those that do not match
            if restrict_to_samples and sample_name not in restrict_to_samples:
                LOG.debug("Skipping sample {}: not in specified samples "
                          "{}".format(sample_name, ", ".join(restrict_to_samples)))
                continue
            LOG.info("Setting up sample {}".format(sample_name))
            # Create a directory for the sample if it doesn't already exist
            sample_dir = os.path.join(project_dir, sample_name)
            if create_files: safe_makedir(sample_dir, 0o2770)
            # This will only create a new sample object if it doesn't already exist in the project
            sample_obj = project_obj.add_sample(name=sample_name, dirname=sample_name)
            # Get the Library Prep ID for each file
            pattern = re.compile(".*\.(fastq|fq)(\.gz|\.gzip|\.bz2)?$")
            fastq_files = list(filter(pattern.match, sample.get('files', [])))
            # For each fastq file, create the libprep and seqrun objects
            # and add the fastq file to the seqprep object
            # Note again that these objects only get created if they don't yet exist;
            # if they do exist, the existing object is returned
            for fq_file in fastq_files:
                # Try to use assignment from SampleSheet
                samplesheet_sample = match_fastq_sample_number_to_samplesheet(
                    fq_file,
                    samplesheet_sample_numbers,
                    project_id)
                if samplesheet_sample is not None and \
                        samplesheet_sample[6] is not None:
                    libprep_name = samplesheet_sample[6]
                else:
                    LOG.debug('Unable to determine library prep from sample sheet file; try to determine from Charon')
                    try:
                        # Requires Charon access
                        libprep_name = determine_library_prep_from_fcid(project_id, sample_name, fc_full_id)
                        LOG.debug('Found libprep name "{}" in Charon'.format(libprep_name))
                    except ValueError:
                        charon_session = CharonSession()
                        libpreps = charon_session.sample_get_libpreps(project_id, sample_name).get('libpreps')
                        if len(libpreps) == 1:
                            libprep_name = libpreps[0].get('libprepid')
                            LOG.warning('Project "{}" / sample "{}" / seqrun "{}" / fastq "{}" '
                                     'has no libprep information in Charon, but only one '
                                     'library prep is present in Charon ("{}"). Using '
                                     'this as the library prep.'.format(project_name,
                                                                        sample_name,
                                                                        fc_full_id,
                                                                        fq_file,
                                                                        libprep_name))
                        elif fallback_libprep:
                            libprep_name = fallback_libprep
                            LOG.warning('Project "{}" / sample "{}" / seqrun "{}" / fastq "{}" '
                                     'has no libprep information in Charon, but a fallback '
                                     'libprep value of "{}" was supplied -- using this '
                                     'value.'.format(project_name,
                                                     sample_name,
                                                     fc_full_id,
                                                     fq_file,
                                                     libprep_name))
                        else:
                            error_text = ('Project "{}" / sample "{}" / seqrun "{}" / fastq "{}" '
                                          'has no libprep information in Charon. Skipping '
                                          'analysis.'.format(project_name, sample_name,
                                                             fc_full_id, fq_file))
                            LOG.error(error_text)
                            if not config.get('quiet'):
                                mail_analysis(project_name=project_name,
                                              sample_name=sample_name,
                                              level="ERROR",
                                              info_text=error_text)
                            continue
                libprep_object = sample_obj.add_libprep(name=libprep_name,
                                                        dirname=libprep_name)
                libprep_dir = os.path.join(sample_dir, libprep_name)
                if create_files: safe_makedir(libprep_dir, 0o2770)
                seqrun_object = libprep_object.add_seqrun(name=fc_full_id,
                                                          dirname=fc_full_id)
                seqrun_dir = os.path.join(libprep_dir, fc_full_id)
                if create_files: safe_makedir(seqrun_dir, 0o2770)
                seqrun_object.add_fastq_files(fq_file)
            if fastq_files and create_files:
                src_sample_dir = os.path.join(fc_dir_structure['fc_dir'],
                                              project['data_dir'],
                                              project['project_dir'],
                                              sample['sample_dir'])
                for libprep_obj in sample_obj:
                    for seqrun_obj in libprep_obj:
                        src_fastq_files = [os.path.join(src_sample_dir, fastq_file) for
                                           fastq_file in seqrun_obj.fastq_files]
                        seqrun_dst_dir = os.path.join(project_obj.base_path, "DATA", project_obj.dirname,
                                                      sample_obj.dirname, libprep_obj.dirname,
                                                      seqrun_obj.dirname)
                        LOG.info("Symlinking fastq files from {} to {}...".format(src_sample_dir, seqrun_dst_dir))
                        try:
                            do_symlink(src_fastq_files, seqrun_dst_dir)
                        except OSError:
                            error_text = ('Could not symlink files for project/sample'
                                          'libprep/seqrun {}/{}/{}/{}'.format(project_obj,
                                                                              sample_obj,
                                                                              libprep_obj,
                                                                              seqrun_obj))
                            LOG.error(error_text)
                            if not config.get('quiet'):
                                mail_analysis(project_name=project_name,
                                              sample_name=sample_name,
                                              level="ERROR",
                                              info_text=error_text)
                            continue
    return projects_to_analyze


def parse_flowcell(fc_dir):
    """
    Traverse a CASAVA-1.8 or 2.5 generated directory structure for the HiSeq 2500
    and return a dictionary of the elements it contains.

    :param str fc_dir: The directory created by CASAVA for this flowcell.

    :returns: A dict of information about the flowcell, including project/sample info
    :rtype: dict

    :raises OSError: If the fc_dir does not exist or cannot be accessed
    """
    projects = []
    fc_dir = os.path.abspath(fc_dir)
    if not os.access(fc_dir, os.F_OK): os_msg = "does not exist"
    if not os.access(fc_dir, os.R_OK): os_msg = "could not be read (permission denied)"
    if locals().get('os_msg'): raise OSError("Error with flowcell dir {}: directory {}".format(fc_dir, os_msg))
    LOG.info('Parsing flowcell directory "{}"...'.format(fc_dir))
    if os.path.exists(os.path.join(fc_dir, "SampleSheet_copy.csv")):
        samplesheet_path = os.path.join(fc_dir, "SampleSheet_copy.csv")
    else:
        samplesheet_path = os.path.join(fc_dir, "SampleSheet.csv")
    if not os.path.exists(samplesheet_path):
        LOG.warning("Could not find samplesheet in directory {}".format(fc_dir))
        samplesheet_path = None
    else:
        LOG.debug("SampleSheet.csv found at {}".format(samplesheet_path))
    fc_full_id = os.path.basename(fc_dir)
    c2_5_path = os.path.join(fc_dir, "Demultiplexing")
    c1_8_path = os.path.join(fc_dir, "Unaligned*")
    if os.path.exists(c2_5_path):
        data_dir = c2_5_path
    else:
        data_dir = c1_8_path
    for project_dir in glob.glob(os.path.join(data_dir, "*")):
        path, base_dir = os.path.split(project_dir)
        if not base_dir: path, base_dir = os.path.split(path)
        project_original_name = os.path.basename(base_dir).replace('Project_', '')
        project_name = project_original_name.replace('__', '.')
        if not (os.path.isdir(project_dir) and \
                (UPPSALA_PROJECT_RE.match(project_name) or \
                   STHLM_PROJECT_RE.match(project_name))):
            continue
        LOG.info('Parsing project directory "{}"...'.format(
                            project_dir.split(os.path.split(fc_dir)[0] + "/")[1]))
        project_samples = []
        sample_dir_pattern = os.path.join(project_dir, "*")
        if STHLM_X_PROJECT_RE.match(project_name):
            project_name = project_name.replace('_', '.', 1)
        for sample_dir in glob.glob(sample_dir_pattern):
            LOG.info('Parsing samples directory "{}"...'.format(sample_dir.split(
                                                os.path.split(fc_dir)[0] + "/")[1]))
            sample_name = os.path.basename(sample_dir).replace('Sample_', '')
            fastq_file_pattern = os.path.join(sample_dir, "*.fastq.gz")
            fastq_files = [os.path.basename(fq) for fq in glob.glob(fastq_file_pattern)]
            project_samples.append({'sample_dir': os.path.basename(sample_dir),
                                    'sample_name': sample_name,
                                    'files': fastq_files})
        if not project_samples:
            LOG.warning('No samples found for project "{}" in fc "{}"'.format(project_name, fc_dir))
        else:
            projects.append({'data_dir': os.path.relpath(os.path.dirname(project_dir), fc_dir),
                             'project_dir': os.path.basename(project_dir),
                             'project_name': project_name,
                             'project_original_name': project_original_name,
                             'samples': project_samples})
    if not projects:
        raise ValueError('No projects or no projects with sample found in '
                         'flowcell directory {}'.format(fc_dir))
    else:
        return {'fc_dir'    : fc_dir,
                'fc_full_id': fc_full_id,
                'projects': projects,
                'samplesheet_path': samplesheet_path}