Dallinger/Dallinger

View on GitHub
dallinger/heroku/clock.py

Summary

Maintainability
A
0 mins
Test Coverage
"""A clock process."""

from collections import defaultdict
from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler

import dallinger
from dallinger import db, recruiters
from dallinger.experiment import EXPERIMENT_TASK_REGISTRATIONS
from dallinger.models import Participant
from dallinger.utils import ParticipationTime

scheduler = BlockingScheduler()


def run_check(participants, config, reference_time):
    """For each participant, if they've been active for longer than the
    experiment duration + 2 minutes, we take action.
    """
    recruiters_with_late_participants = defaultdict(list)
    for p in participants:
        timeline = ParticipationTime(p, reference_time, config)
        if timeline.is_overdue:
            print(
                "Error: participant {} with status {} has been playing for too "
                "long - their recruiter will be notified.".format(p.id, p.status)
            )
            recruiters_with_late_participants[p.recruiter_id].append(p)

    for recruiter_id, participants in recruiters_with_late_participants.items():
        recruiter = recruiters.by_name(recruiter_id)
        recruiter.notify_duration_exceeded(participants, reference_time)


@scheduler.scheduled_job("interval", minutes=0.5)
def check_db_for_missing_notifications():
    """Check the database for missing notifications."""
    config = dallinger.config.get_config()
    participants = Participant.query.filter_by(status="working").all()
    reference_time = datetime.now()

    run_check(participants, config, reference_time)
    db.session.commit()


def launch():
    config = dallinger.config.get_config()
    if not config.ready:
        config.load()

    # Import the experiment.
    experiment_class = dallinger.experiment.load()

    for args in EXPERIMENT_TASK_REGISTRATIONS:
        meth_name = args.pop("func_name")
        task = getattr(experiment_class, meth_name, None)
        if task is not None:
            scheduler.add_job(
                task,
                trigger=args["trigger"],
                replace_existing=True,
                **dict(args["kwargs"]),
            )

    scheduler.start()