wglass/zoonado

View on GitHub
zoonado/recipes/double_barrier.py

Summary

Maintainability
A
55 mins
Test Coverage
from __future__ import unicode_literals
 
import logging
import time
 
from tornado import gen
 
from zoonado import exc, WatchEvent
 
from .sequential import SequentialRecipe
 
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
 
 
class DoubleBarrier(SequentialRecipe):
    """Double barrier recipe built on top of ``SequentialRecipe``.

    Participants call ``enter()`` to register a "worker" znode and block
    until at least ``min_participants`` workers are present (signalled by
    the creation of a "sentinel" sibling znode).  They call ``leave()`` to
    remove their worker znode and block until the other participants have
    left as well.
    """

    def __init__(self, base_path, min_participants):
        """Set up the barrier.

        :param base_path: path handed to ``SequentialRecipe.__init__``.
        :param min_participants: number of workers that must be present
            before ``enter()`` lifts the barrier.
        """
        super(DoubleBarrier, self).__init__(base_path)
        self.min_participants = min_participants

    @property
    def sentinel_path(self):
        """Path of the "sentinel" sibling znode that marks a lifted barrier."""
        return self.sibling_path("sentinel")

    @gen.coroutine
    def enter(self, timeout=None):
        """Join the barrier and wait until it is lifted.

        :param timeout: optional number of seconds to wait; ``None`` means
            wait indefinitely.
        :raises exc.TimeoutError: if the barrier is not lifted in time.
        """
        log.debug("Entering double barrier %s", self.base_path)
        time_limit = None
        if timeout is not None:
            time_limit = time.time() + timeout

        # Register interest in the sentinel's creation *before* checking
        # whether it exists, so a creation that lands between the two calls
        # is not missed.
        barrier_lifted = self.client.wait_for_event(
            WatchEvent.CREATED, self.sentinel_path
        )

        exists = yield self.client.exists(path=self.sentinel_path, watch=True)

        yield self.create_unique_znode("worker")

        _, participants = yield self.analyze_siblings()

        if exists:
            # The barrier had already been lifted when we arrived.
            return

        if len(participants) >= self.min_participants:
            # We completed the quorum: lift the barrier for everyone.
            yield self.create_znode(self.sentinel_path)
            return

        # Only arm the timeout when we actually have to wait.  Arming it
        # earlier would leave an abandoned future behind on the early-return
        # paths above, which later fails with an unretrieved TimeoutError.
        # ``time_limit`` is an absolute deadline, so the effective limit is
        # unchanged.
        if time_limit is not None:
            barrier_lifted = gen.with_timeout(time_limit, barrier_lifted)

        try:
            yield barrier_lifted
        except gen.TimeoutError:
            raise exc.TimeoutError

    @gen.coroutine
    def leave(self, timeout=None):
        """Leave the barrier and wait until the other participants have left.

        :param timeout: optional number of seconds; converted to an absolute
            deadline that is passed to every ``wait_on_sibling`` call.
            ``None`` means no deadline.
        """
        log.debug("Leaving double barrier %s", self.base_path)
        time_limit = None
        if timeout is not None:
            time_limit = time.time() + timeout

        owned_positions, participants = yield self.analyze_siblings()
        while len(participants) > 1:
            holds_worker = "worker" in owned_positions
            if holds_worker and owned_positions["worker"] == 0:
                # We are the lowest participant: stay put and wait for the
                # highest one to go away.
                yield self.wait_on_sibling(participants[-1], time_limit)
            else:
                # Remove our znode only if we still own one.  On a repeat
                # pass through this loop it has already been deleted (e.g.
                # the lowest node disappeared while others remain), and
                # indexing owned_positions would raise KeyError.
                if holds_worker:
                    yield self.delete_unique_znode("worker")
                yield self.wait_on_sibling(participants[0], time_limit)

            owned_positions, participants = yield self.analyze_siblings()

        if len(participants) == 1 and "worker" in owned_positions:
            # We are the last participant: remove our znode and clean up the
            # sentinel.
            yield self.delete_unique_znode("worker")
            try:
                yield self.client.delete(self.sentinel_path)
            except exc.NoNode:
                # Sentinel is already gone; nothing left to clean up.
                pass