byceps/byceps

View on GitHub
byceps/services/user/user_domain_service.py

Summary

Maintainability
A
0 mins
Test Coverage
A
94%
"""
byceps.services.user.user_domain_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2014-2024 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""

from datetime import date, datetime
from typing import Any

from byceps.events.base import EventSite, EventUser
from byceps.events.user import (
    UserAccountCreatedEvent,
    UserAccountDeletedEvent,
    UserAccountSuspendedEvent,
    UserAccountUnsuspendedEvent,
    UserDetailsUpdatedEvent,
    UserEmailAddressChangedEvent,
    UserEmailAddressConfirmedEvent,
    UserEmailAddressInvalidatedEvent,
    UserScreenNameChangedEvent,
)
from byceps.services.site.models import Site
from byceps.util.result import Err, Ok, Result
from byceps.util.uuid import generate_uuid4, generate_uuid7

from .errors import (
    AccountAlreadyInitializedError,
    InvalidEmailAddressError,
    InvalidScreenNameError,
)
from .models.log import UserLogEntry, UserLogEntryData
from .models.user import User, UserEmailAddress, UserID


def create_account(
    screen_name: str | None,
    email_address: str | None,
    password: str,
    *,
    locale: str | None = None,
    creation_method: str | None = None,
    site: Site | None = None,
    ip_address: str | None = None,
    initiator: User | None = None,
) -> Result[
    tuple[User, str | None, UserAccountCreatedEvent, UserLogEntry],
    InvalidScreenNameError | InvalidEmailAddressError,
]:
    """Create a user account.

    The screen name and e-mail address, when given, are normalized
    first; a failed normalization is returned as an error right away.

    On success, the result wraps a tuple of the new (uninitialized,
    unsuspended, undeleted) user, the normalized e-mail address, the
    "account created" event, and the matching log entry.

    Note that `password` is accepted but not used inside this function.
    """
    # Timestamp and ID are produced up front, before any validation.
    occurred_at = datetime.utcnow()
    user_id = UserID(generate_uuid4())

    normalized_screen_name: str | None = None
    if screen_name is not None:
        screen_name_result = normalize_screen_name(screen_name)
        if screen_name_result.is_err():
            return Err(screen_name_result.unwrap_err())
        normalized_screen_name = screen_name_result.unwrap()

    normalized_email_address: str | None = None
    if email_address is not None:
        email_address_result = normalize_email_address(email_address)
        if email_address_result.is_err():
            return Err(email_address_result.unwrap_err())
        normalized_email_address = email_address_result.unwrap()

    user = User(
        id=user_id,
        screen_name=normalized_screen_name,
        initialized=False,
        suspended=False,
        deleted=False,
        locale=locale,
        avatar_url=None,
    )

    return Ok(
        (
            user,
            normalized_email_address,
            _build_account_created_event(occurred_at, initiator, user, site),
            _build_account_created_log_entry(
                occurred_at, initiator, user, creation_method, site, ip_address
            ),
        )
    )


def _build_account_created_event(
    occurred_at: datetime,
    initiator: User | None,
    user: User,
    site: Site | None = None,
) -> UserAccountCreatedEvent:
    """Assemble the event announcing that an account was created.

    Initiator and site are optional and passed on as `None` when absent.
    """
    event_initiator = None
    if initiator:
        event_initiator = EventUser.from_user(initiator)

    event_user = EventUser.from_user(user)

    event_site = None
    if site:
        event_site = EventSite.from_site(site)

    return UserAccountCreatedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user=event_user,
        site=event_site,
    )


def _build_account_created_log_entry(
    occurred_at: datetime,
    initiator: User | None,
    user: User,
    creation_method: str | None,
    site: Site | None,
    ip_address: str | None,
) -> UserLogEntry:
    """Assemble the 'user-created' log entry.

    Only the optional details that were actually provided end up in the
    entry's data.
    """
    details = []

    if initiator is not None:
        details.append(('initiator_id', str(initiator.id)))

    if creation_method:
        details.append(('creation_method', creation_method))

    if site:
        details.append(('site_id', site.id))

    if ip_address:
        details.append(('ip_address', ip_address))

    data = dict(details)

    entry_id = generate_uuid7()
    initiator_id = initiator.id if initiator else None

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-created',
        user_id=user.id,
        initiator_id=initiator_id,
        data=data,
    )


def initialize_account(
    user: User,
    *,
    initiator: User | None = None,
) -> Result[UserLogEntry, AccountAlreadyInitializedError]:
    """Initialize the user account.

    Returns an `AccountAlreadyInitializedError` if the user is already
    flagged as initialized; otherwise returns the log entry recording
    the initialization.
    """
    if user.initialized:
        return Err(AccountAlreadyInitializedError())

    now = datetime.utcnow()

    return Ok(_build_account_initialized_log_entry(now, initiator, user))


def _build_account_initialized_log_entry(
    occurred_at: datetime, initiator: User | None, user: User
) -> UserLogEntry:
    """Assemble the 'user-initialized' log entry.

    The initiator's ID is recorded in the data only if an initiator is
    given.
    """
    data = {'initiator_id': str(initiator.id)} if initiator else {}

    entry_id = generate_uuid7()
    initiator_id = initiator.id if initiator else None

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-initialized',
        user_id=user.id,
        initiator_id=initiator_id,
        data=data,
    )


def suspend_account(
    user: User, initiator: User, reason: str
) -> tuple[UserAccountSuspendedEvent, UserLogEntry]:
    """Suspend the user account.

    Returns the suspension event and the matching log entry, both
    stamped with the same point in time.
    """
    now = datetime.utcnow()

    return (
        _build_account_suspended_event(now, initiator, user),
        _build_account_suspended_log_entry(now, initiator, user, reason),
    )


def _build_account_suspended_event(
    occurred_at: datetime, initiator: User, user: User
) -> UserAccountSuspendedEvent:
    """Assemble the event announcing that an account was suspended."""
    event_initiator = EventUser.from_user(initiator)
    event_user = EventUser.from_user(user)

    return UserAccountSuspendedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user=event_user,
    )


def _build_account_suspended_log_entry(
    occurred_at: datetime, initiator: User, user: User, reason: str
) -> UserLogEntry:
    """Assemble the 'user-suspended' log entry, including the reason."""
    entry_id = generate_uuid7()

    data = {
        'initiator_id': str(initiator.id),
        'reason': reason,
    }

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-suspended',
        user_id=user.id,
        initiator_id=initiator.id,
        data=data,
    )


def unsuspend_account(
    user: User, initiator: User, reason: str
) -> tuple[UserAccountUnsuspendedEvent, UserLogEntry]:
    """Unsuspend the user account.

    Returns the unsuspension event and the matching log entry, both
    stamped with the same point in time.
    """
    now = datetime.utcnow()

    return (
        _build_account_unsuspended_event(now, initiator, user),
        _build_account_unsuspended_log_entry(now, initiator, user, reason),
    )


def _build_account_unsuspended_event(
    occurred_at: datetime, initiator: User, user: User
) -> UserAccountUnsuspendedEvent:
    """Assemble the event announcing that an account was unsuspended."""
    event_initiator = EventUser.from_user(initiator)
    event_user = EventUser.from_user(user)

    return UserAccountUnsuspendedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user=event_user,
    )


def _build_account_unsuspended_log_entry(
    occurred_at: datetime, initiator: User, user: User, reason: str
) -> UserLogEntry:
    """Assemble the 'user-unsuspended' log entry, including the reason."""
    entry_id = generate_uuid7()

    data = {
        'initiator_id': str(initiator.id),
        'reason': reason,
    }

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-unsuspended',
        user_id=user.id,
        initiator_id=initiator.id,
        data=data,
    )


def delete_account(
    user: User,
    initiator: User,
    reason: str,
) -> tuple[UserAccountDeletedEvent, UserLogEntry]:
    """Delete the user account.

    Returns the deletion event and the matching log entry, both stamped
    with the same point in time.
    """
    now = datetime.utcnow()

    return (
        _build_account_deleted_event(now, initiator, user),
        _build_account_deleted_log_entry(now, initiator, user, reason),
    )


def _build_account_deleted_event(
    occurred_at: datetime,
    initiator: User,
    user: User,
) -> UserAccountDeletedEvent:
    """Assemble the event announcing that an account was deleted."""
    event_initiator = EventUser.from_user(initiator)
    event_user = EventUser.from_user(user)

    return UserAccountDeletedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user=event_user,
    )


def _build_account_deleted_log_entry(
    occurred_at: datetime,
    initiator: User,
    user: User,
    reason: str,
) -> UserLogEntry:
    """Assemble the 'user-deleted' log entry, including the reason."""
    entry_id = generate_uuid7()

    data = {
        'initiator_id': str(initiator.id),
        'reason': reason,
    }

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-deleted',
        user_id=user.id,
        initiator_id=initiator.id,
        data=data,
    )


def change_screen_name(
    user: User,
    new_screen_name: str,
    initiator: User,
    *,
    reason: str | None = None,
) -> tuple[UserScreenNameChangedEvent, UserLogEntry]:
    """Change the user's screen name.

    The user's current screen name is recorded as the old one. Returns
    the change event and the matching log entry.
    """
    now = datetime.utcnow()
    previous_screen_name = user.screen_name

    return (
        _build_screen_name_changed_event(
            now, initiator, user, previous_screen_name, new_screen_name
        ),
        _build_screen_name_changed_log_entry(
            now,
            initiator,
            user,
            previous_screen_name,
            new_screen_name,
            reason,
        ),
    )


def _build_screen_name_changed_event(
    occurred_at: datetime,
    initiator: User,
    user: User,
    old_screen_name: str | None,
    new_screen_name: str | None,
) -> UserScreenNameChangedEvent:
    """Assemble the event announcing a changed screen name.

    This event carries the affected user's ID rather than an
    `EventUser` object.
    """
    event_initiator = EventUser.from_user(initiator)

    return UserScreenNameChangedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user_id=user.id,
        old_screen_name=old_screen_name,
        new_screen_name=new_screen_name,
    )


def _build_screen_name_changed_log_entry(
    occurred_at: datetime,
    initiator: User,
    user: User,
    old_screen_name: str | None,
    new_screen_name: str | None,
    reason: str | None,
) -> UserLogEntry:
    """Assemble the 'user-screen-name-changed' log entry.

    The reason is only recorded if one is given.
    """
    optional_reason = {'reason': reason} if reason else {}

    data = {
        'initiator_id': str(initiator.id),
        'old_screen_name': old_screen_name,
        'new_screen_name': new_screen_name,
        **optional_reason,
    }

    entry_id = generate_uuid7()

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-screen-name-changed',
        user_id=user.id,
        initiator_id=initiator.id,
        data=data,
    )


def change_email_address(
    user: User,
    old_email_address: str | None,
    new_email_address: str | None,
    verified: bool,
    initiator: User,
    *,
    reason: str | None = None,
) -> tuple[UserEmailAddressChangedEvent, UserLogEntry]:
    """Change the user's e-mail address.

    Returns the change event and the matching log entry. The old and
    new addresses are recorded in the log entry only.

    Note that `verified` is accepted but not used inside this function.
    """
    now = datetime.utcnow()

    return (
        _build_email_address_changed_event(now, initiator, user),
        _build_email_address_changed_log_entry(
            now,
            initiator,
            user,
            old_email_address,
            new_email_address,
            reason,
        ),
    )


def _build_email_address_changed_event(
    occurred_at: datetime,
    initiator: User,
    user: User,
) -> UserEmailAddressChangedEvent:
    """Assemble the event announcing a changed e-mail address."""
    event_initiator = EventUser.from_user(initiator)
    event_user = EventUser.from_user(user)

    return UserEmailAddressChangedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user=event_user,
    )


def _build_email_address_changed_log_entry(
    occurred_at: datetime,
    initiator: User,
    user: User,
    old_email_address: str | None,
    new_email_address: str | None,
    reason: str | None,
) -> UserLogEntry:
    """Assemble the 'user-email-address-changed' log entry.

    The reason is only recorded if one is given.
    """
    optional_reason = {'reason': reason} if reason else {}

    data = {
        'initiator_id': str(initiator.id),
        'old_email_address': old_email_address,
        'new_email_address': new_email_address,
        **optional_reason,
    }

    entry_id = generate_uuid7()

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-email-address-changed',
        user_id=user.id,
        initiator_id=initiator.id,
        data=data,
    )


def confirm_email_address(
    user: User,
    current_email_address: UserEmailAddress,
    email_address_to_confirm: str,
) -> Result[tuple[UserEmailAddressConfirmedEvent, UserLogEntry], str]:
    """Confirm the e-mail address of the user account.

    Fails with an error message if the account has no address, if the
    address to confirm differs from the current one, or if the current
    address is already verified. Otherwise returns the confirmation
    event and the matching log entry.
    """
    assigned_address = current_email_address.address

    # Checks are ordered; the first one that applies decides the error.
    error_message: str | None
    if assigned_address is None:
        error_message = 'Account has no email address assigned.'
    elif assigned_address != email_address_to_confirm:
        error_message = 'Email addresses do not match.'
    elif current_email_address.verified:
        error_message = 'Email address is already verified.'
    else:
        error_message = None

    if error_message is not None:
        return Err(error_message)

    now = datetime.utcnow()

    return Ok(
        (
            _build_email_address_confirmed_event(now, user),
            _build_email_address_confirmed_log_entry(
                now, user, email_address_to_confirm
            ),
        )
    )


def _build_email_address_confirmed_event(
    occurred_at: datetime,
    user: User,
) -> UserEmailAddressConfirmedEvent:
    """Assemble the event announcing a confirmed e-mail address.

    The user is recorded as both the initiator and the affected user.
    """
    event_initiator = EventUser.from_user(user)
    event_user = EventUser.from_user(user)

    return UserEmailAddressConfirmedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user=event_user,
    )


def _build_email_address_confirmed_log_entry(
    occurred_at: datetime,
    user: User,
    email_address: str,
) -> UserLogEntry:
    """Assemble the 'user-email-address-confirmed' log entry.

    The user is recorded as their own initiator.
    """
    entry_id = generate_uuid7()
    data = {'email_address': email_address}

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-email-address-confirmed',
        user_id=user.id,
        initiator_id=user.id,
        data=data,
    )


def invalidate_email_address(
    user: User,
    email_address: UserEmailAddress,
    reason: str,
    *,
    initiator: User | None = None,
) -> Result[tuple[UserEmailAddressInvalidatedEvent, UserLogEntry], str]:
    """Invalidate the user's email address.

    Fails with an error message if the account has no address or if the
    address is not verified. Otherwise returns the invalidation event
    and the matching log entry.
    """
    # Checks are ordered; the first one that applies decides the error.
    error_message: str | None
    if email_address.address is None:
        error_message = 'Account has no email address assigned.'
    elif not email_address.verified:
        error_message = 'Email address is not verified.'
    else:
        error_message = None

    if error_message is not None:
        return Err(error_message)

    now = datetime.utcnow()

    return Ok(
        (
            _build_email_address_invalidated_event(now, initiator, user),
            _build_email_address_invalidated_log_entry(
                now, initiator, user, email_address.address, reason
            ),
        )
    )


def _build_email_address_invalidated_event(
    occurred_at: datetime,
    initiator: User | None,
    user: User,
) -> UserEmailAddressInvalidatedEvent:
    """Assemble the event announcing an invalidated e-mail address.

    The initiator is optional and passed on as `None` when absent.
    """
    event_initiator = None
    if initiator:
        event_initiator = EventUser.from_user(initiator)

    event_user = EventUser.from_user(user)

    return UserEmailAddressInvalidatedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user=event_user,
    )


def _build_email_address_invalidated_log_entry(
    occurred_at: datetime,
    initiator: User | None,
    user: User,
    email_address: str | None,
    reason: str,
) -> UserLogEntry:
    """Assemble the 'user-email-address-invalidated' log entry.

    The initiator's ID is recorded in the data only if an initiator is
    given.
    """
    optional_initiator = (
        {'initiator_id': str(initiator.id)} if initiator else {}
    )

    data = {
        'email_address': email_address,
        'reason': reason,
        **optional_initiator,
    }

    entry_id = generate_uuid7()
    initiator_id = initiator.id if initiator else None

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-email-address-invalidated',
        user_id=user.id,
        initiator_id=initiator_id,
        data=data,
    )


def update_details(
    user: User,
    old_first_name: str | None,
    new_first_name: str | None,
    old_last_name: str | None,
    new_last_name: str | None,
    old_date_of_birth: date | None,
    new_date_of_birth: date | None,
    old_country: str | None,
    new_country: str | None,
    old_zip_code: str | None,
    new_zip_code: str | None,
    old_city: str | None,
    new_city: str | None,
    old_street: str | None,
    new_street: str | None,
    old_phone_number: str | None,
    new_phone_number: str | None,
    initiator: User,
) -> tuple[UserDetailsUpdatedEvent, UserLogEntry]:
    """Update the user's details.

    Returns the update event and the matching log entry. The old and
    new values are handed to the log entry builder only; the event
    carries just the timestamp, the initiator, and the user.
    """
    now = datetime.utcnow()

    return (
        _build_details_updated_event(now, initiator, user),
        _build_details_updated_log_entry(
            now,
            initiator,
            user,
            old_first_name,
            new_first_name,
            old_last_name,
            new_last_name,
            old_date_of_birth,
            new_date_of_birth,
            old_country,
            new_country,
            old_zip_code,
            new_zip_code,
            old_city,
            new_city,
            old_street,
            new_street,
            old_phone_number,
            new_phone_number,
        ),
    )


def _build_details_updated_event(
    occurred_at: datetime,
    initiator: User,
    user: User,
) -> UserDetailsUpdatedEvent:
    """Assemble the event announcing updated user details."""
    event_initiator = EventUser.from_user(initiator)
    event_user = EventUser.from_user(user)

    return UserDetailsUpdatedEvent(
        occurred_at=occurred_at,
        initiator=event_initiator,
        user=event_user,
    )


def _build_details_updated_log_entry(
    occurred_at: datetime,
    initiator: User,
    user: User,
    old_first_name: str | None,
    new_first_name: str | None,
    old_last_name: str | None,
    new_last_name: str | None,
    old_date_of_birth: date | None,
    new_date_of_birth: date | None,
    old_country: str | None,
    new_country: str | None,
    old_zip_code: str | None,
    new_zip_code: str | None,
    old_city: str | None,
    new_city: str | None,
    old_street: str | None,
    new_street: str | None,
    old_phone_number: str | None,
    new_phone_number: str | None,
) -> UserLogEntry:
    """Assemble the 'user-details-updated' log entry.

    Besides the initiator's ID, the data holds an old/new pair for each
    detail whose value has changed.
    """
    data = {
        'initiator_id': str(initiator.id),
    }

    # (base key, old value, new value), in the order they get recorded.
    candidates = (
        ('first_name', old_first_name, new_first_name),
        ('last_name', old_last_name, new_last_name),
        ('date_of_birth', old_date_of_birth, new_date_of_birth),
        ('country', old_country, new_country),
        ('zip_code', old_zip_code, new_zip_code),
        ('city', old_city, new_city),
        ('street', old_street, new_street),
        ('phone_number', old_phone_number, new_phone_number),
    )

    for base_key_name, old_value, new_value in candidates:
        _add_if_different(data, base_key_name, old_value, new_value)

    entry_id = generate_uuid7()

    return UserLogEntry(
        id=entry_id,
        occurred_at=occurred_at,
        event_type='user-details-updated',
        user_id=user.id,
        initiator_id=initiator.id,
        data=data,
    )


def _add_if_different(
    log_entry_data: UserLogEntryData,
    base_key_name: str,
    old_value: Any | None,
    new_value: Any | None,
) -> None:
    """Record an old/new value pair in the data if the values differ.

    The values are stored under `old_<base_key_name>` and
    `new_<base_key_name>`, converted to strings unless they are `None`.
    The given data is modified in place.
    """
    if old_value != new_value:
        for prefix, value in (('old', old_value), ('new', new_value)):
            log_entry_data[f'{prefix}_{base_key_name}'] = _to_str_if_not_none(
                value
            )


def _to_str_if_not_none(value: Any) -> str | None:
    return str(value) if (value is not None) else None


def normalize_screen_name(
    screen_name: str,
) -> Result[str, InvalidScreenNameError]:
    """Normalize the screen name.

    Surrounding whitespace is stripped. The stripped name is rejected
    if it is empty, contains any whitespace character, or contains an
    ``@``.

    Returns the stripped name on success, or an
    `InvalidScreenNameError` carrying the original, unstripped value.
    """
    normalized = screen_name.strip()

    # Reject every kind of whitespace, not only the ASCII space, so that
    # embedded tabs, line breaks, or non-breaking spaces don't slip
    # through.
    contains_whitespace = any(char.isspace() for char in normalized)

    if not normalized or contains_whitespace or ('@' in normalized):
        return Err(InvalidScreenNameError(value=screen_name))

    return Ok(normalized)


def normalize_email_address(
    email_address: str,
) -> Result[str, InvalidEmailAddressError]:
    """Normalize the e-mail address.

    Surrounding whitespace is stripped and the address is lowercased.
    The result is rejected if it is empty, contains any whitespace
    character, or lacks an ``@``.

    Returns the normalized address on success, or an
    `InvalidEmailAddressError` carrying the original value.
    """
    normalized = email_address.strip().lower()

    # Reject every kind of whitespace, not only the ASCII space, so that
    # embedded tabs, line breaks, or non-breaking spaces don't slip
    # through.
    contains_whitespace = any(char.isspace() for char in normalized)

    if not normalized or contains_whitespace or ('@' not in normalized):
        return Err(InvalidEmailAddressError(value=email_address))

    return Ok(normalized)