maestro-server/scheduler-app

View on GitHub
app/tasks/connections.py

Summary

Maintainability
A
1 hr
Test Coverage
F
0%
import re
import requests
import json

from app import celery

from app.tasks.webhook import task_webhook
from app.tasks.depleted_job import task_deplete
from app.repository.externalMaestroScheduler import ExternalMaestroScheduler


@celery.task(name="connections")
def task_connections(name, _id, endpoint, source='discovery', method="GET", args={}, chain=[]):
    msg = "Task Connection - %s" % _id

    try:
        connType = re.search(r'/[a-zA-Z0-9]{24,24}/([a-z-]*)$', endpoint).group(1)
        conn_id = re.search(r'/([a-zA-Z0-9]{24,24})/', endpoint).group(1)

    except Exception as error:
        deple_id = task_deplete.delay(str(error), _id)
        return {'msg': 'Missing type or id', 'deple_id': deple_id}

    process = 'process.%s.state' % connType
    query = json.dumps({
        '_id': conn_id,
        'status': 'enabled',
        'active': True,
        process: {'$ne': 'danger'}
    })

    resource = ExternalMaestroScheduler(_id) \
        .post_request(path="connections", body={'query': query})

    if resource.get_status() >= 400:
        msg = resource.get_error()
        task_deplete.delay(msg, _id)

    if resource.get_status() < 400:
        result = resource.get_results()

        if result.get('found', 0) == 1:
            webhook_id = task_webhook.delay(name, _id, endpoint, source, method, args, chain)
            return {'webhook_id': webhook_id}

        else:
            msg = "Connection issue - %s" % conn_id
            task_deplete.delay(msg, _id)

    return {'msg': msg, 'status_code': resource.get_status()}