smok/client/client.py
import datetime
import io
import logging
import os
import tempfile
import threading
import time
import typing as tp
import uuid
import warnings
import weakref
from abc import ABCMeta
import pytz
from satella.coding import Closeable, for_argument, wraps
from satella.coding.concurrent import PeekableQueue, Condition
from satella.coding.optionals import Optional
from satella.coding.structures import DirtyDict
from satella.time import time_as_int
from smok.sync_workers.ngtt import NGTTSyncWorker
from smok.sync_workers.http import HTTPSyncWorker
from .api import RequestsAPI
from .certificate import get_device_info, get_root_cert, get_dev_ca_cert
from .slave import SlaveDevice
from ..baob import BAOB
from ..basics import DeviceInfo, Environment, StorageLevel
from ..exceptions import ResponseError, UnavailableError
from ..extras import BaseSensorDatabase, BaseEventDatabase, BaseMacroDatabase, \
BasePathpointDatabase, BaseMetadataDatabase, BaseBAOBDatabase, BaseArchivesDatabase, \
BaseSensorWriteDatabase, BasePredicateDatabase
from ..extras.arch_database.in_memory import InMemoryArchivesDatabase
from ..extras.baob_database.memory import InMemoryBAOBDatabase
from ..extras.event_database import InMemoryEventDatabase
from ..extras.macros_database.in_memory import InMemoryMacroDatabase
from ..extras.metadata_database import InMemoryMetadataDatabase
from ..extras.pp_database.in_memory import InMemoryPathpointDatabase
from ..extras.pred_database.in_memory import InMemoryPredicateDatabase
from ..extras.sensor_write_database.in_memory import InMemorySensorWriteDatabase
from ..extras.sensors_database.in_memory import InMemorySensorDatabase
from ..metadata import PlainMetadata
from ..pathpoint import Pathpoint, ReparsePathpoint
from ..pathpoint.orders import Section, MessageOrder
from ..predicate import BaseStatistic, Event, Color
from ..predicate.registration import CollectionOfStatistics, StatisticRegistration
from ..predicate.undefined import UndefinedStatistic
from ..sensor import Sensor, fqtsify, SensorWriteEvent
from ..threads import OrderExecutorThread, CommunicatorThread, ArchivingAndMacroThread, \
LogPublisherThread
from ..threads.communicator import PREDICATE_SYNC_INTERVAL
logger = logging.getLogger(__name__)
def must_be_open(fun):
@wraps(fun)
def inner(self, *args, **kwargs):
if self.closed:
raise RuntimeError('SMOK device already closed!')
return fun(self, *args, **kwargs)
return inner
class SMOKDevice(Closeable, metaclass=ABCMeta):
"""
A base class for a SMOK device.
You should subclass it, to provide your own device.
Read carefully. If the documentation states that a particular thread invokes this method, this means
that you are essentially not allowed to call the method itself. It will be called by respective processor thread.
When in doubt, use the source or just call super() while overloading.
Note that instantiating this object spawns two non-daemon threads. This object must be
close()d before termination (or garbage collected).
:param cert: either a path to or a file-like object containing the device certificate
:param priv_key: either a path to or a file-like object containing the device private key
:param allow_sync: whether to allow outbound TCP communication to automatically synchronize
background. Note that if you explicitly call a method that calls TCP this does not apply.
Can be lated changed.
:param evt_database: custom event database. Providing a string defaults to path where predicate
data will be persisted. Since events are persistent, a store needs to be given.
:param pp_database: custom pathpoint value database. Default value of None defaults to an
in-memory implementation
:param macro_database: custom macro database. Default value of None will result in an
in-memory implementation
:param meta_database: custom meta database. Default value of None will result in an
in-memory implementation
:param sensor_database: custom sensor database. Default value of None will result in an
in-memory implementation
:param baob_database: custom BAOB database. Default value of None will result in an
in-memory implementation
:param pred_database: custom Predicate database. Default value of None will result in an
in-memory implementation
:param arch_database: custom archives database. Default value of None will result in an
in-memory implementation
:param sensor_write_database: custom sensor write database. Default value of None will result in
an in-memory implementation
:param dont_obtain_orders: if set to True, this SMOKDevice won't poll for orders.
This also implies dont_do_baobs. It is a ValueError to set this while setting
dont_bo_baobs to False.
:param dont_do_pathpoints: if set to True, this SMOKDevice won't support pathpoints
or sensors. By won't support I mean it won't try to synchronize the data,
but pathpoints and sensors will still be available. Note that providing a source
of pathpoints in the form of a database is required, and it is a ValueError
to try to give this to True without giving a sensor_database.
:param dont_sync_sensor_writes: if set to True, sensor writes won't be synced
:param dont_do_baobs: if set to True, this SMOKDevice won't care about BAOBs.
:param dont_do_macros: if set to True, this SMOKDevice won't take care of the macros
:param dont_do_predicates: if set to True, this SMOKDevice won't do predicates
:param dont_do_archives: if set to True, this SMOKDevice won't do archiving
:param startup_delay: amount of seconds to wait after creation for CommunicatorThread to
start talking and OrderExecutorThread to start grabbing orders. Deprecated.
:param cache_metadata_for: amount of seconds to cache downloaded metadata entry.
Ie no attempt to download them from the server again will be made in that many
seconds since the download.
:param delayed_boot: if set to True, you will need to call
:meth:`~smok.client.SMOKDevice.continue_boot` in order to start fetching orders and the
like. False by default.
:param use_ngtt: if set to True, orders will be fetched asynchronously over
a persistent TLS connection instead of HTTP API. This is both more efficient and bandwidth-saving instead
of polling.
About 10 seconds from creation if CommunicatorThread was created, sensors will be synced and
the device will start talking. To reduce this delay, set parameter startup_delay
If both dont_do_macros and dont_do_archives are True, the archiving & macro thread
won't be started.
If dont_obtain_orders is True, then order executor and order getter threads won't be started.
Every method is supposed to be called from user threads (SMOKDevice's service threads included),
unless noted otherwise.
:ivar device_id: device ID of this device (str)
:ivar allow_sync: whether to allow background synchronization (bool).
This can be changed by setting it directly to correct value. Setting this to False
will help you save some bandwidth. Note that log upload will be put on hold, until it
overflows the buffer. In that case, earlier submitted logs will be thrown on the floor
to make room for new entries.
:ivar pathpoints: a dictionary, keying pathpoint names to their instances
:ivar url: base URL for the API calls, without the trailing slash
:ivar metadata: plain metadata for this device
(class :class:`smok.metadata.PlainMetadata`)
:ivar baobs_loaded: whether all BAOBS have been synchronized (bool)
:ivar cert_data: (bytes) device certificate as given by the user
:raise ValueError: invalid combination of arguments was given
"""
def provide_unknown_pathpoint(self, name: str,
storage_level: StorageLevel = StorageLevel.TREND) -> Pathpoint:
"""
Override this method to generate pathpoints that are referred to by incoming commands,
but not defined yet.
The default implementation always raises `KeyError`.
.. note:: this can safely raise `KeyError` upon encountering a predicate that is manually
defined and registered via :class:`~smokclient.pathpoint.Pathpoint`
This will be called by an assortment of threads, as well as you.
:return: a Pathpoint instance corresponding to what was ordered
:raises KeyError: pathpoint could not be generated
"""
return Pathpoint(self, name, storage_level)
def sync_sections(self, termination_checker: tp.Callable[[], bool]):
"""
Called by order executor thread before a CANNOT_JOIN section is about to be executed.
Override this method if you are providing a custom
:meth:`~smok.client.SMOKDevice.execute_section`. Default does nothing.
This needs to block until all orders issued up to this point are finished.
Called by the order executor thread, that's why it can safely block.
Sections should be synchronized in bounded time.
:param termination_checker: a callable that can be called to determine whether given
SMOKDevice is undergoing a shutdown. If this callable starts to return True,
the function should return at once.
"""
def execute_section(self, section: Section) -> None:
"""
Override to implement custom section execution.
If this is not overridden, standard executor logic will be applied.
This will not provide you with cancelled sections. You also do not need to mark
sections as complete, as this is done by the executor thread.
Called by the order executor thread. If this is defined, then
:meth:`~smok.client.SMOKDevice.sync_sections` is necessary as well, so this should just
execute the orders themselves.
:param section: section to execute
"""
def __init__(self, cert: tp.Union[str, io.StringIO],
priv_key: tp.Union[str, io.StringIO],
evt_database: tp.Union[str, BaseEventDatabase],
pp_database: tp.Optional[BasePathpointDatabase] = None,
macro_database: tp.Optional[BaseMacroDatabase] = None,
meta_database: tp.Optional[BaseMetadataDatabase] = None,
sensor_database: tp.Optional[BaseSensorDatabase] = None,
baob_database: tp.Optional[BaseBAOBDatabase] = None,
pred_database: tp.Optional[BasePredicateDatabase] = None,
arch_database: tp.Optional[BaseArchivesDatabase] = None,
sensor_write_database: tp.Optional[BaseSensorWriteDatabase] = None,
allow_sync: bool = True,
dont_obtain_orders: bool = False,
dont_sync_sensor_writes: bool = False,
dont_do_macros: bool = False,
dont_do_predicates: bool = False,
dont_do_pathpoints: bool = False,
dont_do_baobs: bool = False,
dont_do_archives: bool = False,
cache_metadata_for: float = 60,
startup_delay: tp.Optional[float] = None,
delayed_boot: bool = False,
use_ngtt: bool = False):
if startup_delay is not None:
warnings.warn('This is deprecated. Use delayed_boot', DeprecationWarning)
else:
startup_delay = 0
super().__init__()
Optional(baob_database).check_consistency()
self.cache_metadata_for = cache_metadata_for
if dont_do_pathpoints and sensor_database is None:
raise ValueError('Provide a sensor database if you dont provide pathpoints!')
self.dont_do_predicates = dont_do_predicates
self.dont_do_pathpoints = dont_do_pathpoints
self.dont_sync_sensor_writes = dont_sync_sensor_writes
self.pp_database = pp_database or InMemoryPathpointDatabase()
self.baobs_loaded = False
self.allow_sync = allow_sync
if isinstance(evt_database, str):
self.evt_database = InMemoryEventDatabase(evt_database)
else:
self.evt_database = evt_database
self.macros_database = macro_database or InMemoryMacroDatabase()
self.meta_database = meta_database or InMemoryMetadataDatabase()
self.pred_database = pred_database or InMemoryPredicateDatabase()
self.delayed_boot = delayed_boot
self.sensor_database = sensor_database or InMemorySensorDatabase()
self.sensor_database.on_register(self)
self.use_ngtt = use_ngtt
self.arch_database = arch_database or InMemoryArchivesDatabase()
self.baob_database = baob_database or InMemoryBAOBDatabase()
self.sensor_write_database = sensor_write_database or InMemorySensorWriteDatabase()
self.metadata = PlainMetadata(self)
self.ready_lock = threading.Lock()
self.ready_lock.acquire()
self.predicates = {} # type: tp.Dict[str, BaseStatistic]
# Load cached predicates
for predicate in self.pred_database.get_all_predicates():
udf = UndefinedStatistic(device=self, **predicate)
self.predicates[udf.predicate_id] = udf
self._timezone = None
self.statistic_registration = CollectionOfStatistics()
self._statistics_updated = False
self.pathpoints = DirtyDict() # type: tp.Dict[str, Pathpoint]
self.temp_file_for_cert = None
self.cert_file_name = None
if not isinstance(cert, str):
with tempfile.NamedTemporaryFile('w', delete=False) as cert_file:
cert_file.write(cert.read())
cert = self.temp_file_for_cert = cert_file.name
else:
self.cert_file_name = self.temp_file_for_cert = cert
self.priv_key_file_name = None
self.temp_file_for_key = None
if not isinstance(priv_key, str):
with tempfile.NamedTemporaryFile('w', delete=False) as key_file:
key_file.write(priv_key.read())
priv_key = self.temp_file_for_key = key_file.name
else:
self.priv_key_file_name = self.temp_file_for_key = priv_key
self.cert = cert, priv_key
with open(cert, 'rb') as fin:
cert_data = fin.read()
self.cert_data = cert_data
dev_id, env = get_device_info(cert_data)
self.device_id = dev_id # type: str
self.environment = env # type: Environment
if self.environment == Environment.PRODUCTION:
self.url = 'https://api.smok.co'
elif self.environment == Environment.STAGING:
self.url = 'https://api.test.smok-serwis.pl'
elif self.environment == Environment.LOCAL_DEVELOPMENT:
self.url = 'http://http-api'
self.api = RequestsAPI(self)
self.log_publisher = LogPublisherThread(self)
self._order_queue = PeekableQueue()
if not (dont_do_archives and dont_do_macros):
self.arch_and_macros = ArchivingAndMacroThread(self, self._order_queue,
dont_do_macros, dont_do_archives)
if not delayed_boot:
self.arch_and_macros.start()
else:
self.arch_and_macros = None
self.dont_do_baobs = dont_do_baobs
self.dont_do_orders = dont_obtain_orders
self.executor = None
self.getter = None
if not dont_obtain_orders or not dont_do_predicates or not dont_do_pathpoints or \
not dont_do_baobs:
self.executor = OrderExecutorThread(self, self._order_queue, self.pp_database,
startup_delay)
self.getter = CommunicatorThread(self, self._order_queue, self.pp_database,
dont_obtain_orders,
dont_do_baobs,
dont_do_pathpoints,
dont_do_predicates,
dont_sync_sensor_writes, startup_delay)
else:
self.ready_lock.release()
if not delayed_boot:
if self.use_ngtt:
self.sync_worker = NGTTSyncWorker(self)
else:
self.sync_worker = HTTPSyncWorker(self)
if not dont_obtain_orders or not dont_do_predicates or not dont_do_pathpoints or \
not dont_do_baobs:
self.executor.start()
self.getter.start()
self.log_publisher.start()
self.boot_completed = True
else:
self.sync_worker = None
self.boot_completed = False
def continue_boot(self):
"""
Call this to continue the booting if delayed_start was given in the constructor
This will start the communicator and order executor thread.
:raise RuntimeError: delayed boot was not given
"""
if not self.delayed_boot:
raise RuntimeError('Delayed boot was not given')
if self.use_ngtt:
self.sync_worker = NGTTSyncWorker(self)
else:
self.sync_worker = HTTPSyncWorker(self)
Optional(self.executor).start()
Optional(self.getter).start()
Optional(self.arch_and_macros).start()
self.log_publisher.start()
self.boot_completed = True
@must_be_open
def log_sensor_write(self, sw: SensorWriteEvent):
"""
Log that a sensor has been written and enqueue it for cloud upload.
This will be called by the user. Note that :meth:`smok.sensor.Sensor.write` does not automatically generate
a saving entry.
:param sw: sensor write event to upload
:raises RuntimeError: device already closed
"""
self.sensor_write_database.add_sw(sw)
@must_be_open
def reset_predicates(self):
"""
Clear all loaded predicates and force a renew of loading.
Discards all currently loaded Predicate instances. Currently registered statistics will
remain registered.
.. warning:: Currently requires Internet access to restore predicates
:raises RuntimeError: device already closed
"""
self.predicates = {}
self.getter.last_predicates_synced = time.monotonic() - PREDICATE_SYNC_INTERVAL
@must_be_open
def get_baob(self, key: str) -> BAOB:
"""
Retrieve given BAOB
:raises UnavailableError: client was launched in a mode with BAOBs disabled
:raises RuntimeError: device already closed
"""
if self.dont_do_baobs:
raise UnavailableError('Support for BAOBs was disabled')
return BAOB(self, key)
@must_be_open
def get_all_baobs(self) -> tp.Iterator[BAOB]:
"""
Stream all BAOBs
:raises UnavailableError: client was launched in a mode with BAOBs disabled
:raises RuntimeError: device already closed
"""
if self.dont_do_baobs:
raise UnavailableError('Support for BAOBs was disabled')
for key in self.baob_database.get_all_keys():
yield BAOB(self, key)
def wait_until_synced(self) -> None:
"""
Block until everything's synchronized with the server.
Note that this is a no-op if no communicator thread is spawned.
"""
self.ready_lock.acquire()
self.ready_lock.release()
def _execute_message_order(self, order: MessageOrder) -> None:
"""
Tell the server to execute provided :class:`~smok.pathpoint.orders.MessageOrder`
:param order: order to execute
:meta public:
"""
for i in range(3):
try:
self.api.post('/v1/device/orders/message/' + order.uuid)
self.on_successful_sync()
except ResponseError as e:
if e.is_no_link():
self.on_failed_sync()
@property
def timezone(self) -> pytz.timezone:
"""
:return: the timezone this device is in
"""
if self._timezone is None:
self.get_device_info()
return pytz.timezone(self._timezone)
@must_be_open
def close_event(self, event: Event, timestamp: tp.Optional[int] = None) -> None:
"""
Close the provided event
:param event: event to close
:param timestamp: timestamp of close. Defaults to now
:raises RuntimeError: device already closed
"""
assert not event.is_closed()
if event.ended_on is None:
event.ended_on = timestamp or time_as_int()
self.evt_database.close_event(event)
@must_be_open
def get_open_event(self, event_id: str) -> Event:
"""
Return a particular opened event
:param event_id: opened event UUID
:return: a particular event
:raises KeyError: event not found, or already closed
:rtype: Event
:raise UnavailableError: SMOKDevice was launched in a no-predicate mode
:raises RuntimeError: device already closed
"""
if self.dont_do_predicates:
raise UnavailableError('SMOKDevice was launched without predicates')
for event in self.evt_database.get_open_events():
if event.uuid_matches(event_id):
return event
raise KeyError()
@must_be_open
def get_all_sensors(self) -> tp.Iterator[Sensor]:
"""
Stream all sensors
.. note:: This will block until sensors are synced from the server
:raises RuntimeError: device already closed
"""
with self.ready_lock:
yield from self.sensor_database.get_all_sensors()
def on_baob_updated(self, baob_name: str) -> None:
"""
Called by CommunicatorThread after given BAOB was updated.
After this is called, the new BAOB can be successfully loaded and it's new
contents will be retrieved.
This is not invoked during the first synchronization, nor after you change the BAOB
on the client side.
:param baob_name: name of the BAOB that was just downloaded from the server
"""
@must_be_open
def get_sensor(self, tag_set: tp.Union[tp.Set[str], str]) -> Sensor:
"""
Return a sensor
.. note:: This will block until sensors are synced from the server
:param tag_set: either set of strs or these strs joined with a ' '
:return: sensor
:raises KeyError: sensor does not exist
:raises RuntimeError: device already closed
"""
with self.ready_lock:
if isinstance(tag_set, set):
tag_set = list(tag_set)
tag_set.sort()
tag_set = ' '.join(tag_set)
else:
tag_set = fqtsify(tag_set)
return self.sensor_database.get_sensor(tag_set)
@must_be_open
def get_all_events(self) -> tp.Iterator[Event]:
"""
Return all events kept in device's database
:raise UnavailableError: SMOKDevice was launched in a no-predicate mode
:raises RuntimeError: device already closed
"""
if self.dont_do_predicates:
raise UnavailableError('SMOKDevice was launched without predicates')
return self.evt_database.get_all_events()
@must_be_open
def get_all_open_events(self) -> tp.Iterator[Event]:
"""
Get all open events
:raise UnavailableError: SMOKDevice was launched in a no-predicate mode
:raises RuntimeError: device already closed
"""
if self.dont_do_predicates:
raise UnavailableError('SMOKDevice was launched without predicates')
return self.evt_database.get_open_events()
@must_be_open
def open_event(self, started_on: int, ended_on: tp.Optional[int],
color: Color, is_point: bool, token: str, group: str, message: str,
metadata: tp.Optional[tp.Dict[str, str]] = None) -> Event:
"""
Create a new event
:param started_on: timestamp in seconds, when was the event started?
:param ended_on: timestamp in seconds when has the event ended, None in case of
open events.
:param color: :term:`Color` of an event
:param is_point: whether this is a :term:`point event`
:param token: a string
:param group: notification group
:param message: human-readable message
:param metadata: extra metadata. This must be dict'able
:return: the Event object
:raise UnavailableError: SMOKDevice was launched in a no-predicate mode
:raises RuntimeError: device already closed
"""
if self.dont_do_predicates:
raise UnavailableError('SMOKDevice was launched without predicates')
metadata = dict(metadata or {})
event = Event(None, started_on, ended_on, color, is_point, token, group, message,
None, metadata)
self.evt_database.add_event(event)
return event
def execute(self, *secs: Section) -> None:
"""
Schedule sections to be executed.
To be invoked by any thread, as well as the user. Use this to inject sections into device's execution loop.
:param secs: sections to be executed, in that order, unless they're joinable
"""
for sec in secs:
self._order_queue.put(sec)
@must_be_open
def get_pathpoint(self, path: str,
storage_level: StorageLevel = StorageLevel.TREND) -> Pathpoint:
"""
Obtain a pathpoint. Creates one and registers it if not available.
Prefer to call this than :meth:`smok.client.Client.provide_unknown_pathpoint`. This will be called
by threads, as well as the user.
:param path: path of the pathpoint
:param storage_level: target storage level
:return: a pathpoint having provided name
:raises KeyError: pathpoint not available
:raises RuntimeError: device already closed
"""
if path[0] == 'r':
return ReparsePathpoint(self, path, storage_level)
if path in self.pathpoints:
return self.pathpoints[path]
pp = self.provide_unknown_pathpoint(path, storage_level) # raises KeyError
self.register_pathpoint(pp)
return pp
@must_be_open
def register_statistic(self, stat: tp.Type[BaseStatistic],
predicate: tp.Callable[[str, dict], bool]) -> StatisticRegistration:
"""
Register a new statistic.
Statistics can be registered at any point. If there are pending predicates,
instances will be created for them shortly by the communicator thread.
:param stat: a class (not an instance) to register
:param predicate: a callable taking two arguments: statistic name and it's configuration.
The callable should return whether to apply stat to this predicate
:return: a Registration object. Can be later cancelled.
:raise UnavailableError: SMOKDevice was launched in a no-predicate mode
:raises RuntimeError: device already closed
"""
if self.dont_do_predicates:
raise UnavailableError('SMOKDevice was launched without predicates')
assert issubclass(stat, BaseStatistic), 'Not a subclass of BaseStatistic!'
reg = StatisticRegistration(predicate, stat)
self.statistic_registration.add(reg)
return reg
@must_be_open
def register_pathpoint(self, pp: Pathpoint) -> None:
"""
Register a pathpoint for usage with this SMOKDevice.
Normally, you shouldn't need to use it, as the :class:`~smokclient.pathpoint.Pathpoint`
constructor does that for you. However, if you provide it's `SMOKDevice` parameter as None,
this call is still required
:param pp: pathpoint to register
:raises RuntimeError: device already closed
"""
if pp.name[0] == 'r':
return
if pp.name not in self.pathpoints:
pp.device = weakref.proxy(self)
self.pathpoints[pp.name] = pp
def execute_sysctl(self, op_type: str, op_args: str) -> bool:
"""
Called by executor thread upon receiving a request to execute a particular SysctlOrder
Handles commonly defined sysctls of `baob-created` and `baob-updated`.
Extend to implement custom sysctls.
Sysctl orders are user-defined.
:param op_type: type of operation to execute
:param op_args: argument of the operation to execute.
:return: whether this command was recognized and acted upon
"""
if self.dont_do_baobs:
return False
if op_type in ('baob-updated', 'baob-created'):
if self.getter is not None:
self.getter.last_baob_synced = 0
self.getter.data_to_update.notify()
return True
elif op_type == 'baob-deleted':
self.baob_database.delete_baob(op_args)
return True
return False
def close(self) -> None:
"""
Close the connection, clean up the resources.
This may block for up to 10 seconds.
No-op if called more than once.
"""
if super().close():
Optional(self.executor).terminate()
Optional(self.getter).terminate()
self.log_publisher.terminate()
Optional(self.arch_and_macros).terminate()
if self.priv_key_file_name is None:
os.unlink(self.temp_file_for_key)
if self.cert_file_name is None:
os.unlink(self.temp_file_for_cert)
Optional(self.executor).join()
Optional(self.getter).join()
if self.boot_completed:
self.log_publisher.join()
Optional(self.arch_and_macros).join()
@must_be_open
@for_argument(returns=list)
def get_slaves(self) -> tp.List[SlaveDevice]:
"""
Return information about slave devices
:return: a list of slave devices
:raises ResponseError: server responded (or not) with an invalid message
:raises RuntimeError: device already closed
"""
slaves = self.get_device_info().slaves
for slave in slaves:
yield SlaveDevice(self, slave)
@must_be_open
def get_device_info(self) -> DeviceInfo:
"""
Obtain information about the device.
Note that this will result in :meth:`~smok.client.SMOKDevice.on_failed_sync`
or :meth:`~smok.client.SMOKDevice.on_successful_sync` being called.
:return: current device information
:raises ResponseError: server responded (or not) with an invalid message
:raises RuntimeError: :attr:`~smok.client.SMOKDevice.allow_sync` was set to False or
device already closed
"""
if not self.allow_sync:
raise RuntimeError('allow_sync is False, cannot fetch the information')
try:
resp = DeviceInfo.from_json(self.api.get('/v1/device'))
except ResponseError as e:
if e.is_no_link():
self.on_failed_sync()
raise
self.on_successful_sync()
self._timezone = resp.timezone
return resp
def on_successful_sync(self, *args, **kwargs) -> None:
"""
Called by CommunicatorThread each time a part of the system synchronizes correctly with the
server. Can be used to implement link liveness detector.
args and kwargs left for future extendability.
Please note that if you set allow_sync to False this won't be called, as the client won't
try to talk to the server. The only chance is getting synces manually, as via
:meth:`~smok.client.SMOKDevice.get_device_info`
"""
def on_failed_sync(self, *args, **kwargs) -> None:
"""
Called by CommunicatorThread each time a part of the system fails to synchronize by
receiving no response at all from the server. Can be used to implement link liveness
detector.
args and kwargs left for future extendability
Please note that if you set allow_sync to False this won't be called, as the client won't
try to talk to the server. The only chance is getting synces manually, as via
:meth:`~smok.client.SMOKDevice.get_device_info`
"""
@must_be_open
def get_local_time(self) -> datetime.datetime:
"""
Return current local time on target culture context
:return: a datetime object having the local time for this device
:raises RuntimeError: device already closed
"""
# What is the time on target device?
tz = self.timezone
utc_time = pytz.UTC.localize(datetime.datetime.utcfromtimestamp(time.time()))
local_time = utc_time.astimezone(tz)
return local_time
@property
def cert_chain(self) -> bytes:
"""
Return your own certificate chain, finishing at SMOK CA certificate
:return: certificate chain in PEM format
"""
my_dat = self.cert_data
dev_ca_dat = get_dev_ca_cert()
root_ca_dat = get_root_cert()
return b''.join((my_dat, dev_ca_dat, root_ca_dat))
def clear_closed_and_synced_events(self) -> None:
"""
Clear all closed and synchronized events
:raises UnavailableError: SMOKDevice was launched without predicates
"""
if self.dont_do_predicates:
raise UnavailableError('SMOKDevice was launched without predicates')
self.evt_database.clear_closed_and_synced_events()