NationalGenomicsInfrastructure/ngi_pipeline

View on GitHub
ngi_pipeline/utils/filesystem.py

Summary

Maintainability
F
3 days
Test Coverage
from __future__ import print_function

import contextlib
import datetime
import fnmatch
import functools
import glob
import os
import re
import shlex
import shutil
import stat
import subprocess
import tempfile

from ngi_pipeline.conductor.classes import NGIProject
from ngi_pipeline.log.loggers import minimal_logger
from ngi_pipeline.utils.classes import with_ngi_config

from requests.exceptions import Timeout
from six.moves import map
from six.moves import filter


LOG = minimal_logger(__name__)

@with_ngi_config
def load_modules(modules_list, config=None, config_file_path=None):
    """
    Takes a list of environment modules to load (in order) and
    loads them using modulecmd python load

    :param list modules_list: The list of modules to load

    :raises RuntimeError: If there is a problem loading the modules
    """
    # Module loading is normally controlled by a bash function
    # As well as the modulecmd bash which is used in .bashrc, there's also
    # a modulecmd python which allows us to use modules from within python
    # UPPMAX support staff didn't seem to know this existed, so use with caution
    error_msgs = []
    for module in modules_list:
        # Yuck
        lmod_location=os.environ.get('LMOD_CMD', "/usr/lib/lmod/lmod/libexec/lmod")
        cl = "{lmod} python load {module}".format(lmod=lmod_location,
                                                  module=module)
        p = subprocess.Popen(shlex.split(cl), stdout=subprocess.PIPE,
                                              stderr=subprocess.PIPE)
        stdout,stderr = p.communicate()
        try:
            assert(stdout), stderr
            exec(stdout)
        except Exception as e:
            error_msg = "Error loading module {}: {}".format(module, e)
            error_msgs.append(error_msg)
    if error_msgs:
        raise RuntimeError("".join(error_msgs))


@with_ngi_config
def locate_flowcell(flowcell, config=None, config_file_path=None):
    """Given a flowcell, returns the full path to the flowcell if possible,
    searching the config file's specified environment.flowcell_inbox
    if needed. If the flowcell passed in is already a valid path, returns that.

    :param str flowcell: The name of (or path to) the flowcell
    :returns: The path to the flowcell
    :rtype: str
    :raises ValueError: If a valid path cannot be found
    """
    if os.path.exists(flowcell):
        return os.path.abspath(flowcell)
    else:
        try:
            flowcell_inbox_dirs = config["environment"]["flowcell_inbox"]
        except (KeyError, TypeError) as e:
            raise ValueError('Path to incoming flowcell directory not available in '
                             'config file (environment.flowcell_inbox) and flowcell '
                             'is not an absolute path ({}).'.format(flowcell))
        else:
            for flowcell_inbox_dir in flowcell_inbox_dirs:
                flowcell_dir = os.path.join(flowcell_inbox_dir, flowcell)
                if os.path.exists(flowcell_dir):
                    return flowcell_dir

            raise ValueError('Flowcell directory passed as flowcell name (not full '
                             'path) and does not exist under incoming flowcell dir '
                             'as specified in configuration file (at {}).'.format(flowcell_dir))


@with_ngi_config
def locate_project(project, subdir="DATA", resolve_symlinks=True,
                   config=None, config_file_path=None):
    """Given a project, returns the full path to the project if possible,
    searching the config file's specified analysis.top_dir if needed.
    If the project passed in is already a valid path, returns that.

    :param str project: The name of (or path to) the project
    :param str subdir: The subdirectory to use ("DATA" or "ANALYSIS")
    :param bool resolve_symlinks: Resolve symlinks when found (default True)
    :returns: The path to the project
    :rtype: str
    :raises ValueError: If a valid path cannot be found
    """
    if os.path.exists(project):
        return os.path.abspath(project)
    else:
        try:
            project_data_dir=os.path.join(config["analysis"]["base_root"], config["analysis"]["sthlm_root"], config["analysis"]["top_dir"], subdir)
            if not os.path.exists(project_data_dir):
                project_data_dir=os.path.join(config["analysis"]["base_root"], config["analysis"]["upps_root"], config["analysis"]["top_dir"], subdir)
        except (KeyError, TypeError) as e:
            raise ValueError('Path to project data directory not available in '
                             'config file (analysis.top_dir) and project '
                             'is not an absolute path ({}).'.format(project))
        else:
            project_dir = os.path.join(project_data_dir, project)
        if not os.path.exists(project_dir):
            raise ValueError('project directory passed as project name (not '
                             'full path) and does not exist under project '
                             'data directory as specified in configuration '
                             'file (at {}).'.format(project_dir))
        else:
            if os.path.islink(project_dir):
                try:
                    return os.path.realpath(project_dir)
                except OSError:
                    pass
            return project_dir


def execute_command_line(cl, shell=False, stdout=None, stderr=None, cwd=None):
    """Execute a command line and return the subprocess.Popen object.

    :param cl: Can be either a list or a string; if string, gets shlex.splitted
    :param bool shell: value of shell to pass to subprocess
    :param file stdout: The filehandle destination for STDOUT (can be None)
    :param file stderr: The filehandle destination for STDERR (can be None)
    :param str cwd: The directory to be used as CWD for the process launched

    :returns: The subprocess.Popen object
    :rtype: subprocess.Popen

    :raises RuntimeError: If the OS command-line execution failed.
    """
    if cwd and not os.path.isdir(cwd):
        LOG.warning("CWD specified, \"{}\", is not a valid directory for "
                 "command \"{}\". Setting to None.".format(cwd, cl))
        ## FIXME Better to just raise an exception
        cwd = None
    if type(cl) is str and shell == False:
        LOG.info("Executing command line: {}".format(cl))
        cl = shlex.split(cl)
    if type(cl) is list and shell == True:
        cl = " ".join(cl)
        LOG.info("Executing command line: {}".format(cl))
    try:
        p_handle = subprocess.Popen(cl, stdout=stdout,
                                        stderr=stderr,
                                        cwd=cwd,
                                        shell=shell,
                                        universal_newlines=True)
        error_msg = None
    except OSError:
        error_msg = ("Cannot execute command; missing executable on the path? "
                     "(Command \"{}\")".format(cl))
    except ValueError:
        error_msg = ("Cannot execute command; command malformed. "
                     "(Command \"{}\")".format(cl))
    except subprocess.CalledProcessError as e:
        error_msg = ("Error when executing command: \"{}\" "
                     "(Command \"{}\")".format(e, cl))
    if error_msg:
        raise RuntimeError(error_msg)
    return p_handle

def do_symlink(src_files, dst_dir):
    do_link(src_files, dst_dir, 'soft')

def do_hardlink(src_files, dst_dir):
    do_link(src_files, dst_dir, 'hard')

def do_link(src_files, dst_dir, link_type='soft'):
    if link_type == 'hard':
        link_f=os.link
    else:
        link_f=os.symlink
    for src_file in src_files:
        base_file = os.path.basename(src_file)
        dst_file = os.path.join(dst_dir, base_file)
        if not os.path.isfile(dst_file):
            link_f(os.path.realpath(src_file), dst_file)


def safe_makedir(dname, mode=0o2770):
    """Make a directory (tree) if it doesn't exist, handling concurrent race
    conditions.
    """
    if not os.path.exists(dname):
        # we could get an error here if multiple processes are creating
        # the directory at the same time. Grr, concurrency.
        try:
            os.makedirs(dname, mode=mode)
        except OSError:
            if not os.path.isdir(dname):
                raise
    return dname

def rotate_file(file_path, new_subdirectory="rotated_files"):
    if os.path.exists(file_path) and os.path.isfile(file_path):
        file_dirpath, extension = os.path.splitext(file_path)
        file_name = os.path.basename(file_dirpath)
        current_datetime = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S:%f")
        if new_subdirectory:
            rotated_file_basepath = os.path.join(os.path.dirname(file_path),
                                                 new_subdirectory)
        else:
            rotated_file_basepath = os.path.dirname(file_path)
        safe_makedir(rotated_file_basepath)

        rotate_file_path = os.path.join(rotated_file_basepath,
                                        "{}-{}.rotated{}".format(file_name,
                                                                 current_datetime,
                                                                 extension))
        ## TODO what exceptions can we get here? OSError, else?
        try:
            LOG.info('Attempting to rotate file "{}" to '
                     '"{}"...'.format(file_path, rotate_file_path))
            ## FIXME check if the log file is currently open!!?? How?!!
            shutil.move(file_path, rotate_file_path)
        except OSError as e:
            raise OSError('Could not rotate log file "{}" to "{}": '
                          '{}'.format(file_path, rotate_file_path, e))

@contextlib.contextmanager
def chdir(new_dir):
    """Context manager to temporarily change to a new directory.
    """
    cur_dir = os.getcwd()
    # This is weird behavior. I'm removing and and we'll see if anything breaks.
    #safe_makedir(new_dir)
    os.chdir(new_dir)
    try:
        yield
    finally:
        os.chdir(cur_dir)

@with_ngi_config
def recreate_project_from_filesystem(project_dir,
                                     restrict_to_samples=None,
                                     restrict_to_libpreps=None,
                                     restrict_to_seqruns=None,
                                     config=None, config_file_path=None):
    """Recreates the full project/sample/libprep/seqrun set of
    NGIObjects using the directory tree structure."""

    from ngi_pipeline.database.classes import CharonError
    from ngi_pipeline.database.communicate import get_project_id_from_name

    if not restrict_to_samples: restrict_to_samples = []
    if not restrict_to_libpreps: restrict_to_libpreps = []
    if not restrict_to_seqruns: restrict_to_seqruns = []

    project_dir = locate_project(project_dir)

    if os.path.islink(os.path.abspath(project_dir)):
        real_project_dir = os.path.realpath(project_dir)
        syml_project_dir = os.path.abspath(project_dir)
    else:
        real_project_dir = os.path.abspath(project_dir)
        search_dir = os.path.join(os.path.dirname(project_dir), "*")
        sym_files = list(filter(os.path.islink, glob.glob(search_dir)))
        for sym_file in sym_files:
            if os.path.realpath(sym_file) == os.path.realpath(real_project_dir):
                syml_project_dir = os.path.abspath(sym_file)
                break
        else:
            syml_project_dir = None
    project_base_path, project_id = os.path.split(real_project_dir)
    if syml_project_dir:
        project_base_path, project_name = os.path.split(syml_project_dir)
    else: # project name is the same as project id (Uppsala perhaps)
        project_name = project_id
    if os.path.split(project_base_path)[1] == "DATA":
        project_base_path = os.path.split(project_base_path)[0]
    LOG.info('Setting up project "{}"'.format(project_id))
    project_obj = NGIProject(name=project_name,
                             dirname=project_id,
                             project_id=project_id,
                             base_path=project_base_path)
    samples_pattern = os.path.join(real_project_dir, "*")
    samples = list(filter(os.path.isdir, glob.glob(samples_pattern)))
    if not samples:
        LOG.warning('No samples found for project "{}"'.format(project_obj))
    for sample_dir in samples:
        sample_name = os.path.basename(sample_dir)
        if restrict_to_samples and sample_name not in restrict_to_samples:
            LOG.debug('Skipping sample "{}": not in specified samples '
                      '"{}"'.format(sample_name, ', '.join(restrict_to_samples)))
            continue
        LOG.info('Setting up sample "{}"'.format(sample_name))
        sample_obj = project_obj.add_sample(name=sample_name, dirname=sample_name)

        libpreps_pattern = os.path.join(sample_dir, "*")
        libpreps = list(filter(os.path.isdir, glob.glob(libpreps_pattern)))
        if not libpreps:
            LOG.warning('No libpreps found for sample "{}"'.format(sample_obj))
        for libprep_dir in libpreps:
            libprep_name = os.path.basename(libprep_dir)
            if restrict_to_libpreps and libprep_name not in restrict_to_libpreps:
                LOG.debug('Skipping libprep "{}": not in specified libpreps '
                          '"{}"'.format(libprep_name, ', '.join(restrict_to_libpreps)))
                continue
            LOG.info('Setting up libprep "{}"'.format(libprep_name))
            libprep_obj = sample_obj.add_libprep(name=libprep_name,
                                                 dirname=libprep_name)

            seqruns_pattern = os.path.join(libprep_dir, "*_*_*_*")
            seqruns = list(filter(os.path.isdir, glob.glob(seqruns_pattern)))
            if not seqruns:
                LOG.warning('No seqruns found for libprep "{}"'.format(libprep_obj))
            for seqrun_dir in seqruns:
                seqrun_name = os.path.basename(seqrun_dir)
                if restrict_to_seqruns and seqrun_name not in restrict_to_seqruns:
                    LOG.debug('Skipping seqrun "{}": not in specified seqruns '
                              '"{}"'.format(seqrun_name, ', '.join(restrict_to_seqruns)))
                    continue
                LOG.info('Setting up seqrun "{}"'.format(seqrun_name))
                seqrun_obj = libprep_obj.add_seqrun(name=seqrun_name,
                                                    dirname=seqrun_name)
                for fq_file in fastq_files_under_dir(seqrun_dir, realpath=False):
                    fq_name = os.path.basename(fq_file)
                    LOG.info('Adding fastq file "{}" to seqrun "{}"'.format(fq_name, seqrun_obj))
                    seqrun_obj.add_fastq_files([fq_name])
    return project_obj


def is_index_file(fastq_file, index_file_pattern=r'_L00\d_I\d_'):
    """
    Returns True if the fastq file appears to be an index file, based on the file name pattern

    :param fastq_file: the file name of the fastq file
    :param index_file_pattern: a regexp pattern that discriminates index files from non-index files.
    Will use '_L00\d_I\d_' if not specified
    :return: True if file name matches the index file pattern, False otherwise
    """
    return re.search(index_file_pattern, os.path.basename(fastq_file)) is not None


def fastq_files_under_dir(dirname, realpath=True):
    return match_files_under_dir(dirname,
                                 pattern=".*\.(fastq|fq)(\.gz|\.gzip|\.bz2)?$",
                                 pt_style="regex",
                                 realpath=realpath)


def match_files_under_dir(dirname, pattern, pt_style="regex", realpath=True):
    """Find all the files under a directory that match pattern.

    :parm str dirname: The directory under which to search
    :param str pattern: The pattern against which to match
    :param str pt_style: pattern style, "regex" or "shell"
    :param bool realpath: If true, dereferences symbolic links

    :returns: A list of full paths to the fastq files, using dereferenced paths if realpath=True
    :rtype: list
    """
    if pt_style not in ("regex", "shell"):
        LOG.warning('Chosen pattern style "{}" invalid (must be "regex" or "shell"); '
                 'falling back to "regex".')
        pt_style = "regex"
    if pt_style == "regex": pt_comp = re.compile(pattern)
    matches = []
    for root, dirnames, filenames in os.walk(dirname):
        if pt_style == "shell":
            for filename in fnmatch.filter(filenames, pattern):
                match = os.path.abspath(os.path.join(root, filename))
                file_path = os.path.join(root, filename)
                if realpath:
                    matches.append(os.path.realpath(file_path))
                else:
                    matches.append(os.path.abspath(file_path))
        else: # regex-style
            file_matches = list(filter(pt_comp.search, filenames))
            file_paths = [os.path.join(root, filename) for filename in file_matches]
            if file_paths:
                if realpath:
                    matches.extend(list(map(os.path.realpath, file_paths)))
                else:
                    matches.extend(list(map(os.path.abspath, file_paths)))
    return matches