avocado/core/status/server.py
import asyncio
import os
from avocado.core.settings import settings
class StatusServer:
"""Server that listens for status messages and updates a StatusRepo."""
def __init__(self, uri, repo):
"""Initializes a new StatusServer.
:param uri: either a "host:port" string or a path to a UNIX socket
:type uri: str
:param repo: the repository to use to process received status
messages
:type repo: :class:`avocado.core.status.repo.StatusRepo`
"""
self._uri = uri
self._repo = repo
self._server_task = None
@property
def uri(self):
return self._uri
async def create_server(self):
limit = settings.as_dict().get("run.status_server_buffer_size")
if ":" in self._uri:
host, port = self._uri.split(":")
port = int(port)
self._server_task = await asyncio.start_server(
self.cb, host=host, port=port, limit=limit
)
else:
self._server_task = await asyncio.start_unix_server(
self.cb, path=self._uri, limit=limit
)
async def serve_forever(self):
if self._server_task is None:
await self.create_server()
await self._server_task.serve_forever()
def close(self):
self._server_task.close()
if os.path.exists(self._uri):
os.unlink(self._uri)
async def cb(self, reader, _):
while True:
raw_message = await reader.readline()
if not raw_message:
return
self._repo.process_raw_message(raw_message)