DivisionBy-Zero/erpa-sweng

View on GitHub
server/session.py

Summary

Maintainability
A
0 mins
Test Coverage
from contextlib import contextmanager
from os import getenv
from typing import Iterator, Optional

from psycopg2 import OperationalError
from psycopg2.pool import ThreadedConnectionPool, SimpleConnectionPool
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import Session, sessionmaker

import models

# Process-wide cache of engines that have already been created, keyed by the
# database URL they were created from.
db_engines = {}  # type: Dict[str, Engine]


class SessionBroker:
    """Hand out SQLAlchemy sessions bound to a single database engine.

    Engines created from a URL are cached in the module-level ``db_engines``
    dict, so every broker built with the same ``database_url`` reuses the
    same engine.
    """

    def __init__(self, database_url: Optional[str] = None,
                 engine: Optional[Engine] = None) -> None:
        """Bind the broker to an engine and prepare its session factory.

        Args:
            database_url: URL handed to ``create_engine`` when no ``engine``
                is supplied. Not used when ``engine`` is given.
            engine: Ready-made engine to use as-is. It is neither cached in
                ``db_engines`` nor passed to ``maybe_initialize_tables``.

        Raises:
            AssertionError: If neither ``database_url`` nor ``engine`` is
                provided.
        """
        # NOTE(review): this check disappears under ``python -O``; it is kept
        # as an assert so the exception type seen by callers is unchanged.
        assert database_url or engine, \
            "Need a database_url or an engine to start a session"

        def memoized_engine(database_url: str):
            """Return the cached engine for the URL, creating it on a miss."""
            if database_url not in db_engines:
                engine = create_engine(database_url)
                # Private pool attribute. NOTE(review): presumably asks the
                # pool for per-thread connection reuse -- confirm it still
                # has an effect with the installed SQLAlchemy version.
                engine.pool._use_threadlocal = True
                # Create the tables before publishing the engine in the
                # cache, so a failure here does not leave a cached engine
                # behind.
                self.maybe_initialize_tables(engine)
                db_engines[database_url] = engine
            return db_engines[database_url]

        self.engine = (memoized_engine(str(database_url))
                       if engine is None else engine)
        self.session_factory = sessionmaker(bind=self.engine)

    @staticmethod
    def maybe_initialize_tables(engine: Engine) -> None:
        """Run ``create_all`` for the ``models.Base`` metadata on ``engine``."""
        models.Base.metadata.create_all(engine)

    @contextmanager
    def get_session(self) -> Iterator[Session]:
        """Yield a new session and finalize it when the ``with`` block ends.

        On a clean exit the session is flushed, every instance is detached
        from it, and the transaction is committed if the session is still
        active. If the block (or the finalization itself) raises, the
        transaction is rolled back and the exception is re-raised. The
        session is closed in every case, so it is never left open after an
        error.

        Yields:
            A session produced by this broker's ``session_factory``.
        """
        session = self.session_factory()
        try:
            yield session
            # Flush entities before expunging (converge instances).
            session.flush()
            # Detach before committing so the instances are no longer
            # tracked by the session when the commit happens.
            session.expunge_all()
            if session.is_active:
                session.commit()
        except Exception:
            # Undo any partial work; the caller still sees the original error.
            session.rollback()
            raise
        finally:
            # Always release the session, on success and on failure.
            session.close()