simplipy/websocket.py
"""Define a connection to the SimpliSafe websocket."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import InitVar, dataclass, field
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Final, cast
from aiohttp import ClientWebSocketResponse, WSMsgType
from aiohttp.client_exceptions import ClientError
from simplipy.const import DEFAULT_USER_AGENT, LOGGER
from simplipy.device import DeviceTypes
from simplipy.errors import (
CannotConnectError,
ConnectionClosedError,
ConnectionFailedError,
InvalidMessageError,
NotConnectedError,
)
from simplipy.util import CallbackType, execute_callback
from simplipy.util.dt import utc_from_timestamp, utcnow
if TYPE_CHECKING:
from simplipy import API
WEBSOCKET_SERVER_URL = "wss://socketlink.prd.aser.simplisafe.com"
DEFAULT_WATCHDOG_TIMEOUT = timedelta(minutes=5)
EVENT_ALARM_CANCELED: Final = "alarm_canceled"
EVENT_ALARM_TRIGGERED: Final = "alarm_triggered"
EVENT_ARMED_AWAY: Final = "armed_away"
EVENT_ARMED_AWAY_BY_KEYPAD: Final = "armed_away_by_keypad"
EVENT_ARMED_AWAY_BY_REMOTE: Final = "armed_away_by_remote"
EVENT_ARMED_HOME: Final = "armed_home"
EVENT_AUTOMATIC_TEST: Final = "automatic_test"
EVENT_AWAY_EXIT_DELAY_BY_KEYPAD: Final = "away_exit_delay_by_keypad"
EVENT_AWAY_EXIT_DELAY_BY_REMOTE: Final = "away_exit_delay_by_remote"
EVENT_BASE_STATION_UPDATE_SUCCEEDED: Final = "base_station_update_succeeded"
EVENT_CAMERA_MOTION_DETECTED: Final = "camera_motion_detected"
EVENT_CONNECTION_LOST: Final = "connection_lost"
EVENT_CONNECTION_RESTORED: Final = "connection_restored"
EVENT_DISARMED_BY_KEYPAD: Final = "disarmed_by_keypad"
EVENT_DISARMED_BY_REMOTE: Final = "disarmed_by_remote"
EVENT_DOORBELL_DETECTED: Final = "doorbell_detected"
EVENT_DEVICE_TEST: Final = "device_test"
EVENT_ENTRY_DELAY: Final = "entry_delay"
EVENT_HOME_EXIT_DELAY: Final = "home_exit_delay"
EVENT_LOCK_ERROR: Final = "lock_error"
EVENT_LOCK_LOCKED: Final = "lock_locked"
EVENT_LOCK_UNLOCKED: Final = "lock_unlocked"
EVENT_POWER_OUTAGE: Final = "power_outage"
EVENT_POWER_RESTORED: Final = "power_restored"
EVENT_SECRET_ALERT_TRIGGERED: Final = "secret_alert_triggered"
EVENT_SENSOR_NOT_RESPONDING: Final = "sensor_not_responding"
EVENT_SENSOR_PAIRED_AND_NAMED: Final = "sensor_paired_and_named"
EVENT_SENSOR_RESTORED: Final = "sensor_restored"
EVENT_USER_INITIATED_CAMERA_RECORDING: Final = "user_initiated_camera_recording"
EVENT_USER_INITIATED_TEST: Final = "user_initiated_test"
EVENT_MAPPING = {
1110: EVENT_ALARM_TRIGGERED,
1120: EVENT_ALARM_TRIGGERED,
1132: EVENT_ALARM_TRIGGERED,
1134: EVENT_ALARM_TRIGGERED,
1154: EVENT_ALARM_TRIGGERED,
1159: EVENT_ALARM_TRIGGERED,
1162: EVENT_ALARM_TRIGGERED,
1170: EVENT_CAMERA_MOTION_DETECTED,
1301: EVENT_POWER_OUTAGE,
1350: EVENT_CONNECTION_LOST,
1381: EVENT_SENSOR_NOT_RESPONDING,
1400: EVENT_DISARMED_BY_KEYPAD,
1406: EVENT_ALARM_CANCELED,
1407: EVENT_DISARMED_BY_REMOTE,
1409: EVENT_SECRET_ALERT_TRIGGERED,
1429: EVENT_ENTRY_DELAY,
1458: EVENT_DOORBELL_DETECTED,
1531: EVENT_SENSOR_PAIRED_AND_NAMED,
1601: EVENT_USER_INITIATED_TEST,
1602: EVENT_AUTOMATIC_TEST,
1604: EVENT_DEVICE_TEST,
1609: EVENT_USER_INITIATED_CAMERA_RECORDING,
3301: EVENT_POWER_RESTORED,
3350: EVENT_CONNECTION_RESTORED,
3381: EVENT_SENSOR_RESTORED,
3401: EVENT_ARMED_AWAY_BY_KEYPAD,
3407: EVENT_ARMED_AWAY_BY_REMOTE,
3441: EVENT_ARMED_HOME,
3481: EVENT_ARMED_AWAY,
3487: EVENT_ARMED_AWAY,
3491: EVENT_ARMED_HOME,
9401: EVENT_AWAY_EXIT_DELAY_BY_KEYPAD,
9407: EVENT_AWAY_EXIT_DELAY_BY_REMOTE,
9441: EVENT_HOME_EXIT_DELAY,
9700: EVENT_LOCK_UNLOCKED,
9701: EVENT_LOCK_LOCKED,
9703: EVENT_LOCK_ERROR,
9903: EVENT_BASE_STATION_UPDATE_SUCCEEDED,
}
class Watchdog:
"""Define a watchdog to kick the websocket connection at intervals."""
def __init__(
self,
action: Callable[..., Awaitable[None]],
timeout: timedelta = DEFAULT_WATCHDOG_TIMEOUT,
):
"""Initialize.
Args:
action: The coroutine function to call when the watchdog expires.
timeout: The time duration before the watchdog times out.
"""
self._action = action
self._action_task: asyncio.Task | None = None
self._loop = asyncio.get_running_loop()
self._timeout_seconds = timeout.total_seconds()
self._timer_task: asyncio.TimerHandle | None = None
def _on_expire(self) -> None:
"""Log and act when the watchdog expires."""
LOGGER.info("Websocket watchdog expired")
execute_callback(self._action)
def cancel(self) -> None:
"""Cancel the watchdog."""
if self._timer_task:
self._timer_task.cancel()
self._timer_task = None
def trigger(self) -> None:
"""Trigger the watchdog."""
LOGGER.info(
"Websocket watchdog triggered – sleeping for %s seconds",
self._timeout_seconds,
)
if self._timer_task:
self._timer_task.cancel()
self._timer_task = self._loop.call_later(self._timeout_seconds, self._on_expire)
@dataclass(frozen=True)
class WebsocketEvent:
"""Define a representation of a message."""
event_cid: InitVar[int]
info: str
system_id: int
_raw_timestamp: float
_video: dict | None
_vid: str | None
event_type: str | None = field(init=False)
timestamp: datetime = field(init=False)
media_urls: dict[str, str] | None = field(init=False)
changed_by: str | None = None
sensor_name: str | None = None
sensor_serial: str | None = None
sensor_type: DeviceTypes | None = None
def __post_init__(self, event_cid: int) -> None:
"""Run post-init initialization.
Args:
event_cid: A SimpliSafe code for a particular event.
"""
if event_cid in EVENT_MAPPING:
object.__setattr__(self, "event_type", EVENT_MAPPING[event_cid])
else:
LOGGER.warning(
'Encountered unknown websocket event type: %s ("%s"). Please report it '
"at https://github.com/bachya/simplisafe-python/issues.",
event_cid,
self.info,
)
object.__setattr__(self, "event_type", None)
object.__setattr__(self, "timestamp", utc_from_timestamp(self._raw_timestamp))
if self.sensor_type is not None:
try:
object.__setattr__(self, "sensor_type", DeviceTypes(self.sensor_type))
except ValueError:
LOGGER.warning(
'Encountered unknown device type: %s ("%s"). Please report it at'
"https://github.com/home-assistant/home-assistant/issues.",
self.sensor_type,
self.info,
)
object.__setattr__(self, "sensor_type", None)
if self._vid is not None and self._video is not None:
object.__setattr__(
self,
"media_urls",
{
"image_url": self._video[self._vid]["_links"]["snapshot/jpg"][
"href"
],
"clip_url": self._video[self._vid]["_links"]["download/mp4"][
"href"
],
"hls_url": self._video[self._vid]["_links"]["playback/hls"]["href"],
},
)
object.__setattr__(self, "_vid", None)
object.__setattr__(self, "_video", None)
else:
object.__setattr__(self, "media_urls", None)
def websocket_event_from_payload(payload: dict[str, Any]) -> WebsocketEvent:
"""Create a Message object from a websocket event payload.
Args:
payload: A raw websocket response payload.
Returns:
A parsed WebsocketEvent object.
"""
return WebsocketEvent(
payload["data"]["eventCid"],
payload["data"]["info"],
payload["data"]["sid"],
payload["data"]["eventTimestamp"],
payload["data"].get("video"),
payload["data"].get("videoStartedBy"),
changed_by=payload["data"]["pinName"],
sensor_name=payload["data"]["sensorName"],
sensor_serial=payload["data"]["sensorSerial"],
sensor_type=payload["data"]["sensorType"],
)
class WebsocketClient:
"""A websocket connection to the SimpliSafe cloud.
Note that this class shouldn't be instantiated directly; it will be instantiated as
appropriate via :meth:`simplipy.API.async_from_auth` or
:meth:`simplipy.API.async_from_refresh_token`.
Args:
api: A simplipy API object.
"""
def __init__(self, api: API) -> None:
"""Initialize.
Args:
api: A simplipy API object.
"""
self._api = api
self._connect_callbacks: list[CallbackType] = []
self._disconnect_callbacks: list[CallbackType] = []
self._event_callbacks: list[CallbackType] = []
self._loop = asyncio.get_running_loop()
self._watchdog = Watchdog(self.async_reconnect)
# These will get filled in after initial authentication:
self._client: ClientWebSocketResponse = None # type: ignore[assignment]
@property
def connected(self) -> bool:
"""Return if currently connected to the websocket.
Returns:
Whether the websocket is connected.
"""
return self._client is not None and not self._client.closed
@staticmethod
def _add_callback(
callback_list: list[CallbackType], callback: CallbackType
) -> Callable[[], None]:
"""Add a callback to a particular list.
Args:
callback_list: A list on this object to store the callback in.
callback: The callback to execute.
Returns:
A callable to cancel the callback.
"""
callback_list.append(callback)
def remove() -> None:
"""Remove the callback."""
callback_list.remove(callback)
return remove
async def _async_receive_json(self) -> dict[str, Any]:
"""Receive a JSON response from the websocket server.
Returns:
A websocket response payload.
Raises:
ConnectionClosedError: Raised when the server closes the websocket.
ConnectionFailedError: Raised when the websocket fails in anyway.
InvalidMessageError: Raised when an invalid message is received.
"""
msg = await self._client.receive()
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING):
raise ConnectionClosedError("Connection was closed.")
if msg.type == WSMsgType.ERROR:
raise ConnectionFailedError
if msg.type != WSMsgType.TEXT:
raise InvalidMessageError(f"Received non-text message: {msg.type}")
try:
data = msg.json()
except ValueError as err:
raise InvalidMessageError("Received invalid JSON") from err
LOGGER.debug("Received data from websocket server: %s", data)
self._watchdog.trigger()
return cast(dict[str, Any], data)
async def _async_send_json(self, payload: dict[str, Any]) -> None:
"""Send a JSON message to the websocket server.
Args:
payload: A JSON payload.
Raises:
NotConnectedError: Raised if client is not connected.
"""
if not self.connected:
raise NotConnectedError
LOGGER.debug("Sending data to websocket server: %s", payload)
await self._client.send_json(payload)
def _parse_payload(self, payload: dict[str, Any]) -> None:
"""Parse an incoming payload.
Args:
payload: A JSON payload.
"""
if payload["type"] == "com.simplisafe.event.standard":
event = websocket_event_from_payload(payload)
for callback in self._event_callbacks:
execute_callback(callback, event)
def add_connect_callback(
self, callback: Callable[[], Awaitable[None] | None]
) -> Callable[[], None]:
"""Add a callback to be called after connecting.
Args:
callback: The callback to execute.
Returns:
A callable to cancel the callback.
"""
return self._add_callback(self._connect_callbacks, callback)
def add_disconnect_callback(
self, callback: Callable[[], Awaitable[None] | None]
) -> Callable[[], None]:
"""Add a callback to be called after disconnecting.
Args:
callback: The callback to execute.
Returns:
A callable to cancel the callback.
"""
return self._add_callback(self._disconnect_callbacks, callback)
def add_event_callback(
self, callback: Callable[[WebsocketEvent], Awaitable[None] | None]
) -> Callable[[], None]:
"""Add a callback to be called upon receiving an event.
Note that callbacks should expect to receive a WebsocketEvent object as a
parameter.
Args:
callback: The callback to execute.
Returns:
A callable to cancel the callback.
"""
return self._add_callback(self._event_callbacks, callback)
async def async_connect(self) -> None:
"""Connect to the websocket server.
Raises:
CannotConnectError: Raises when we cannot connect to the websocket.
"""
if self.connected:
return
try:
self._client = await self._api.session.ws_connect(
WEBSOCKET_SERVER_URL, heartbeat=55
)
except ClientError as err:
raise CannotConnectError(err) from err
LOGGER.info("Connected to websocket server")
self._watchdog.trigger()
for callback in self._connect_callbacks:
execute_callback(callback)
async def async_disconnect(self) -> None:
"""Disconnect from the websocket server."""
if not self.connected:
return
await self._client.close()
LOGGER.info("Disconnected from websocket server")
async def async_listen(self) -> None:
"""Start listening to the websocket server."""
now = utcnow()
now_ts = round(now.timestamp() * 1000)
now_utc_iso = f"{now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]}Z"
try:
await self._async_send_json(
{
"datacontenttype": "application/json",
"type": "com.simplisafe.connection.identify",
"time": now_utc_iso,
"id": f"ts:{now_ts}",
"specversion": "1.0",
"source": DEFAULT_USER_AGENT,
"data": {
"auth": {
"schema": "bearer",
"token": self._api.access_token,
},
"join": [f"uid:{self._api.user_id}"],
},
}
)
while not self._client.closed:
message = await self._async_receive_json()
self._parse_payload(message)
except ConnectionClosedError:
pass
finally:
LOGGER.debug("Listen completed; cleaning up")
self._watchdog.cancel()
for callback in self._disconnect_callbacks:
execute_callback(callback)
async def async_reconnect(self) -> None:
"""Reconnect (and re-listen, if appropriate) to the websocket."""
await self.async_disconnect()
await asyncio.sleep(1)
await self.async_connect()