ErikGartner/hyperdock

View on GitHub
hyperdock/common/workqueue.py

Summary

Maintainability
A
1 hr
Test Coverage
from datetime import datetime, timedelta

from bson.objectid import ObjectId

# Number of seconds a started, unfinished job may go without its
# `last_update` field being refreshed before `WorkQueue.purge_dead_jobs`
# cancels it as timed out.
WORK_TIMEOUT = 600


class WorkQueue:
    """
    A simple MongoDB priority work queue that handles the queue
    of experiments.

    Every job is one document in the ``workqueue`` collection. The value
    ``-1`` in ``start_time``, ``end_time`` or ``last_update`` means
    "not set yet"; once set, these fields hold naive UTC datetimes.
    """

    def __init__(self, mongodb):
        """
        Binds the queue to the ``workqueue`` collection of ``mongodb``.
        """
        super().__init__()

        self._mongodb = mongodb
        self._collection = mongodb.workqueue

    def assign_next_job(self, worker_id):
        """
        Assigns the next free job to the worker ``worker_id``.

        A job is free when it has not been started (``start_time`` is -1)
        and is not cancelled. The highest ``priority`` wins; ties go to the
        oldest ``created_on``.

        Returns the updated job document, or None when no free job exists.
        """
        t = datetime.utcnow()
        job = self._collection.find_and_modify(
            query={"start_time": -1, "cancelled": False},
            sort=[("priority", -1), ("created_on", 1)],
            update={"$set": {"start_time": t, "last_update": t, "worker": worker_id}},
            new=True,
        )
        return job

    def add_job(self, parameters, data, trial_id, trial_name, priority=0):
        """
        Adds new work to the workqueue.

        Returns the ``_id`` of the inserted job (a string).
        """
        job = {
            "start_time": -1,
            "end_time": -1,
            "last_update": -1,
            "created_on": datetime.utcnow(),
            "priority": priority,
            "parameters": parameters,
            "data": data,
            "worker": None,
            "result": {},
            "trial": trial_id,
            "trial_name": trial_name,
            "_id": str(ObjectId()),
            "cancelled": False,
            "orphaned": False,
        }
        # insert_one matches the update_one call used by finish_job; the
        # legacy Collection.insert is deprecated.
        return self._collection.insert_one(job).inserted_id

    def update_job(self, _id, update=None):
        """
        Marks the job as alive and posts an update from the job.

        ``update`` is stored as-is under the ``update`` field, so the default
        of None replaces whatever update was stored before.
        """
        t = datetime.utcnow()
        self._collection.update_one(
            {"_id": _id}, {"$set": {"last_update": t, "update": update}}
        )

    def is_job_cancelled(self, _id):
        """
        Checks if a certain job has been cancelled or altogether removed.

        Returns True when no document with this ``_id`` and
        ``cancelled == False`` exists.
        """
        return self._collection.find_one({"_id": _id, "cancelled": False}) is None

    def finish_job(self, _id, result):
        """
        Marks the job as finished and attaches the result.
        """
        t = datetime.utcnow()
        self._collection.update_one(
            {"_id": _id}, {"$set": {"end_time": t, "last_update": t, "result": result}}
        )

    def purge_dead_jobs(self):
        """
        Cancels jobs that have timed out due to worker death and returns them.

        A job is dead when it has been started, has not ended and its
        ``last_update`` is more than WORK_TIMEOUT seconds old. Dead jobs are
        also flagged as orphaned.
        """
        now = datetime.utcnow()
        deadline = now - timedelta(seconds=WORK_TIMEOUT)
        return self._update_all_matching(
            query={
                "start_time": {"$ne": -1},
                "end_time": -1,
                "last_update": {"$lt": deadline},
            },
            changes={
                "cancelled": True,
                "orphaned": True,
                "end_time": now,
                "result": {"state": "fail", "msg": "Timed out!"},
            },
            sort=[("priority", -1), ("last_update", 1)],
        )

    def check_for_orphans(self, id_list):
        """
        Checks if a list of Docker container ids are marked as orphans.
        Returns a list of (Docker id, experiment id) tuples.
        """
        jobs = self._collection.find(
            {"orphaned": True, "update.container.long_id": {"$in": id_list}}
        )
        return [(j["update"]["container"]["long_id"], j["_id"]) for j in jobs]

    def not_orphaned(self, _id):
        """
        Marks a job as not orphaned.

        Returns True if a job with this ``_id`` was found, otherwise False.
        """
        job = self._collection.find_and_modify(
            query={"_id": _id}, update={"$set": {"orphaned": False}}, new=True
        )
        return job is not None

    def cancel_invalid_jobs(self, trial_list):
        """
        Takes a list of all active (not finished, cancelled or removed) trial ids.
        Unfinished work that is not associated with any of these is cancelled.

        Returns the list of cancelled job documents.
        """
        now = datetime.utcnow()
        return self._update_all_matching(
            query={"trial": {"$nin": trial_list}, "end_time": -1},
            changes={
                "cancelled": True,
                "end_time": now,
                "result": {"state": "fail", "msg": "Abandoned"},
            },
        )

    def _update_all_matching(self, query, changes, **options):
        """
        Applies ``changes`` ($set) to every job matching ``query``, one job
        per find-and-modify call, and returns the updated documents.

        ``changes`` must make a document stop matching ``query`` (both
        callers set ``end_time`` while querying for ``end_time == -1``),
        otherwise this loop never terminates. Extra keyword arguments such as
        ``sort`` are passed through to ``find_and_modify``.
        """
        jobs = []
        while True:
            job = self._collection.find_and_modify(
                query=query, update={"$set": changes}, new=True, **options
            )
            if job is None:
                return jobs
            jobs.append(job)