shanbay/peeweext

View on GitHub
peeweext/flask.py

Summary

Maintainability
A
0 mins
Test Coverage
from werkzeug.local import LocalProxy
from werkzeug.utils import import_string, cached_property
from playhouse import db_url
from .otel import otel_instrument


class UninitializedException(Exception):
    pass


class Peeweext:
    def __init__(self, ns='PW_'):
        self.ns = ns
        self._database = None
        # make a connection pool proxy
        self.database = LocalProxy(self._get_db)

    def init_app(self, app):
        config = app.config.get_namespace(self.ns)
        self.model_class = import_string(
            config.get('model', 'peeweext.model.Model'))
        conn_params = config.get('conn_params', {})

        otel_instrument(app)
        # initialize private connection pool
        self._database = db_url.connect(config['db_url'], **conn_params)

        self._register_handlers(app)

    def _get_db(self):
        if not self._database:
            raise UninitializedException()
        return self._database

    @cached_property
    def Model(self):
        class BaseModel(self.model_class):
            class Meta:
                database = LocalProxy(self._get_db)

        return BaseModel

    def connect_db(self):
        if self.database.is_closed():
            self.database.connect()

    def close_db(self, exc):
        if not self.database.is_closed():
            self.database.close()

    def _register_handlers(self, app):
        app.before_request(self.connect_db)
        app.teardown_request(self.close_db)
        try:
            from celery.signals import task_prerun, task_postrun
            task_prerun.connect(
                lambda *arg, **kw: self.connect_db(), weak=False)
            task_postrun.connect(
                lambda *arg, **kw: self.close_db(None), weak=False)
        except ImportError:
            pass