iterative/dvc

View on GitHub
dvc/commands/queue/status.py

Summary

Maintainability
A
0 mins
Test Coverage
from dvc.cli import formatter
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.compare import TabularData
from dvc.log import logger
from dvc.ui import ui

# Rebind the imported ``logger`` to a child logger named after this module.
logger = logger.getChild(__name__)


class CmdQueueStatus(CmdBase):
    """Show queue task and worker status.

    Writes a table of the tasks reported by the experiments celery queue
    (or a short notice when there are none), followed by a one-line
    summary of how many workers are active and how many are idle.
    """

    def run(self) -> int:
        """Render the queue task table and the worker summary.

        Returns:
            Always ``0``.
        """
        # Function-scope import: only needed when the command actually runs.
        from dvc.repo.experiments.show import format_time

        result = self.repo.experiments.celery_queue.status()
        if result:
            td = TabularData(["Task", "Name", "Created", "Status"])
            for exp in result:
                created = format_time(exp.get("timestamp"))
                # Internal invariants: every queue entry must carry a
                # non-empty revision and status before it is displayed.
                assert exp["rev"]
                assert exp["status"]
                td.append(
                    [
                        exp["rev"][:7],  # first 7 characters of the revision
                        exp.get("name") or "",  # missing/empty name -> ""
                        created,
                        exp["status"],
                    ]
                )
            td.render()
        else:
            ui.write("No experiment tasks in the queue.")
        # Empty write between the task section and the worker summary.
        ui.write()

        worker_status = self.repo.experiments.celery_queue.worker_status()
        # A worker counts as active when its mapped value is truthy. Only the
        # values matter, so iterate them directly instead of building a
        # temporary list of names just to take its length.
        active_count = sum(1 for task in worker_status.values() if task)
        idle_count = len(worker_status) - active_count

        ui.write(f"Worker status: {active_count} active, {idle_count} idle")

        return 0


def add_parser(queue_subparsers, parent_parser):
    """Register the ``status`` sub-command on ``queue_subparsers``.

    The new parser inherits from ``parent_parser`` and dispatches to
    ``CmdQueueStatus`` through its ``func`` default.
    """
    help_text = "Show the status of experiments queue tasks and workers."
    description = append_doc_link(help_text, "queue/status")

    status_parser = queue_subparsers.add_parser(
        "status",
        parents=[parent_parser],
        description=description,
        help=help_text,
        formatter_class=formatter.RawDescriptionHelpFormatter,
    )
    status_parser.set_defaults(func=CmdQueueStatus)