maestro-server/report-app

View on GitHub
app/tasks/ws.py

Summary

Maintainability
A
0 mins
Test Coverage
F
30%

from app import celery
from app.repository.externalMaestroWS import ExternalMaestroWS

@celery.task(name="ws.api")
def task_ws(name, report_id, owner_id, status='success'):
    msg = "Finish Sync"
    title = "%s (%s)" % (name, report_id)
    channel = "maestro-%s" % owner_id

    body = {
        "method": "publish",
        "params": {
            "channel": channel,
            "data": {
                "notify": {
                    "title": title,
                    "msg": msg,
                    "type": status
                },
                "event": {
                    "caller": ["reports-update", "reports-{}".format(report_id)]
                }
            }
        }
    }

    try:
        result = ExternalMaestroWS() \
            .auth_header() \
            .post_request(path="api", body=body) \
            .get_results()

        return {'result': result, 'task': 'ws-notification'}

    except Exception as error:
        return {'message': str(error)}