cuebook/cuelake

View on GitHub
api/workflows/services/workflowActions.py

Summary

Maintainability
A
0 mins
Test Coverage
F
50%
import datetime as dt
from app.celery import app
from workflows.models import (
    WorkflowRunLogs,
    STATUS_RUNNING,
    STATUS_QUEUED,
    STATUS_ABORTED
)
from utils.apiResponse import ApiResponse

from genie.tasks import runNotebookJob as runNotebookJobTask
from genie.services import NotebookJobServices
from genie.models import CustomSchedule, NotebookRunLogs, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_QUEUED, NOTEBOOK_STATUS_ABORT


class WorkflowActions:
    """
    Service actions for starting and stopping workflow runs
    """

    @staticmethod
    def runWorkflow(workflowId: int):
        """
        Queues a new run of the given workflow unless its latest run is still active
        :param workflowId: id of the workflow to run
        :return: ApiResponse updated with success=True once the run is queued, or
            success=False when the most recent run is still running or queued
        """
        # Imported here rather than at module level, presumably to avoid a
        # circular import with workflows.tasks -- TODO confirm
        from workflows.tasks import runWorkflowJob

        res = ApiResponse(message="Error in running workflow")

        # Only the most recent run decides whether the workflow is busy.
        # first() fetches that single row (or None) in one query, instead of a
        # COUNT followed by an index lookup that could race with a deletion.
        latestWorkflowRun = (
            WorkflowRunLogs.objects.filter(workflow_id=workflowId)
            .order_by("-startTimestamp")
            .first()
        )
        if latestWorkflowRun is not None and latestWorkflowRun.status in (
            STATUS_RUNNING,
            STATUS_QUEUED,
        ):
            res.update(False, "Can't run already running workflow")
            return res

        # Create the run log up front so the async job receives its id
        workflowRun = WorkflowRunLogs.objects.create(
            workflow_id=workflowId, status=STATUS_QUEUED
        )
        runWorkflowJob.delay(workflowId=workflowId, workflowRunLogsId=workflowRun.id)
        res.update(True, "Ran workflow successfully")
        return res

    @staticmethod
    def stopWorkflow(workflowRunLogsId: int):
        """
        Stops the given workflow run and the notebook runs that belong to it
        :param workflowRunLogsId: id of the WorkflowRunLogs entry to stop
        :return: ApiResponse updated with success=True after the run is marked aborted
        Any exception raised by WorkflowRunLogs.objects.get (e.g. when no run
        log matches the id) is not handled here and propagates to the caller
        """
        res = ApiResponse(message="Error in stopping workflow")

        # Stopping workflow task
        workflowRun = WorkflowRunLogs.objects.get(id=workflowRunLogsId)
        # Revoke celery task
        app.control.revoke(workflowRun.taskId, terminate=True)
        # Update workflow run status
        workflowRun.status = STATUS_ABORTED
        # NOTE(review): dt.datetime.now() is timezone-naive -- confirm this
        # matches how the other run timestamps are stored
        workflowRun.endTimestamp = dt.datetime.now()
        workflowRun.save()

        # Stopping notebook tasks
        notebookRunLogs = NotebookRunLogs.objects.filter(WorkflowRunLogs=workflowRunLogsId)
        for notebookRunLog in notebookRunLogs:
            if notebookRunLog.status == NOTEBOOK_STATUS_QUEUED:
                # Queued notebook run: revoke its celery task, then mark it aborted
                app.control.revoke(notebookRunLog.taskId, terminate=True)
                notebookRunLog.status = NOTEBOOK_STATUS_ABORT
                notebookRunLog.save()
            elif notebookRunLog.status == NOTEBOOK_STATUS_RUNNING:
                # Running notebook run: persist the abort status first, then
                # ask the notebook job service to stop the notebook
                notebookRunLog.status = NOTEBOOK_STATUS_ABORT
                notebookRunLog.save()
                NotebookJobServices.stopNotebookJob(notebookRunLog.notebookId)
            # Notebook runs in any other status are left untouched

        res.update(True, "Stopped workflow successfully")
        return res