pacifica/dispatcher/receiver.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# pacifica-dispatcher: pacifica/dispatcher/receiver.py
#
# Copyright (c) 2019, Battelle Memorial Institute
# All rights reserved.
#
# See LICENSE for details.
"""
Receiver module.
Contains a factory returning a receiver class that can create
PeeWee models and CherryPy endpoints.
"""
import datetime
import json
import sys
import traceback
import typing
import uuid
import functools
from contextlib import contextmanager
import celery
import cherrypy
import peewee
from .router import RouteNotFoundRouterError, Router
# pylint: disable=too-many-statements
# This is not really too many statements; everything is wrapped up in the class.
def create_peewee_model(passed_db: peewee.Database) -> typing.Type[peewee.Model]:
    """
    Factory creating a receiver class.

    Builds and returns a PeeWee model class bound to ``passed_db``.  The
    returned class also provides the ``create_celery_app`` and
    ``create_cherrypy_app`` class methods, which build the Celery worker
    application and the CherryPy REST application around the model.
    """
    @contextmanager
    def closed_db_context(ctxt_db=passed_db):
        """
        Context to refresh db connection.

        Closes the connection if it is still open, opens a new one, yields
        the database and always closes it again on exit.
        """
        try:
            if not ctxt_db.is_closed():  # pragma: no cover this just in case
                ctxt_db.close()
            ctxt_db.connect()
            yield ctxt_db
        finally:
            ctxt_db.close()

    def refresh_database(func):
        """Decorator running ``func`` inside ``closed_db_context``."""
        @functools.wraps(func)
        def inner(*args, **kwargs):
            with closed_db_context():
                return func(*args, **kwargs)
        return inner

    class ReceiveTaskModel(peewee.Model):
        """
        Receiver task model class.

        This class is the primary interface class wrapping up
        creating PeeWee models, handling CherryPy rest
        interfaces and Celery backend workers.
        """

        uuid = peewee.UUIDField(default=uuid.uuid4, index=True, primary_key=True)
        event_type = peewee.CharField(index=True, null=True)
        event_type_version = peewee.CharField(index=True, null=True)
        cloud_events_version = peewee.CharField(index=True, null=True)
        source = peewee.CharField(index=True, null=True)
        event_id = peewee.CharField(index=True, null=True)
        event_time = peewee.CharField(index=True, null=True)
        schema_url = peewee.CharField(index=True, null=True)
        content_type = peewee.CharField(index=True, null=True)
        event_data = peewee.TextField()
        data = peewee.TextField()
        task_id = peewee.UUIDField(index=True, unique=True)
        task_application_name = peewee.CharField(index=True)
        task_name = peewee.CharField(index=True)
        task_status = peewee.CharField(index=True)
        exc_type = peewee.CharField(null=True)
        exc_value = peewee.CharField(null=True)
        exc_traceback = peewee.TextField()
        created = peewee.DateTimeField(default=datetime.datetime.now, index=True)
        updated = peewee.DateTimeField(default=datetime.datetime.now, index=True)
        deleted = peewee.DateTimeField(index=True, null=True)

        # pylint: disable=too-few-public-methods
        class Meta:
            """Meta class connecting the database."""

            database = passed_db
        # pylint: enable=too-few-public-methods

        @classmethod
        def create_celery_app(cls, router: Router, name: str, receive_task_name: str,
                              *args, **kwargs) -> celery.Celery:
            """
            Create the Celery app.

            Creates the Celery tasks and app to process events to
            backend workers.  ``name`` plus any extra positional and
            keyword arguments are passed to ``celery.Celery``; the task is
            registered under ``receive_task_name`` and dispatches each
            event through ``router``.
            """
            celery_app = celery.Celery(name, *args, **kwargs)
            celery_app.conf.worker_redirect_stdouts = False

            def record_status(inst, task_status: str, **fields) -> None:
                """Store a status change (plus extra fields) for ``inst``."""
                inst.task_status = task_status
                for field_name, value in fields.items():
                    setattr(inst, field_name, value)
                # Keep the ``updated`` column in step with the status change;
                # its default is only applied when the instance is created.
                inst.updated = datetime.datetime.now()
                with closed_db_context():
                    inst.save()

            # pylint: disable=unused-variable
            @celery_app.task(bind=True, ignore_result=True, name=receive_task_name)
            def receive_task(self, event_data: typing.Dict[str, typing.Any]) -> None:
                """
                Primary Celery task entrypoint.

                Records the event, routes it and stores the outcome in
                ``task_status``: '202 Accepted' on receipt, '422
                Unprocessable Entity' when no route matches, '102
                Processing' while the route runs, then '200 OK' or '500
                Internal Server Error'.
                """
                inst = cls(
                    event_type=event_data.get('eventType', None),
                    event_type_version=event_data.get('eventTypeVersion', None),
                    cloud_events_version=event_data.get('cloudEventsVersion', None),
                    source=event_data.get('source', None),
                    event_id=event_data.get('eventID', None),
                    event_time=event_data.get('eventTime', None),
                    schema_url=event_data.get('schemaURL', None),
                    content_type=event_data.get('contentType', None),
                    event_data=json.dumps(event_data),
                    data=json.dumps(event_data.get('data', None)),
                    task_id=self.request.id,
                    task_application_name=name,
                    task_name=receive_task_name,
                    task_status='202 Accepted',
                    exc_type=None,
                    exc_value=None,
                    exc_traceback='',
                )
                # The primary key has a default value, so the first save
                # must be an explicit INSERT.
                with closed_db_context():
                    inst.save(force_insert=True)
                try:
                    route = router.match_first_or_raise(event_data)
                except RouteNotFoundRouterError as exc:
                    record_status(
                        inst, '422 Unprocessable Entity',
                        exc_type='RouteNotFoundRouterError',
                        exc_value=str(exc),
                    )
                    return
                record_status(inst, '102 Processing')
                try:
                    route(event_data)
                # pylint: disable=broad-except
                except Exception:
                    exc_type, exc_value, exc_traceback = sys.exc_info()
                    record_status(
                        inst, '500 Internal Server Error',
                        exc_type=exc_type.__name__,
                        exc_value=str(exc_value),
                        # format_tb returns a list of strings; join them so
                        # the text column holds the traceback itself rather
                        # than the repr of a list.
                        exc_traceback=''.join(traceback.format_tb(exc_traceback)),
                    )
                # pylint: enable=broad-except
                else:
                    record_status(inst, '200 OK')
            return celery_app

        @classmethod
        def create_cherrypy_app(cls, receive_task: celery.Task) -> cherrypy.Application:
            """
            Create the CherryPy application root object.

            This creates a set of CherryPy objects to be mounted in a
            server.  ``receive_task`` is the Celery task that the
            ``receive`` endpoint queues incoming events on.
            """
            def timestamp(value) -> typing.Optional[str]:
                """Render a timestamp column as a string, keeping None."""
                return str(value) if value is not None else None

            def get_by_task_id(task_id: str):
                """
                Fetch the record for ``task_id`` or raise an HTTP error.

                Raises a 404 when no record matches and a 422 when
                ``task_id`` cannot be parsed as a UUID.
                """
                try:
                    return cls.get(task_id=uuid.UUID(task_id))
                except peewee.DoesNotExist as exc:
                    raise cherrypy.HTTPError('404', 'Not Found') from exc
                except (AttributeError, TypeError, ValueError) as exc:
                    # uuid.UUID raises ValueError for malformed text and
                    # AttributeError/TypeError for non-string input.
                    raise cherrypy.HTTPError('422', 'Unprocessable Entity') from exc

            # pylint: disable=too-few-public-methods
            class Get:
                """Get class for grabbing info about an event."""

                exposed = True

                # pylint: disable=invalid-name
                @staticmethod
                @refresh_database
                @cherrypy.tools.json_out()
                def GET(task_id: str):
                    """Get REST method entrypoint."""
                    inst = get_by_task_id(task_id)
                    return {
                        'eventType': inst.event_type,
                        'eventTypeVersion': inst.event_type_version,
                        'cloudEventsVersion': inst.cloud_events_version,
                        'source': inst.source,
                        'eventID': inst.event_id,
                        'eventTime': inst.event_time,
                        'schemaURL': inst.schema_url,
                        'contentType': inst.content_type,
                        'eventData': inst.event_data,
                        'data': inst.data,
                        'taskID': str(inst.task_id),
                        'taskStatus': inst.task_status,
                        'taskApplicationName': inst.task_application_name,
                        'taskName': inst.task_name,
                        'exceptionType': inst.exc_type,
                        'exceptionValue': inst.exc_value,
                        'exceptionTraceback': inst.exc_traceback,
                        'created': timestamp(inst.created),
                        'updated': timestamp(inst.updated),
                        'deleted': timestamp(inst.deleted),
                    }
                # pylint: enable=invalid-name

            class Receive:
                """Receive entrypoint for new cloud events."""

                exposed = True

                # pylint: disable=invalid-name
                @staticmethod
                @refresh_database
                @cherrypy.tools.json_in()
                @cherrypy.tools.json_out()
                def POST() -> str:
                    """
                    Primary REST endpoint for receiving cloud events.

                    Queues the JSON request body on ``receive_task`` and
                    returns the id of the queued task.
                    """
                    return receive_task.delay(cherrypy.request.json).id
                # pylint: enable=invalid-name

            class Status:
                """Status of a specific cloud event."""

                exposed = True

                # pylint: disable=invalid-name
                @staticmethod
                @refresh_database
                @cherrypy.tools.json_out()
                def GET(task_id: str):
                    """Get REST entrypoint for specific UUID."""
                    return get_by_task_id(task_id).task_status
                # pylint: enable=invalid-name

            class Root:
                """CherryPy root object."""

                exposed = True
                get = Get()
                receive = Receive()
                status = Status()

                # pylint: disable=invalid-name
                @staticmethod
                @refresh_database
                @cherrypy.tools.json_out()
                def GET():
                    """Main root get method for some introspection."""
                    # Only the task id and the timestamp columns are
                    # selected for the listing, newest record first.
                    query = cls.select(
                        cls.task_id,
                        cls.created,
                        cls.updated,
                        cls.deleted,
                    ).order_by(cls.created.desc())
                    return [{
                        'taskID': str(inst.task_id),
                        'created': timestamp(inst.created),
                        'updated': timestamp(inst.updated),
                        'deleted': timestamp(inst.deleted),
                    } for inst in query]
                # pylint: enable=invalid-name
            # pylint: enable=too-few-public-methods

            def error_page_default(**kwargs: typing.Any) -> bytes:
                """
                Error page when something goes wrong.

                Returns the keyword arguments it is called with as a
                UTF-8 encoded JSON document.
                """
                cherrypy.response.headers['Content-Type'] = 'application/json'
                return json.dumps(kwargs).encode('utf-8')

            application = cherrypy.Application(Root(), '/', config={
                '/': {
                    'error_page.default': error_page_default,
                    'request.dispatch': cherrypy.dispatch.MethodDispatcher(),
                },
            })
            return application

    return ReceiveTaskModel
# Public API of this module: only the model factory is exported.
__all__ = ('create_peewee_model', )