smok-serwis/smok-client
smok/threads/communicator.py

import logging
import queue
import time
import typing as tp

import ujson
from satella.coding import silence_excs, for_argument, log_exceptions
from satella.coding.concurrent import TerminableThread, Condition
from satella.coding.decorators import retry
from satella.coding.transforms import jsonify
from satella.exceptions import WouldWaitMore
from satella.time import measure

from smok.basics import StorageLevel
from smok.exceptions import ResponseError

from smok.extras import BasePathpointDatabase
from smok.pathpoint.orders import sections_from_list
from smok.pathpoint.pathpoint import Pathpoint
from smok.predicate import DisabledTime
from smok.predicate.undefined import UndefinedStatistic
from smok.sensor import Sensor
from smok.sync_workers.base import SyncError

logger = logging.getLogger(__name__)

SENSORS_SYNC_INTERVAL = 300


@for_argument(returns=jsonify)
def pathpoints_to_json(pps: tp.Iterable[Pathpoint]) -> list:
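    """
    Convert an iterable of pathpoints into a JSON-able list of dicts
    carrying each pathpoint's name and storage level.
    """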
    output = []
    for pp in pps:
        output.append({'path': pp.name,
                       'storage_level': pp.storage_level})
    return output


COMMUNICATOR_INTERVAL = 60
PREDICATE_SYNC_INTERVAL = 300
BAOB_SYNC_INTERVAL = 60 * 60  # an hour


def redo_data(data):
    """
    Alter the data received from the backend to our way
    """
    output = []
    for pp in data:
        values = []
        for ts in pp['values']:
            if isinstance(ts, dict):
                if 'error_code' in ts:
                    values.append([False, ts['timestamp'], ts['error_code']])
                else:
                    values.append([ts['timestamp'], ts['value']])
            else:
                values.append(ts)
        output.append({'path': pp['path'],
                       'values': values})
    return output


class CommunicatorThread(TerminableThread):
    """
    Note that this will sleep only if no data was synchronized during current loop execution.
    """
    def __init__(self, device: 'SMOKClient', order_queue: queue.Queue,
                 data_to_sync: BasePathpointDatabase, dont_obtain_orders: bool,
                 dont_do_baobs: bool, dont_do_pathpoints: bool,
                 dont_do_predicates: bool,
                 dont_sync_sensor_writes: bool,
                 startup_delay: float):
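        """
        :param device: the client instance on whose behalf this thread communicates
        :param order_queue: queue into which downloaded order sections are put
        :param data_to_sync: pathpoint database whose data is to be uploaded
        :param dont_obtain_orders: if True, orders will not be fetched
        :param dont_do_baobs: if True, BAOBs will not be synchronized
        :param dont_do_pathpoints: if True, pathpoints, their data and sensors will not be
            synchronized
        :param dont_do_predicates: if True, predicates and events will not be synchronized
        :param dont_sync_sensor_writes: if True, the sensor write log will not be synchronized
        :param startup_delay: time to sleep before the first loop iteration, in seconds
        """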
        super().__init__(name='order getter')
        self.dont_sync_sensor_writes = dont_sync_sensor_writes
        self.device = device
        self.startup_delay = startup_delay
        self.dont_do_pathpoints = dont_do_pathpoints
        self.dont_do_baobs = dont_do_baobs
        self.dont_obtain_orders = dont_obtain_orders
        self.queue = order_queue
        self.dont_do_predicates = dont_do_predicates
        self.data_to_sync = data_to_sync
        self.last_sensors_synced = 0
        self.last_predicates_synced = 0
        self.data_to_update = Condition()
        self.last_baob_synced = 0

    def tick_predicates(self) -> None:
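        """
        Invoke on_tick on every known predicate and persist its state in the
        predicate database if it changed.
        """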
        for predicate in self.device.predicates.values():
            kwargs = predicate.to_kwargs()
            # noinspection PyProtectedMember
            predicate._call_method('on_tick')
            new_kwargs = predicate.to_kwargs()
            if kwargs != new_kwargs:
                self.device.pred_database.update_predicate(new_kwargs)

    @retry(3, ResponseError)
    def sync_events(self) -> None:
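        """
        Upload pending events to the server and acknowledge (or negatively
        acknowledge) them in the event database, depending on the outcome.
        """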
        evt_to_sync = self.device.evt_database.get_events_to_sync()
        if evt_to_sync is None:
            return
        try:
            json = [evt.to_json() for evt in evt_to_sync.get_events()]
            resp = self.device.api.post('/v1/device/alarms',
                                        json=json)
            evt_to_sync.acknowledge(*(data['uuid'] for data in resp))
            self.device.on_successful_sync()
        except ResponseError as e:
            if e.is_no_link():
                self.device.on_failed_sync()
            logger.error('Failure syncing events: %s', e, exc_info=e)
            evt_to_sync.negative_acknowledge()
            if e.is_clients_fault():
                # the server rejected this data, so there is no point in retrying the request
                return
            raise

    @retry(3, ResponseError)
    def sync_predicates(self) -> None:
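        """
        Download the current predicate list from the server, instantiate new
        predicates, update changed ones, retire deleted ones and persist the
        result in the predicate database.
        """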
        try:
            resp = self.device.api.get('/v1/device/predicates')
        except ResponseError as e:
            if e.is_no_link():
                self.device.on_failed_sync()
            raise

        predicates_found = set()
        for predicate_dict in resp:
            predicate_id = predicate_dict['predicate_id']
            predicates_found.add(predicate_id)
            if not predicate_dict['online']:
                if predicate_id in self.device.predicates:
                    self.device.predicates[predicate_id].on_offline()
                    del self.device.predicates[predicate_id]
            else:
                silencing = [DisabledTime.from_json(dct) for dct in
                             predicate_dict.get('silencing', ())]

                if predicate_id not in self.device.predicates:
                    stat_name = predicate_dict['statistic']
                    cfg = predicate_dict['configuration']
                    base_class = self.device.statistic_registration.try_match(stat_name, cfg)
                    if base_class is None:
                        base_class = UndefinedStatistic
                    predicate = base_class(device=self.device,
                                           predicate_id=predicate_id,
                                           verbose_name=predicate_dict['verbose_name'],
                                           silencing=silencing,
                                           configuration=cfg,
                                           statistic=stat_name)
                    self.device.predicates[predicate_id] = predicate
                else:
                    stat = self.device.predicates[predicate_id]
                    config = predicate_dict['configuration']
                    verbose_name = predicate_dict['verbose_name']
                    group = predicate_dict['group']
                    if stat.configuration != config:
                        stat.on_configuration_changed(config)
                    if stat.silencing != silencing:
                        stat.on_silencing_changed(silencing)
                    if stat.verbose_name != verbose_name:
                        stat.on_verbose_name_changed(verbose_name)
                    if stat.group != group:
                        stat.on_group_changed(group)

                predicate_dict['silencing'] = silencing

        predicates_to_delete = set(self.device.predicates.keys()) - predicates_found
        for predicate_id in predicates_to_delete:
            self.device.predicates[predicate_id].on_offline()
            self.device.evt_database.on_predicate_deleted(predicate_id)
            del self.device.predicates[predicate_id]

        if not self.last_predicates_synced:
            self.device.ready_lock.release()
        self.last_predicates_synced = time.monotonic()
        self.db_sync_predicates()
        self.device.on_successful_sync()

    def db_sync_predicates(self):
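        """Persist the current in-memory predicates in the predicate database."""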
        lst = [predicate.to_kwargs() for predicate in self.device.predicates.values()]
        self.device.pred_database.set_new_predicates(lst)

    @retry(3, ResponseError)
    def sync_sensors(self) -> None:
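        """Download the sensor list from the server and store it in the sensor database."""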
        try:
            resp = self.device.api.get('/v1/device/sensors')

            self.device.sensor_database.on_sensors_sync(
                [Sensor.from_json(self.device, data) for data in resp])
            self.last_sensors_synced = time.monotonic()
            self.device.on_successful_sync()
        except ResponseError as e:
            if e.is_no_link():
                self.device.on_failed_sync()
            raise

    @retry(3, ResponseError)
    def fetch_orders(self) -> None:
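        """Ask the server for new orders and enqueue their sections for execution."""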
        try:
            resp = self.device.api.post('/v1/device/orders')
            if resp:
                for section in sections_from_list(resp):
                    self.queue.put(section)
            self.device.on_successful_sync()
        except ResponseError as e:
            if e.is_no_link():
                self.device.on_failed_sync()
            raise

    @retry(3, ResponseError)
    def sync_sensor_writes(self) -> None:
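        """Upload the outstanding sensor write log, if any, to the server."""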
        sync = self.device.sensor_write_database.get_sw_sync()
        if not sync:
            return
        try:
            self.device.api.put('/v1/device/sensor/write_log', json=sync.to_json())
            sync.ack()
            self.device.on_successful_sync()
        except ResponseError as e:
            if e.is_no_link():
                self.device.on_failed_sync()
            if not e.is_clients_fault():
                logger.warning('Failed to sync sensor writes: %s', e, exc_info=e)
                sync.nack()
                raise
            else:
                logger.warning('Got HTTP %s on sync sensor writes, acking', e.status_code)
                sync.ack()

    def sync_data(self) -> bool:
        """
        :return: whether we should resynchronize right away
        """
        sync = self.data_to_sync.get_data_to_sync()
        if sync is None:
            return False
        try:
            data = sync.to_json()
            if not data:
                sync.acknowledge()
                return False
            self.device.sync_worker.sync_pathpoints(redo_data(data))
            sync.acknowledge()
            self.device.on_successful_sync()
        except SyncError as e:
            if e.is_no_link():
                self.device.on_failed_sync()
            if not e.is_clients_fault():
                sync.negative_acknowledge()
            else:
                logger.warning('Got HTTP %s while syncing pathpoint data. '
                               'Assuming it is damaged and marking it as synced', e.status_code)
                sync.acknowledge()
        return True

    def sync_baob(self) -> None:
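        """Synchronize BAOBs with the server and mark them as loaded on the device."""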
        self._sync_baob()
        self.device.baobs_loaded = True

    @retry(3, ResponseError)
    def _sync_baob(self) -> None:
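        """
        Send the local BAOB keys and versions to the server, then delete,
        download or upload BAOBs as instructed by its response.
        """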
        try:
            keys = self.device.baob_database.get_all_keys()
            data = []
            for key in keys:
                try:
                    data.append({'key': key,
                                 'version': self.device.baob_database.get_baob_version(key)})
                except KeyError:
                    logger.error('Got key %s but the DB tells us that it does not exist', key)
                    continue
            data = self.device.api.post('/v1/device/baobs', json=data)

            for key_to_delete in data['should_delete']:
                self.device.baob_database.delete_baob(key_to_delete)

            for key_to_download in data['should_download']:
                resp, headers = self.device.api.get('/v1/device/baobs/%s' % (key_to_download, ),
                                                    direct_response=True)
                self.device.baob_database.set_baob_value(key_to_download, resp,
                                                         int(headers['X-SMOK-BAOB-Version']))
                if self.last_baob_synced:
                    self.device.on_baob_updated(key_to_download)

            for key_to_upload in data['should_upload']:
                self.device.api.put('/v1/device/baobs/%s' % (key_to_upload, ), files={
                    'file': self.device.baob_database.get_baob_value(key_to_upload),
                    'data': ujson.dumps(
                        {'version': self.device.baob_database.get_baob_version(
                            key_to_upload)}).encode(
                        'utf8')
                })
            self.last_baob_synced = time.monotonic()
            self.device.on_successful_sync()
        except ResponseError as e:
            if e.is_no_link():
                self.device.on_failed_sync()
            if e.is_clients_fault():
                return  # no reason to repeat requests that failed because of us
            raise

    @retry(3, ResponseError)
    def sync_pathpoints(self) -> None:
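        """
        Upload the locally defined pathpoints to the server and process its
        response, obtaining any pathpoints unknown so far and applying changed
        storage levels.
        """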
        pps = self.device.pathpoints.copy_and_clear_dirty()
        data = pathpoints_to_json(pps.values())
        try:
            resp = self.device.api.put('/v1/device/pathpoints', json=data)
            for pp in resp:
                name = pp['path']
                if name.startswith('r'):  # skip reparse pathpoints
                    continue
                with silence_excs(KeyError):
                    stor_level = StorageLevel(pp.get('storage_level', 1))
                    pathpoint = self.device.provide_unknown_pathpoint(name, stor_level)
                    if stor_level != pathpoint.storage_level:
                        pathpoint.on_new_storage_level(stor_level)
            self.device.on_successful_sync()
        except ResponseError as e:
            if e.is_no_link():
                self.device.on_failed_sync()
            self.device.pathpoints.update(pps)
            self.device.pathpoints.dirty = True
            if e.is_clients_fault():
                # the server has rejected this part of the data as damaged,
                # but we still mark it as successful in order not to retry that sync
                return
            raise

    def prepare(self) -> None:
        # Give the app a moment to prepare and define its pathpoints
        self.safe_sleep(self.startup_delay)

    def wait(self, time_taken: float):
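        """
        Sleep for the remainder of COMMUNICATOR_INTERVAL, waking up early if
        self.data_to_update is notified or the thread is being terminated.

        :param time_taken: how long the current loop iteration has already taken, in seconds
        """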
        time_to_wait = COMMUNICATOR_INTERVAL - time_taken
        while time_to_wait > 0.1 and not self.terminating:  # for float roundings
            try:
                ttw = min(time_to_wait, 5)
                self.data_to_update.wait(timeout=ttw)
                return
            except WouldWaitMore:
                time_to_wait -= ttw

    @log_exceptions(logger, logging.ERROR)
    def loop(self) -> None:
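        """
        Execute a single loop iteration: run every enabled synchronization, then
        sleep unless more pathpoint data is waiting to be uploaded.
        """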
        should_wait = True
        with measure() as measurement:
            # Synchronize the data
            monotime = time.monotonic()

            if self.device.allow_sync:
                if not self.dont_do_pathpoints:
                    if self.sync_data():
                        should_wait = False

                    # Synchronize the pathpoints
                    if self.device.pathpoints.dirty:
                        self.sync_pathpoints()

                    # Synchronize sensors
                    if monotime - self.last_sensors_synced > SENSORS_SYNC_INTERVAL:
                        self.sync_sensors()

                # Synchronize predicates
                if not self.dont_do_predicates:
                    if monotime - self.last_predicates_synced > PREDICATE_SYNC_INTERVAL:
                        self.sync_predicates()

                # Fetch the BAOBs
                if not self.dont_do_baobs:
                    if monotime - self.last_baob_synced > BAOB_SYNC_INTERVAL:
                        self.sync_baob()
                if not self.dont_sync_sensor_writes:
                    self.sync_sensor_writes()

                # Fetch the orders
                if not self.dont_obtain_orders and not self.device.sync_worker.has_async_orders:
                    self.fetch_orders()

                if not self.dont_do_predicates:
                    # Tick the predicates
                    self.tick_predicates()

                    # Sync the events
                    self.sync_events()

                # Checkpoint the DB
                if not self.dont_do_pathpoints:
                    self.device.pp_database.checkpoint()
                if not self.dont_do_predicates:
                    self.device.evt_database.checkpoint()

            # Sleep until the next iteration, waking up early if new data arrives to upload
            if should_wait:
                self.wait(measurement())