gaujin/tornado-crontab

View on GitHub
tornado_crontab/_crontab.py

Summary

Maintainability
A
1 hr
Test Coverage
import functools
import inspect
import logging
import math
import os
import warnings

from crontab import CronTab
from tornado import version_info as tornado_version_info
from tornado.ioloop import PeriodicCallback


log_crontab = logging.getLogger("tornado-crontab.crontab")
FORMAT_LOG_CRONTAB = " ".join(["tornado-crontab[%(pid)d]:",
                               "(%(user)s)",
                               "FUNC",
                               "(%(funcname)s",
                               "%(args)s %(kwargs)s)"])

IS_TZ_SUPPORTED = "default_utc" in inspect.getargspec(CronTab.next).args
UNSUPPORTED_TZ_MESSAGE = """\
Since crontab package version is low, UTC can not be used.
When using it with UTC, upgrade the crontab package to 0.22.0+."""
UNSUPPORTED_IOLOOP_MESSAGE = """\
Since tornado package version is high, io_loop can not be used.
When using it with io_loop, downgrade the tornado package to 4.5.3 or earlier."""


class CronTabCallback(PeriodicCallback):
    """ Crontab Callback Class

    Schedule execution of the function.
    Timezone is local time by default.
    If you want to schedule with UTC, set `is_utc` to `True`.

    .. versionchanged:: 0.3.3
       Supported UTC.
       If Timezone is not supported and `is_utc` is set to `True`,
       a warning is output and `is_utc` is ignored.

    .. versionchanged:: 0.4
       Supported Tornado 5, warning is output and `io_loop` is ignored.
       The `io_loop` argument is deprecated.
    """

    def __init__(self, callback, schedule, io_loop=None, is_utc=False):
        """ CrontabCallback initializer

        :type callback: func
        :param callback: target schedule function
        :type schedule: str
        :param schedule: crotab expression
        :type io_loop: tornado.ioloop.IOLoop
        :param io_loop: tornado IOLoop
        :type is_utc: bool
        :param is_utc: schedule timezone is UTC. (True:UTC, False:Local Timezone)
        """

        # If Timezone is not supported and `is_utc` is set to `True`,
        # a warning is output and `is_utc` is ignored.
        if not IS_TZ_SUPPORTED and is_utc:
            warnings.warn(UNSUPPORTED_TZ_MESSAGE)
            is_utc = False

        self.__crontab = CronTab(schedule)
        self.__is_utc = is_utc

        arguments = dict(
            callback=callback, callback_time=self._calc_callbacktime())

        if tornado_version_info >= (5,):
            if io_loop is not None:
                warnings.warn(UNSUPPORTED_IOLOOP_MESSAGE)
        else:
            arguments.update(io_loop=io_loop)

        super(CronTabCallback, self).__init__(**arguments)

        self.pid = os.getpid()

        if os.name == "nt":
            self.user = os.environ.get("USERNAME")
        else:
            import pwd
            self.user = pwd.getpwuid(os.geteuid()).pw_name

    def _calc_callbacktime(self, now=None):

        _kwargs = dict(now=now)

        if IS_TZ_SUPPORTED:
            _kwargs.update(dict(default_utc=self.__is_utc))

        return math.ceil(
            self.__crontab.next(**_kwargs)) * 1000.0

    def _get_func_spec(self):

        _args = []
        _kwargs = {}

        def _get_func(_func):

            if not isinstance(_func, functools.partial):
                return _func

            for _arg in reversed(_func.args):
                _args.insert(0, _arg)

            if _func.keywords:
                _kwargs.update(_func.keywords)

            return _get_func(_func.func)

        _func = _get_func(self.callback)
        return _func, _args, _kwargs

    def _logging(self, level):

        if self._running and log_crontab.isEnabledFor(level):

            _func, _args, _kwargs = self._get_func_spec()

            log_crontab.log(level,
                            FORMAT_LOG_CRONTAB % dict(pid=self.pid,
                                                      user=self.user,
                                                      funcname=_func.__name__,
                                                      args=_args,
                                                      kwargs=_kwargs))

    def _run(self):

        self._logging(logging.INFO)

        try:
            PeriodicCallback._run(self)

        finally:

            self._logging(logging.DEBUG)

    def _schedule_next(self):
        self.callback_time = self._calc_callbacktime()
        super(CronTabCallback, self)._schedule_next()


def crontab(schedule, io_loop=None, is_utc=False):
    """ Crontab Decorator

    Decorate this function to the function you want to execute as scheduled.
    Timezone is local time by default.
    If you want to schedule with UTC, set `is_utc` to `True`.

    .. versionchanged:: 0.3.3
       Supported UTC.
       If Timezone is not supported and `is_utc` is set to `True`,
       a warning is output and `is_utc` is ignored.

    .. versionchanged:: 0.4
       Supported Tornado 5, warning is output and `io_loop` is ignored.
       The `io_loop` argument is deprecated.

    :type schedule: str
    :param schedule: crotab expression
    :type io_loop: tornado.ioloop.IOLoop
    :param io_loop: tornado IOLoop
    :type is_utc: bool
    :param is_utc: schedule timezone is UTC. (True:UTC, False:Local Timezone)
    :rtype: func
    :return: scheduled execute function
    """

    def receive_func(func):

        @functools.wraps(func)
        def wrapper(*args, **kwargs):

            _func = functools.partial(func, *args, **kwargs)
            CronTabCallback(_func, schedule=schedule,
                            io_loop=io_loop, is_utc=is_utc).start()

        return wrapper
    return receive_func