sipa/utils/__init__.py

Summary

Maintainability
A
0 mins
Test Coverage
"""
General utilities
"""

import dataclasses
import http.client
import json
import logging
import typing
from datetime import date, datetime
from functools import wraps
from itertools import chain
from operator import itemgetter

import icalendar
import markdown
import recurring_ical_events
import requests
from cachetools import TTLCache, cached
from dateutil.relativedelta import relativedelta
from flask import flash, redirect, request, url_for
from flask_login import current_user
from icalendar import Calendar
from werkzeug.http import parse_date as parse_datetime

from flask.globals import current_app

logger = logging.getLogger(__name__)


def get_bustimes(stopname, count=10):
    """Parses the VVO-Online API return string.
    API returns in format [["line", "to", "minutes"],[__],[__]], where "__" are
    up to nine more Elements.

    :param stopname: Requested stop.
    :param count: Limit the entries for the stop.
    """
    conn = http.client.HTTPConnection('widgets.vvo-online.de', timeout=1)

    stopname = stopname.replace(' ', '%20')
    try:
        conn.request(
            'GET',
            f'/abfahrtsmonitor/Abfahrten.do?ort=Dresden&hst={stopname}'
        )
        response = conn.getresponse()
    except OSError:
        return None

    response_data = json.loads(response.read().decode())

    return ({
        'line': i[0],
        'dest': i[1],
        'minutes_left': int(i[2]) if i[2] else 0,
    } for i in response_data)
# TODO: check whether this is the correct format


@cached(cache=TTLCache(maxsize=1, ttl=2 * 60))
def try_fetch_hotline_availability(uri: str) -> bool:
    """Determines whether there are agents logged in to anwser calls to our
    support hotline.
    """
    if not (r := requests.get(uri)):
        return False

    return r.text == "AVAILABLE"


def support_hotline_available():
    return try_fetch_hotline_availability(current_app.config["PBX_URI"])


@cached(cache=TTLCache(maxsize=1, ttl=300))
def try_fetch_calendar(url: str) -> Calendar | None:
    """Fetch an ICAL calendar from a given URL."""
    try:
        response = requests.get(url, timeout=1)
    except requests.exceptions.RequestException:
        logger.exception("Error when fetching calendar at %s", url)
        return
    if response.status_code != 200:
        logger.error("Got unknown status code %s", response.status_code)
        return

    try:
        return icalendar.Calendar.from_ical(response.text)
    except ValueError:
        logger.exception("Could not parse calendar response %s", response.text)
        return


Event = typing.TypedDict(
    "Event",
    {
        "CREATED": icalendar.prop.vDDDTypes,
        "LAST-MODIFIED": icalendar.prop.vDDDTypes,
        "DTSTAMP": icalendar.prop.vDDDTypes,
        "SUMMARY": icalendar.prop.vText,
        "PRIORITY": int,
        "RELATED-TO": icalendar.prop.vText,
        "X-MOZ-LASTACK": icalendar.prop.vText,
        "DTSTART": icalendar.prop.vDDDTypes,
        "DTEND": icalendar.prop.vDDDTypes,
        "CLASS": icalendar.prop.vText,
        "LOCATION": icalendar.prop.vText,
        "SEQUENCE": int,
        "TRANSP": icalendar.prop.vText,
        "X-APPLE-TRAVEL-ADVISORY-BEHAVIOR": icalendar.prop.vText,
        "X-MICROSOFT-CDO-BUSYSTATUS": icalendar.prop.vText,
        "X-MOZ-GENERATION": icalendar.prop.vText,
    }
)


def events_from_calendar(calendar: icalendar.Calendar) -> list[Event]:
    """Given a calendar, extract the events up until one month in the future."""
    return recurring_ical_events.of(calendar).between(
        datetime.now(), datetime.now() + relativedelta(months=1)
    )


@cached(cache=TTLCache(maxsize=1, ttl=300))
def meetingcal():
    """Returns the calendar events got form the url in the config"""
    if not (calendar := try_fetch_calendar(current_app.config['MEETINGS_ICAL_URL'])):
        return []

    events = events_from_calendar(calendar)
    next_meetings = [
        {
            "title": event["SUMMARY"],
            "datetime": event["DTSTART"].dt,
            "location": event["LOCATION"] if "LOCATION" in event else "-",
            "location_link": markdown.markdown(event["LOCATION"])
            if "LOCATION" in event
            else "-",
        }
        for event in events
    ]
    next_meetings = sorted(next_meetings, key=itemgetter("datetime"))
    return next_meetings


def subscribe_to_status_page(url: str, token: str, request_timeout: int, email: str) -> bool | None:
    """Send subscription request to status page API endpoint

    Returns:
        bool or None: Result whether subscribing to the status page worked
    """
    try:
        response = requests.post(
            url,
            timeout=request_timeout,
            headers={
                "Authorization": "Token " + token
            },
            json={"email": email}
        )
    except requests.exceptions.RequestException:
        logger.exception("Error when sending request to %s", url)
        return None
    if response.status_code == 400:
        # bad request usually means that the person has already subscribed to the status page
        return False
    if response.status_code == 201:
        # Subscribing to the status page worked
        return True

    # unexpected response from the status page
    logger.exception("Unexpected response when sending request to %s: %s", url, response.reason)
    return None


def password_changeable(user):
    """A decorator used to disable functions (routes) if a certain feature
    is not provided by the User class.

    given_features has to be a callable to ensure runtime distinction
    between datasources.

    :param needed_feature: The feature needed
    :param given_features: A callable returning the set of supported features
    :return:
    """
    def feature_decorator(func):
        @wraps(func)
        def decorated_view(*args, **kwargs):
            if user.is_authenticated and user.can_change_password:
                return func(*args, **kwargs)
            else:
                def not_supported():
                    flash("Diese Funktion ist nicht verfügbar.", 'error')
                    return redirect(redirect_url())
                return not_supported()

        return decorated_view
    return feature_decorator


def get_user_name(user=current_user):
    if user.is_authenticated:
        return user.uid

    if user.is_anonymous:
        return 'anonymous'

    return ''


def url_self(**values):
    """Generate a URL to the request's current endpoint with the same view
    arguments.

    Additional arguments can be specified to override or extend the current view
    arguments.

    :param values: Additional variable arguments for the endpoint
    :return: A URL to the current endpoint
    """
    if request.endpoint is None:
        endpoint = 'generic.index'
    else:
        endpoint = request.endpoint
    # if no endpoint matches the given URL, `request.view_args` is
    # ``None``, not ``{}``
    kw = request.view_args.copy() if request.view_args is not None else {}
    kw.update(values)
    return url_for(endpoint, **kw)


def redirect_url(default='generic.index'):
    return request.args.get('next') or request.referrer or url_for(default)


def argstr(*args, **kwargs):
    return ", ".join(chain(
        (f"{arg}" for arg in args),
        (f"{key}={val!r}" for key, val in kwargs.items()),
    ))


def compare_all_attributes(one: object, other: object, attr_list: typing.Iterable[str]) -> bool:
    """Safely compare whether two ojbect's attributes are equal.

    :param one: The first object
    :param other: The second object
    :param attr_list: A list of attribute names.

    :returns: Whether the attributes are equal or false on
              `AttributeError`
    """
    try:
        return all(getattr(one, attr) == getattr(other, attr)
                   for attr in attr_list)
    except AttributeError:
        return False


def xor_hashes(*elements: object) -> int:
    """Combine all element's hashes with xor
    """
    _hash = 0
    for element in elements:
        _hash ^= hash(element)

    return _hash


def parse_date(date: str | None) -> date | None:
    return parse_datetime(date).date() if date is not None else None


def dataclass_from_dict(cls, raw: dict):
    fields = {field.name for field in dataclasses.fields(cls)}
    kwargs = {key: value for key, value in raw.items() if key in fields}
    return cls(**kwargs)