KarrLab/wc_utils

View on GitHub
wc_utils/cache.py

Summary

Maintainability
D
2 days
Test Coverage
A
100%
""" Caching

:Author: Jonathan Karr <jonrkarr@gmail.com>
:Date: 2018-03-30
:Copyright: 2018, Karr Lab
:License: MIT
"""

import diskcache
import functools
import glob
import hashlib
import inspect
import os
import types


class Cache(diskcache.FanoutCache):
    """ Cache that shards keys (including the file content of keys that represent file names) and values 

    Attributes:
        hash_block_size (:obj:`int`): block size to use for hashing the content of file arguments
    """

    DEFAULT_DIRECTORY = os.path.expanduser('~/.wc/cache/')

    def __init__(self, directory=DEFAULT_DIRECTORY, hash_block_size=65536, **kwargs):
        """
        Args:
            directory (:obj:`str`, optional): cache directory
            kwargs (:obj:`dict`, optional): arguments to :obj:`diskcache.FanoutCache`
        """
        super(Cache, self).__init__(directory, **kwargs)
        self.hash_block_size = hash_block_size

    def memoize(self, name=None, typed=False, expire=None, tag=None, filename_args=None, filename_kwargs=None):
        """ Memoizing cache decorator

        Args:
            name (:obj:`str`, optional): name given for callable
            typed (:obj:`bool`, optional): cache different types separately
            expire (:obj:`float`, optional): seconds until arguments expire
            tag (:obj:`str`, optional): text to associate with arguments
            filename_args (:obj:`list`, optional): list of indices of arguments that represent filenames
            filename_kwargs (:obj:`list`, optional): list of keys of keyword arguments that represent filenames

        Returns:
            :obj:`types.FunctionType`: callable decorator
        """
        if callable(name):
            raise TypeError('name cannot be callable')

        filename_args = filename_args or []
        filename_kwargs = filename_kwargs or []

        def decorator(function):
            """ Decorator created by memoize call for callable. """
            if name is None:
                try:
                    # Python 3
                    reference = function.__qualname__
                except AttributeError:  # pragma: no cover
                    # Python 2
                    reference = function.__name__  # pragma: no cover

                reference = function.__module__ + reference
            else:
                reference = name

            reference = (reference,)

            @functools.wraps(function)
            def wrapper(*args, **kwargs):
                "Wrapper for callable to cache arguments and return values."

                # match arguments to function signature
                func_signature = inspect.signature(function)
                proc_args = []
                proc_kwargs = {}
                for i_param, (name, param) in enumerate(func_signature.parameters.items()):
                    if param.kind is not inspect.Parameter.POSITIONAL_OR_KEYWORD:
                        raise NotImplementedError('Memoize decorator only supports positional-or-keyword arguments. '
                                                  'Submit issue to request support for optional arguments')

                    if param.default is inspect._empty:
                        if i_param < len(args):
                            val = args[i_param]
                        elif param.name in kwargs:
                            val = kwargs[param.name]
                        else:
                            raise TypeError("{} missing required positional argument '{}'".format(function.__name__, param.name))
                        proc_args.append(val)
                    else:
                        if i_param < len(args):
                            val = args[i_param]
                        elif param.name in kwargs:
                            val = kwargs[param.name]
                        else:
                            val = param.default
                        proc_kwargs[param.name] = val

                # generate key from arguments
                key = reference + tuple(proc_args)

                if proc_kwargs:
                    key += (diskcache.core.ENOVAL,)
                    sorted_items = sorted(proc_kwargs.items())

                    for item in sorted_items:
                        key += item

                if typed:
                    key += tuple(type(arg) for arg in proc_args)

                    if proc_kwargs:
                        key += tuple(type(value) for _, value in sorted_items)

                for filename_arg in filename_args:
                    stats = []
                    for filename in glob.glob(proc_args[filename_arg]):
                        stats.append((os.path.getmtime(filename), self._hash_file_content(filename)))
                    key += tuple(stats)

                for filename_kwarg in filename_kwargs:
                    if filename_kwarg in proc_kwargs:
                        stats = []
                        for filename in glob.glob(proc_kwargs[filename_kwarg]):
                            stats.append((os.path.getmtime(filename), self._hash_file_content(filename)))
                        key += tuple(stats)

                result = self.get(key, default=diskcache.core.ENOVAL, retry=True)

                if result is diskcache.core.ENOVAL:
                    result = function(*args, **kwargs)
                    self.set(key, result, expire=expire, tag=tag, retry=True)

                return result

            return wrapper

        return decorator

    def _hash_file_content(self, path):
        """ Hash the content of a file

        Args:
            path (:obj:`str`): path to the file to hash the contents of

        Returns:
            :obj:`str`: hash of the content of the file
        """
        hasher = hashlib.sha1()
        with open(path, 'rb') as file:
            buffer = file.read(self.hash_block_size)
            while len(buffer) > 0:
                hasher.update(buffer)
                buffer = file.read(self.hash_block_size)
        return hasher.hexdigest()


cache = Cache()
# :obj:`Cache`: cache

memoize = cache.memoize
# :obj:`types.FunctionType`: memoize method