webservices/common/models/base.py

Summary

Maintainability
A
0 mins
Test Coverage
import random

import celery
from flask_sqlalchemy import SQLAlchemy
from flask_sqlalchemy import SignallingSession


class RoutingSession(SignallingSession):
    """Route requests to database leader or follower as appropriate.

    Based on http://techspot.zzzeek.org/2012/01/11/django-style-database-routers-in-sqlalchemy/
    """

    @property
    def followers(self):
        return self.app.config['SQLALCHEMY_FOLLOWERS']

    @property
    def follower_tasks(self):
        return self.app.config['SQLALCHEMY_FOLLOWER_TASKS']

    @property
    def restrict_follower_traffic_to_tasks(self):
        return self.app.config['SQLALCHEMY_RESTRICT_FOLLOWER_TRAFFIC_TO_TASKS']

    @property
    def use_follower(self):
        # Check for read operations and configured followers.
        use_follower = (
            not self._flushing and
            len(self.followers) > 0
        )

        # Optionally restrict traffic to followers for only supported tasks.
        if use_follower and self.restrict_follower_traffic_to_tasks:
            use_follower = (
                celery.current_task and
                celery.current_task.name in self.follower_tasks
            )

        return use_follower

    def get_bind(self, mapper=None, clause=None):
        if self.use_follower:
            return random.choice(self.followers)

        return super().get_bind(mapper=mapper, clause=clause)


class RoutingSQLAlchemy(SQLAlchemy):

    def create_session(self, options):
        return RoutingSession(self, **options)


db = RoutingSQLAlchemy()


class BaseModel(db.Model):
    __abstract__ = True
    idx = db.Column(db.Integer, primary_key=True)