sipa/model/pycroft/api.py
import logging
import typing as t
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date
from functools import partial
from typing import Any
import requests
import requests.auth
from requests import ConnectionError, HTTPError
from sipa.backends.exceptions import InvalidConfiguration
from sipa.utils import dataclass_from_dict
from .exc import PycroftBackendError
logger = logging.getLogger(__name__)
class PycroftApiError(RuntimeError):
def __init__(self, code: str, message: str, *a, **kw):
self.code = code
self.message = message
super().__init__(*a, **kw)
@dataclass
class MatchPersonResult:
begin: date
end: date
room_id: int
building: str
room: str
def __post_init__(self):
if isinstance(self.begin, str):
self.begin = date.fromisoformat(self.begin)
if isinstance(self.end, str):
self.end = date.fromisoformat(self.end)
@classmethod
def from_json(cls, json: dict):
return dataclass_from_dict(MatchPersonResult, json)
class PycroftAuthorization(requests.auth.AuthBase):
def __init__(self, api_key: str):
super().__init__()
self.api_key = api_key
def __call__(self, r: requests.Request) -> requests.Request:
r.headers["Authorization"] = f"ApiKey {self.api_key}"
return r
class PycroftApi:
def __init__(self, endpoint: str, api_key: str):
if not endpoint.endswith("/"):
raise InvalidConfiguration("API endpoint must end with a '/'")
self._endpoint = endpoint
self.session = requests.Session()
self.session.auth = PycroftAuthorization(api_key)
def get_user(self, username: str) -> tuple[int, dict]:
return self.get(f'user/{username}')
def get_user_from_ip(self, ip):
return self.get("user/from-ip", params={"ip": ip})
def authenticate(self, username, password):
return self.post('user/authenticate',
data={'login': username, 'password': password})
def change_password(self, user_id, old_password, new_password):
return self.post(f'user/{user_id}/change-password',
data={'password': old_password,
'new_password': new_password})
def change_mail(self, user_id, password, new_mail, forwarded):
return self.post(f'user/{user_id}/change-email',
data={'password': password, 'new_email': new_mail, 'forwarded': forwarded})
def change_mac(self, user_id, password, interface_id, new_mac, host_name):
return self.post(f'user/{user_id}/change-mac/{interface_id}',
data={'password': password, 'mac': new_mac, 'host_name': host_name})
def activate_network_access(self, user_id, password, mac, birthdate, host_name):
return self.post(f'user/{user_id}/activate-network-access',
data={'password': password, 'mac': mac,
'birthdate': birthdate, 'host_name': host_name})
def estimate_balance_at_end_of_membership(self, user_id, end_date):
return self.get(f"user/{user_id}/terminate-membership",
params={'end_date': end_date})
def terminate_membership(self, user_id, end_date):
return self.post(f"user/{user_id}/terminate-membership",
data={'end_date': end_date,
'comment': 'Move-out by SIPA'})
def continue_membership(self, user_id):
return self.delete(f"user/{user_id}/terminate-membership")
def reset_wifi_password(self, user_id):
return self.patch(f"user/{user_id}/reset-wifi-password")
def request_password_reset(self, user_ident: str, email: str):
return self.post("user/reset-password", data={
'ident': user_ident,
'email': email,
})
def reset_password(self, token, new_password):
return self.patch("user/reset-password", data={
'token': token,
'password': new_password,
})
def match_person(self, first_name: str, last_name: str, birthdate: date, tenant_number: int,
previous_dorm: str | None) -> MatchPersonResult:
"""
Get the newest tenancy for the supplied user data.
:raises PycroftApiError: if the matching was unsuccessful
:return: the match result
"""
# if first_name == 's':
# status, result = 200, {
# 'building': 'Zw 41',
# 'room': 'Room 407',
# 'room_id': 1337,
# 'begin': '2020-10-01',
# 'end': '2021-10-01',
# }
# else:
# status, result = 404, {
# 'code': 'user_exists',
# 'message': 'No tenancies found for this data',
# }
params = {'first_name': first_name, 'last_name': last_name,
'birthdate': birthdate, 'person_id': tenant_number}
if previous_dorm is not None:
params['previous_dorm'] = previous_dorm
status, result = self.get("register", params)
if status != 200:
raise PycroftApiError(result['code'], result['message'])
return MatchPersonResult.from_json(result)
def member_request(self, email: str, login: str, password: str,
first_name: str, last_name: str, birthdate: date,
move_in_date: date, tenant_number: int | None,
room_id: int | None, previous_dorm: str | None) -> None:
"""
Creates a member request in pycroft.
:raises PycroftApiError: if the member request was unsuccessful
"""
# if login == 's':
# status, result = 200, None
# else:
# status, result = 404, {
# 'code': 'user_exists',
# 'message': 'User already exists',
# }
data = {
'first_name': first_name, 'last_name': last_name, 'birthdate': birthdate,
'email': email, 'login': login, 'password': password,
'move_in_date': move_in_date.isoformat()
}
# Verification was not skipped
if tenant_number is not None:
data['person_id'] = tenant_number
# Room was not rejected
if room_id is not None:
data['room_id'] = room_id
if previous_dorm is not None:
data['previous_dorm'] = previous_dorm
status, result = self.post("register", data=data)
if status != 200:
raise PycroftApiError(result['code'], result['message'])
else:
return
def resend_confirm_email(self, user_id: int) -> bool:
status, _ = self.get("register/confirm", params={'user_id': user_id})
return status == 200
def confirm_email(self, token: str):
"""
Confirms a member request.
:raises PycroftApiError: if the confirmation was unsuccessful
:return: the confirmation type, either `user` or `pre_member`
"""
# if token == 's':
# status, result = 200, None
# else:
# status, result = 404, {
# 'code': 'bad_key',
# 'message': 'Bad key',
# }
status, result = self.post("register/confirm", data={'key': token})
if status != 200:
raise PycroftApiError(result['code'], result['message'])
return result
def get(self, url: t.LiteralString, params=None):
request_function = partial(self.session.get, params=params or {})
return self._do_api_call(request_function, url)
def post(self, url: t.LiteralString, data=None):
request_function = partial(self.session.post, data=data or {})
return self._do_api_call(request_function, url)
def delete(self, url: t.LiteralString, data=None):
request_function = partial(self.session.delete, data=data or {})
return self._do_api_call(request_function, url)
def patch(self, url: t.LiteralString, data=None):
request_function = partial(self.session.patch, data=data or {})
return self._do_api_call(request_function, url)
def _do_api_call(
self, request_function: Callable, url: t.LiteralString
) -> tuple[int, Any]:
try:
response = request_function(self._endpoint + url)
except ConnectionError as e:
logger.error("Caught a ConnectionError when accessing Pycroft API",
extra={'data': {'endpoint': self._endpoint + url}})
raise PycroftBackendError("Pycroft API unreachable") from e
if response.status_code not in [200, 400, 401, 403, 404, 412, 422]:
try:
response.raise_for_status()
except HTTPError as e:
raise PycroftBackendError(f"Pycroft API returned status"
f" {response.status_code}") from e
return response.status_code, response.json()