maestro-server/analytics-maestro

View on GitHub
app/tasks/ws.py

Summary

Maintainability
A
0 mins
Test Coverage
F
30%

from app import celery
from app.repository.externalMaestroWS import ExternalMaestroWS

@celery.task(name="ws.api")
def task_ws(graph_id, owner_id, status='success'):
    """Publish a "Finish Graph" notification for a graph to its owner's channel.

    Builds a ``publish`` payload addressed to the ``maestro-<owner_id>``
    channel and posts it to the ``api`` path through ``ExternalMaestroWS``.

    Args:
        graph_id: Identifier of the graph; shown in the notification message
            and used to build the ``analytics-<graph_id>`` event caller.
        owner_id: Identifier used to build the target channel name.
        status: Value sent as the notification ``type`` (default
            ``'success'``).

    Returns:
        ``{'result': <results>, 'task': 'ws-notification'}`` when the request
        completes, or ``{'message': <error text>}`` when the request raises.
        The task never propagates the exception (best-effort notification).
    """
    # Local import keeps this block self-contained without touching the
    # file-level import section.
    import logging

    # str.format is used throughout: unlike ``"%s" % value`` it cannot
    # misbehave when ``value`` happens to be a tuple.
    body = {
        "method": "publish",
        "params": {
            "channel": "maestro-{}".format(owner_id),
            "data": {
                "notify": {
                    "title": "Finish Graph",
                    "msg": "({})".format(graph_id),
                    "type": status,
                },
                "event": {
                    "caller": [
                        "analytics-update",
                        "analytics-{}".format(graph_id),
                    ],
                },
            },
        },
    }

    try:
        result = (
            ExternalMaestroWS()
            .auth_header()
            .post_request(path="api", body=body)
            .get_results()
        )
    except Exception as error:
        # Notification is best-effort: keep returning the error text as
        # before, but record the traceback so failures are visible in logs.
        logging.getLogger(__name__).exception(
            "ws notification failed for graph %s (owner %s)",
            graph_id,
            owner_id,
        )
        return {'message': str(error)}

    return {'result': result, 'task': 'ws-notification'}