ARMmaster17/watergrid-python

watergrid/pipelines/ha_pipeline.py

Summary

Maintainability: A (0 mins)
Test Coverage: A (92%)

import logging
import time
 
from watergrid.locks.PipelineLock import PipelineLock
from watergrid.pipelines.pipeline import Pipeline
 
 
class HAPipeline(Pipeline):
"""
Line too long (113 > 79 characters)
The high availability pipeline allows for several machines to have the pipeline loaded at once. If one of the
Line too long (99 > 79 characters)
machines fails, or if the pipeline context times out, the pipeline will run on another machine.
 
:param pipeline_name: The name of the pipeline.
:param pipeline_lock: The lock implementation to use for this pipeline.
"""
 
    def __init__(self, pipeline_name: str, pipeline_lock: PipelineLock):
        super().__init__(pipeline_name)
        self.__pipeline_lock = pipeline_lock
        self.__lock_timings = []

    def run(self):
        if self.__pipeline_lock.lock():
            super().run()
            self.__pipeline_lock.unlock()
        else:
            logging.debug(
                "pipeline {} is already running on another instance".format(
                    self.get_pipeline_name()
                )
            )
 
    def run_interval(self, job_interval_s: int) -> None:
        """
        Runs the pipeline in a blocking interval mode. Every job_interval_s
        seconds, the pipeline is run.
        :param job_interval_s: Number of seconds to wait between pipeline runs.
        :return: None
        """
        while True:
            self._run_interval_loop(job_interval_s)
            time.sleep(self._calculate_delay(job_interval_s) / 1000)
 
    def _run_interval_loop(self, job_interval_s: int) -> None:
        """
        Runs the pipeline in interval mode once. If the lock is successfully
        acquired, the pipeline steps are run.
        :param job_interval_s: Number of seconds to wait between pipeline runs.
        :return: None
        """
        if self.__pipeline_lock.lock():
            self._perform_locked_interval_actions(job_interval_s)
            self.__pipeline_lock.unlock()
        else:
            self._handle_lock_acquire_failure()
 
    def _perform_locked_interval_actions(self, job_interval_s: int) -> None:
        """
        Performs the actions that should be performed when the pipeline is
        locked. Note that this method assumes that the lock has already been
        acquired and is held by the calling class/function.
        :param job_interval_s: Number of seconds to wait between pipeline runs.
        :return: None
        """
        self._verify_lock_metadata(job_interval_s)
        last_run = self._get_last_run()
        if time.time() - last_run > job_interval_s:
            super().run()
            self._set_last_run(last_run + job_interval_s)
 
    def _verify_lock_metadata(self, job_interval_s: int) -> None:
        """
        Verifies that the lock metadata is correct. If the metadata is
        missing or has fallen too far behind, the last-run timestamp is
        reset.
        :param job_interval_s: Number of seconds to wait between pipeline runs.
        :return: None
        """
        last_run = self._get_last_run()
        if last_run == 0:
            self._set_last_run(time.time() - job_interval_s)
        if time.time() - last_run > job_interval_s * 3:
            logging.warning(
                "pipeline {} has fallen more than three cycles behind. "
                "Consider increasing the job interval or provisioning "
                "more machines.".format(self.get_pipeline_name())
            )
            self._set_last_run(time.time() - job_interval_s)
 
    def _handle_lock_acquire_failure(self) -> None:
        """
        Handles the case where the pipeline lock could not be acquired. For
        the HA pipeline this only logs a debug message. Does not run if the
        lock could not be acquired due to an exception.
        :return: None
        """
        logging.debug(
            "pipeline {} is already running on another instance".format(
                self.get_pipeline_name()
            )
        )
 
    def _set_last_run(self, last_run: float) -> None:
        """
        Sets the timestamp of the last run of this pipeline as recorded in
        the lock backend.
        :param last_run: Timestamp to record as the last run of this pipeline.
        :return: None
        """
        self.__pipeline_lock.write_key(
            self._get_pipeline_lock_name(), last_run
        )
 
    def _get_last_run(self) -> float:
        """
        Gets the timestamp of the last run of this pipeline as recorded in
        the lock backend. Note that this value keeps the interval defined by
        job_interval_s, and may not represent the exact time of the last run.
        :return: Timestamp of the last run of this pipeline.
        """
        try:
            return float(
                self.__pipeline_lock.read_key(self._get_pipeline_lock_name())
            )
        except TypeError:
            return 0
 
    def _get_pipeline_lock_name(self) -> str:
        """
        Builds the name of the lock assigned to this pipeline.
        :return: pipeline lock name.
        """
        return "{}_last_run".format(self.get_pipeline_name())
 
    def lock_with_timing(self) -> None:
        """
        Acquire the pipeline lock and record the time it took to acquire the
        lock to the lock timings counter.
        :return: None
        """
        start_time = time.perf_counter()
        self.__pipeline_lock.lock()
        self._append_timing(time.perf_counter() - start_time)
 
    def unlock_with_timing(self) -> None:
        """
        Unlocks the pipeline and appends the time taken to unlock to the
        lock timings counter.
        :return: None
        """
        start_time = time.perf_counter()
        self.__pipeline_lock.unlock()
        self._append_timing(time.perf_counter() - start_time)
 
    def get_average_lock_delay(self) -> float:
        """
        Returns the average time it takes to perform a lock operation with
        the currently configured backend lock.
        :return: Average operation time in milliseconds.
        """
        if len(self.__lock_timings) == 0:
            return 0
        return float(sum(self.__lock_timings)) / float(
            len(self.__lock_timings)
        )
 
    def _calculate_delay(
        self,
        pipeline_interval_s: int,
        checks_per_interval: int = 10,
        check_ratio: int = 3,
    ) -> int:
        """
        Calculates the delay in milliseconds to wait before running the
        pipeline again. This is based on the configured pipeline interval,
        and the average performance of the backend hosting the pipeline
        lock.
        :param pipeline_interval_s: The interval in seconds between pipeline
            runs.
        :param checks_per_interval: Number of times the lock should be
            checked per interval across all nodes.
        :param check_ratio: Should be a number close to the average number
            of pipeline nodes.
        :return: Delay in milliseconds to wait before running the pipeline
            again.
        """
        redis_delay_ms = self.get_average_lock_delay()
        job_delay_ms = float(pipeline_interval_s * 1000) / float(
            checks_per_interval
        )
        if redis_delay_ms > job_delay_ms:
            logging.warning(
                "Slow redis cluster detected. Consider increasing the "
                "size of your cluster."
            )
        return int(max(redis_delay_ms, job_delay_ms)) * check_ratio
 
    def _append_timing(self, timing: float) -> None:
        """
        Adds a lock operation benchmark timing to the rolling average
        counter. Used for calculating mutex lookup delays.
        :param timing: The time in milliseconds it took to perform the lock
            operation.
        :return: None
        """
        self.__lock_timings.append(timing)
        if len(self.__lock_timings) > 100:
            self.__lock_timings.pop(0)
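
Usage sketch

The snippet below is a minimal, hypothetical example of how HAPipeline might be wired up. It is based only on the calls visible in this file: lock(), unlock(), read_key(), and write_key() on the PipelineLock, and run()/get_pipeline_name() on the base Pipeline. The InMemoryLock class is an illustrative stand-in for a real distributed lock backend (such as Redis); it duck-types the PipelineLock interface rather than subclassing it, since the abstract base's constructor is not shown here, and step registration on the base Pipeline is omitted because its API is also not shown in this file.

from watergrid.pipelines.ha_pipeline import HAPipeline


class InMemoryLock:
    """Illustrative stand-in for a PipelineLock backend.

    Implements only the four calls HAPipeline makes in this file; it is
    not suitable for real multi-node deployments.
    """

    def __init__(self):
        self._held = False
        self._store = {}

    def lock(self) -> bool:
        # Succeed only if the lock is currently free.
        if self._held:
            return False
        self._held = True
        return True

    def unlock(self) -> None:
        self._held = False

    def write_key(self, key, value) -> None:
        self._store[key] = value

    def read_key(self, key):
        # Returning None for a missing key makes float(None) raise
        # TypeError, which _get_last_run() treats as "never run".
        return self._store.get(key)


pipeline = HAPipeline("example_pipeline", InMemoryLock())
# Pipeline steps would be registered through the base Pipeline API
# (not shown in this file) before calling:
#     pipeline.run()                             # single guarded run
#     pipeline.run_interval(job_interval_s=60)   # blocking interval mode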