wglass/zoonado

View on GitHub
zoonado/recipes/base_lock.py

Summary

Maintainability
A
1 hr
Test Coverage
from __future__ import unicode_literals

import contextlib
import logging
import time

from tornado import gen, ioloop

from zoonado import exc, states

from .sequential import SequentialRecipe


log = logging.getLogger(__name__)


class BaseLock(SequentialRecipe):

    @gen.coroutine
    def wait_in_line(self, znode_label, timeout=None, blocked_by=None):
        time_limit = None
        if timeout is not None:
            time_limit = time.time() + timeout

        yield self.create_unique_znode(znode_label)

        while True:
            if time_limit and time.time() >= time_limit:
                raise exc.TimeoutError

            owned_positions, contenders = yield self.analyze_siblings()
            if znode_label not in owned_positions:
                raise exc.SessionLost

            blockers = contenders[:owned_positions[znode_label]]
            if blocked_by:
                blockers = [
                    contender for contender in blockers
                    if self.determine_znode_label(contender) in blocked_by
                ]

            if not blockers:
                break

            yield self.wait_on_sibling(blockers[-1], time_limit)

        raise gen.Return(self.make_contextmanager(znode_label))

    def make_contextmanager(self, znode_label):
        state = {"acquired": True}

        def still_acquired():
            return state["acquired"]

        @gen.coroutine
        def handle_session_loss():
            yield self.client.session.state.wait_for(states.States.LOST)
            if not state["acquired"]:
                return

            log.warn(
                "Session expired at some point, lock %s no longer acquired.",
                self
            )
            state["acquired"] = False

        ioloop.IOLoop.current().add_callback(handle_session_loss)

        @gen.coroutine
        def on_exit():
            state["acquired"] = False
            yield self.delete_unique_znode(znode_label)

        @contextlib.contextmanager
        def context_manager():
            try:
                yield still_acquired
            finally:
                ioloop.IOLoop.current().add_callback(on_exit)

        return context_manager()


class LockLostError(exc.ZKError):
    pass