components/api_server/src/routes/measurement.py
"""Measurement routes."""
import logging
import time
from collections.abc import Iterator
from typing import cast
import bottle
from pymongo.database import Database
from shared.database.measurements import insert_new_measurement, latest_measurement
from shared.model.measurement import Measurement
from shared.utils.functions import first
from shared.utils.type import MetricId, SourceId
from database import sessions
from database.measurements import count_measurements, all_metric_measurements, measurements_in_period
from database.reports import latest_report_for_uuids, latest_reports
from utils.functions import report_date_time
from .plugins.auth_plugin import EDIT_ENTITY_PERMISSION
@bottle.post(
"/api/internal/measurement/<metric_uuid>/source/<source_uuid>/entity/<entity_key>/<attribute>",
permissions_required=[EDIT_ENTITY_PERMISSION],
)
def set_entity_attribute(
metric_uuid: MetricId,
source_uuid: SourceId,
entity_key: str,
attribute: str,
database: Database,
) -> Measurement:
"""Set an entity attribute."""
report = latest_report_for_uuids(latest_reports(database), metric_uuid)[0]
metric = report.metrics_dict[metric_uuid]
new_measurement = cast(Measurement, latest_measurement(database, metric)).copy()
source = first(new_measurement["sources"], lambda source: source["source_uuid"] == source_uuid)
entity = first(source["entities"], lambda entity: entity["key"] == entity_key)
entity_description = "/".join([str(entity[key]) for key in entity if key not in ("key", "url")])
old_value = source.get("entity_user_data", {}).get(entity_key, {}).get(attribute) or ""
new_value = dict(bottle.request.json)[attribute]
user = sessions.find_user(database)
description = f"{user.name()} changed the {attribute} of '{entity_description}' from '{old_value}' to '{new_value}'"
entity_user_data = source.setdefault("entity_user_data", {}).setdefault(entity_key, {})
entity_user_data[attribute] = new_value
if attribute == "status" and (end_date := report.deadline(new_value)):
old_end_date = entity_user_data.get("status_end_date")
if end_date != old_end_date:
entity_user_data["status_end_date"] = end_date
description += f" and changed the status end date from '{old_end_date}' to '{end_date}'"
new_measurement["delta"] = {
"uuids": [report.uuid, metric.subject_uuid, metric_uuid, source_uuid],
"description": description + ".",
"email": user.email,
}
return insert_new_measurement(database, new_measurement)
def sse_pack(event_id: int, event: str, data: str, retry: str = "2000") -> str:
"""Pack data in Server-Sent Events (SSE) format."""
return f"retry: {retry}\nid: {event_id}\nevent: {event}\ndata: {data}\n\n"
@bottle.get("/api/internal/nr_measurements", authentication_required=False)
def stream_nr_measurements(database: Database) -> Iterator[str]:
"""Return the number of measurements as server sent events."""
# Keep event IDs consistent
event_id = int(bottle.request.get_header("Last-Event-Id", -1)) + 1
# Set the response headers
# https://serverfault.com/questions/801628/for-server-sent-events-sse-what-nginx-proxy-configuration-is-appropriate
bottle.response.set_header("Connection", "keep-alive")
bottle.response.set_header("Content-Type", "text/event-stream")
bottle.response.set_header("Cache-Control", "no-cache")
bottle.response.set_header("X-Accel-Buffering", "no")
# Provide the current number of measurements and a retry value to use in case of connection failure
nr_measurements = count_measurements(database)
logging.info("Initializing nr_measurements stream with %d measurements (event id = %d)", nr_measurements, event_id)
yield sse_pack(event_id, "init", str(nr_measurements))
event_id += 1
# Flush the buffer that prevents messages from being sent immediately by sending a large message
# Who or what is causing the buffering (bottle?, gevent?, nginx?), is a mystery, unfortunately
yield sse_pack(event_id, "flush", "." * 256**2)
# Now send the client the number of measurements periodically
while True:
time.sleep(10)
nr_measurements = count_measurements(database)
event_id += 1
logging.info("Updating nr_measurements stream with %d measurements (event id = %d)", nr_measurements, event_id)
yield sse_pack(event_id, "delta", str(nr_measurements))
@bottle.get("/api/internal/measurements", authentication_required=False)
def get_measurements(database: Database):
"""Return all measurements (without details) for all reports between the date and the minimum date."""
date_time = report_date_time()
min_date_time = report_date_time("min_report_date")
measurements = measurements_in_period(database, min_iso_timestamp=min_date_time, max_iso_timestamp=date_time)
return {"measurements": measurements}
@bottle.get("/api/internal/measurements/<metric_uuid>", authentication_required=False)
def get_metric_measurements(metric_uuid: MetricId, database: Database) -> dict:
"""Return the measurements for the metric."""
metric_uuid = cast(MetricId, metric_uuid.split("&")[0])
return {"measurements": all_metric_measurements(database, metric_uuid, max_iso_timestamp=report_date_time())}