byceps/byceps

View on GitHub
byceps/blueprints/admin/page/views.py

Summary

Maintainability
A
0 mins
Test Coverage
F
58%
"""
byceps.blueprints.admin.page.views
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2014-2024 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""

from flask import abort, g, request, url_for
from flask_babel import format_datetime, gettext

from byceps.blueprints.site.page.templating import build_template_context
from byceps.services.page import page_service
from byceps.services.page.errors import (
    PageAlreadyExistsError,
    PageNotFoundError,
)
from byceps.services.page.models import Page, PageVersion, PageVersionID
from byceps.services.site import site_service
from byceps.services.site.models import Site, SiteID
from byceps.services.site_navigation import site_navigation_service
from byceps.services.text_diff import text_diff_service
from byceps.services.user import user_service
from byceps.signals import page as page_signals
from byceps.util.framework.blueprint import create_blueprint
from byceps.util.framework.flash import flash_error, flash_success
from byceps.util.framework.templating import templated
from byceps.util.iterables import pairwise
from byceps.util.result import Err, Ok
from byceps.util.views import (
    permission_required,
    redirect_to,
    respond_no_content_with_location,
)

from .forms import CopyPagesForm, CreateForm, SetNavMenuForm, UpdateForm


blueprint = create_blueprint('page_admin', __name__)


@blueprint.get('/for_site/<site_id>')
@permission_required('page.view')
@templated
def index_for_site(site_id):
    """List pages for that site."""
    site = _get_site(site_id)

    pages = page_service.get_pages_for_site_with_current_versions(site.id)

    user_ids = {page.current_version.creator_id for page in pages}
    users_by_id = user_service.get_users_indexed_by_id(
        user_ids, include_avatars=True
    )

    return {
        'pages': pages,
        'users_by_id': users_by_id,
        'site': site,
    }


@blueprint.get('/pages/<uuid:page_id>/current_version')
@permission_required('page.view')
def view_current_version(page_id):
    """Show the current version of the page."""
    current_version_id = page_service.find_current_version_id(page_id)

    if current_version_id is None:
        abort(404)

    return view_version(current_version_id)


@blueprint.get('/versions/<uuid:version_id>')
@permission_required('page.view_history')
@templated
def view_version(version_id):
    """Show the page with the given id."""
    version = _get_version(version_id)

    page = page_service.get_page(version.page_id)
    site = site_service.get_site(page.site_id)
    creator = user_service.get_user(version.creator_id, include_avatar=True)
    is_current_version = page_service.is_current_version(page.id, version.id)

    if page.nav_menu_id:
        nav_menu = site_navigation_service.get_menu(page.nav_menu_id).unwrap()
    else:
        nav_menu = None

    return {
        'page': page,
        'site': site,
        'version': version,
        'creator': creator,
        'is_current_version': is_current_version,
        'nav_menu': nav_menu,
    }


@blueprint.get('/versions/<uuid:version_id>/preview')
@permission_required('page.view_history')
@templated
def view_version_preview(version_id):
    """Show a preview of the page version."""
    version = _get_version(version_id)

    try:
        template_context = build_template_context(
            version.title, version.head, version.body
        )

        return {
            'title': template_context['page_title'],
            'head': template_context['head'],
            'body': template_context['body'],
            'error_occurred': False,
        }
    except Exception as e:
        return {
            'error_occurred': True,
            'error_message': str(e),
        }


@blueprint.get('/pages/<uuid:page_id>/history')
@permission_required('page.view_history')
@templated
def history(page_id):
    """Show index of page versions."""
    page = _get_page(page_id)

    versions = page_service.get_versions(page.id)
    versions_pairwise = list(pairwise(versions + [None]))

    user_ids = {version.creator_id for version in versions}
    users_by_id = user_service.get_users_indexed_by_id(
        user_ids, include_avatars=True
    )

    site = site_service.get_site(page.site_id)

    return {
        'page': page,
        'versions_pairwise': versions_pairwise,
        'users_by_id': users_by_id,
        'site': site,
    }


@blueprint.get(
    '/versions/<uuid:from_version_id>/compare_to/<uuid:to_version_id>'
)
@permission_required('page.view_history')
@templated
def compare_versions(from_version_id, to_version_id):
    """Show the difference between two versions."""
    from_version = _get_version(from_version_id)
    to_version = _get_version(to_version_id)

    if from_version.page_id != to_version.page_id:
        abort(400, 'The versions do not belong to the same page.')

    html_diff_title = _create_html_diff(from_version, to_version, 'title')
    html_diff_head = _create_html_diff(from_version, to_version, 'head')
    html_diff_body = _create_html_diff(from_version, to_version, 'body')

    page = page_service.get_page(from_version.page_id)
    site = site_service.get_site(page.site_id)

    return {
        'page': page,
        'diff_title': html_diff_title,
        'diff_head': html_diff_head,
        'diff_body': html_diff_body,
        'site': site,
    }


def _create_html_diff(
    from_version: PageVersion,
    to_version: PageVersion,
    attribute_name: str,
) -> str | None:
    """Create an HTML diff between the named attribute's value of each
    of the two versions.
    """
    from_description = format_datetime(from_version.created_at)
    to_description = format_datetime(to_version.created_at)

    from_text = getattr(from_version, attribute_name)
    to_text = getattr(to_version, attribute_name)

    return text_diff_service.create_html_diff(
        from_text, to_text, from_description, to_description
    )


@blueprint.get('/for_site/<target_site_id>/copy')
@permission_required('page.create')
@templated
def copy_select_source_site_form(target_site_id):
    """Show form to select a site to copy pages from."""
    target_site = _get_site(target_site_id)

    source_sites = [
        site
        for site in site_service.get_sites_for_brand(target_site.brand_id)
        if site.id != target_site.id
    ]
    source_sites.sort(key=lambda site: site.title, reverse=True)

    return {
        'site': target_site,
        'target_site': target_site,
        'source_sites': source_sites,
    }


@blueprint.get('/for_site/<target_site_id>/copy/from_site/<source_site_id>')
@permission_required('page.create')
@templated
def copy_form(target_site_id, source_site_id, erroneous_form=None):
    """Show form to select pages to copy from another site."""
    source_site = _get_site(source_site_id)
    target_site = _get_site(target_site_id)

    pages = page_service.get_pages_for_site(source_site_id)
    if not pages:
        flash_error('No pages exist for this site.')
        return redirect_to(
            '.copy_select_source_site_form', target_site_id=target_site.id
        )

    form = erroneous_form if erroneous_form else CopyPagesForm()
    form.set_source_page_id_choices(pages)

    return {
        'form': form,
        'site': target_site,
        'target_site': target_site,
        'source_site': source_site,
    }


@blueprint.post('/for_site/<target_site_id>/copy/from_site/<source_site_id>')
@permission_required('page.create')
def copy(target_site_id, source_site_id):
    """Copy pages from another site."""
    source_site = _get_site(source_site_id)
    target_site = _get_site(target_site_id)

    pages = page_service.get_pages_for_site(source_site_id)
    if not pages:
        flash_error('No pages exist for this site.')
        return redirect_to(
            '.copy_select_source_site_form', target_site_id=target_site.id
        )

    form = CopyPagesForm(request.form)
    form.set_source_page_id_choices(pages)

    if not form.validate():
        return copy_form(target_site.id, source_site.id, form)

    source_pages = [
        page_service.get_page(page_id) for page_id in form.source_page_ids.data
    ]
    for page in source_pages:
        result = page_service.copy_page(
            source_site, target_site, page.name, page.language_code
        )
        match result:
            case Ok((_, event)):
                flash_success(
                    gettext(
                        'Page "%(name)s" (%(language_code)s) has been copied.',
                        name=page.name,
                        language_code=page.language_code,
                    )
                )
                page_signals.page_created.send(None, event=event)
            case Err(PageNotFoundError()):
                flash_error(
                    gettext(
                        'Page "%(name)s" (%(language_code)s) was not found in site "%(source_site_title)s".',
                        name=page.name,
                        language_code=page.language_code,
                        source_site_title=source_site.title,
                    )
                )
            case Err(PageAlreadyExistsError()):
                flash_error(
                    gettext(
                        'Page "%(name)s" (%(language_code)s) already exists in site "%(target_site_title)s".',
                        name=page.name,
                        language_code=page.language_code,
                        target_site_title=target_site.title,
                    )
                )

    return redirect_to('.index_for_site', site_id=target_site.id)


@blueprint.get('/for_site/<site_id>/create')
@permission_required('page.create')
@templated
def create_form(site_id, erroneous_form=None):
    """Show form to create a page."""
    site = _get_site(site_id)

    form = erroneous_form if erroneous_form else CreateForm()
    form.set_language_code_choices()

    return {
        'form': form,
        'site': site,
    }


@blueprint.post('/for_site/<site_id>')
@permission_required('page.create')
def create(site_id):
    """Create a page."""
    site = _get_site(site_id)

    form = CreateForm(request.form)
    form.set_language_code_choices()

    if not form.validate():
        return create_form(site.id, form)

    name = form.name.data.strip().lower()
    language_code = form.language_code.data
    url_path = form.url_path.data.strip()
    creator = g.user
    title = form.title.data.strip()
    head = form.head.data.strip()
    body = form.body.data.strip()

    version, event = page_service.create_page(
        site,
        name,
        language_code,
        url_path,
        creator,
        title,
        body,
        head=head,
    )

    flash_success(gettext('Page has been created.'))

    page_signals.page_created.send(None, event=event)

    return redirect_to('.view_version', version_id=version.id)


@blueprint.get('/pages/<uuid:page_id>/update')
@permission_required('page.update')
@templated
def update_form(page_id, erroneous_form=None):
    """Show form to update a page."""
    page = _get_page(page_id)

    current_version_id = page_service.find_current_version_id(page.id)
    page_aggregate = page_service.find_page_aggregate(current_version_id)

    form = erroneous_form if erroneous_form else UpdateForm(obj=page_aggregate)
    form.set_language_code_choices()

    site = site_service.get_site(page.site_id)

    return {
        'form': form,
        'page': page,
        'site': site,
    }


@blueprint.post('/pages/<uuid:page_id>')
@permission_required('page.update')
def update(page_id):
    """Update a page."""
    page = _get_page(page_id)

    form = UpdateForm(request.form)
    form.set_language_code_choices()

    if not form.validate():
        return update_form(page.id, form)

    language_code = form.language_code.data
    url_path = form.url_path.data.strip()
    creator = g.user
    title = form.title.data.strip()
    head = form.head.data.strip()
    body = form.body.data.strip()

    version, event = page_service.update_page(
        page.id,
        language_code,
        url_path,
        creator,
        title,
        head,
        body,
    )

    flash_success(gettext('Page has been updated.'))

    page_signals.page_updated.send(None, event=event)

    return redirect_to('.view_version', version_id=version.id)


@blueprint.delete('/pages/<uuid:page_id>')
@permission_required('page.delete')
@respond_no_content_with_location
def delete(page_id):
    """Delete a page."""
    page = _get_page(page_id)

    page_name = page.name
    site_id = page.site_id

    success, event = page_service.delete_page(page.id, initiator=g.user)

    if not success:
        flash_error(
            gettext('Page "%(name)s" could not be deleted.', name=page_name)
        )
        return url_for('.view_current_version', page_id=page.id)

    flash_success(gettext('Page "%(name)s" has been deleted.', name=page_name))

    page_signals.page_deleted.send(None, event=event)

    return url_for('.index_for_site', site_id=site_id)


@blueprint.get('/pages/<uuid:page_id>/set_nav_menu')
@permission_required('page.update')
@templated
def set_nav_menu_form(page_id, erroneous_form=None):
    """Show form to set navigation menu for a page."""
    page = _get_page(page_id)

    site = site_service.get_site(page.site_id)

    form = erroneous_form if erroneous_form else SetNavMenuForm(obj=page)
    form.set_nav_menu_choices(site.id)

    return {
        'form': form,
        'page': page,
        'site': site,
    }


@blueprint.post('/pages/<uuid:page_id>/set_nav_menu')
@permission_required('page.update')
def set_nav_menu(page_id):
    """Set navigation menu for a page."""
    page = _get_page(page_id)

    site = site_service.get_site(page.site_id)

    form = SetNavMenuForm(request.form)
    form.set_nav_menu_choices(site.id)

    if not form.validate():
        return set_nav_menu_form(page.id, form)

    nav_menu_id = form.nav_menu_id.data or None

    page_service.set_nav_menu_id(page.id, nav_menu_id)

    flash_success(gettext('Page has been updated.'))

    return redirect_to('.view_current_version', page_id=page.id)


def _get_site(site_id) -> Site:
    site = site_service.find_site(SiteID(site_id))

    if site is None:
        abort(404)

    return site


def _get_page(page_id) -> Page:
    page = page_service.find_page(page_id)

    if page is None:
        abort(404)

    return page


def _get_version(version_id: PageVersionID) -> PageVersion:
    version = page_service.find_version(version_id)

    if version is None:
        abort(404)

    return version