digitalfabrik/integreat-cms

View on GitHub
integreat_cms/cms/views/pages/page_actions.py

Summary

Maintainability
A
0 mins
Test Coverage
D
69%
"""
This module contains view actions related to pages.
"""

from __future__ import annotations

import logging
import os
import time
import uuid
from typing import TYPE_CHECKING

from db_mutex import DBMutexError, DBMutexTimeoutError
from django.conf import settings
from django.contrib import messages
from django.core.exceptions import PermissionDenied
from django.db import transaction
from django.http import (
    Http404,
    HttpResponse,
    HttpResponseNotFound,
    HttpResponseRedirect,
    JsonResponse,
)
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.translation import gettext_lazy as _
from django.views.decorators.http import require_POST
from treebeard.exceptions import InvalidMoveToDescendant, InvalidPosition

from ....api.decorators import json_response
from ...constants import text_directions
from ...decorators import permission_required
from ...forms import PageForm
from ...models import Page, PageTranslation, Region
from ...utils.file_utils import extract_zip_archive
from ...utils.repair_tree import repair_tree

if TYPE_CHECKING:
    from django.http import HttpRequest

logger = logging.getLogger(__name__)


@require_POST
def archive_page(
    request: HttpRequest, page_id: int, region_slug: str, language_slug: str
) -> HttpResponseRedirect:
    """
    Archive page object

    :param request: The current request
    :param page_id: The id of the page which should be archived
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :raises ~django.core.exceptions.PermissionDenied: If user does not have the permission to edit the specific page

    :return: A redirection to the :class:`~integreat_cms.cms.views.pages.page_tree_view.PageTreeView`
    """
    region = request.region
    page = get_object_or_404(region.pages, id=page_id)

    if not request.user.has_perm("cms.change_page_object", page):
        raise PermissionDenied(
            f"{request.user!r} does not have the permission to archive {page!r}"
        )

    if page.mirroring_pages.exists():
        messages.error(
            request,
            _(
                "This page cannot be archived because it was embedded as live content from another page."
            ),
        )
    else:
        page.archive()

        logger.debug("%r archived by %r", page, request.user)
        messages.success(request, _("Page was successfully archived"))

    return redirect(
        "pages",
        **{
            "region_slug": region_slug,
            "language_slug": language_slug,
        },
    )


@require_POST
def restore_page(
    request: HttpRequest, page_id: int, region_slug: str, language_slug: str
) -> HttpResponseRedirect:
    """
    Restore page object (set ``archived=False``)

    :param request: The current request
    :param page_id: The id of the page which should be restored
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :raises ~django.core.exceptions.PermissionDenied: If user does not have the permission to edit the specific page

    :return: A redirection to the :class:`~integreat_cms.cms.views.pages.page_tree_view.PageTreeView`
    """

    region = request.region
    page = get_object_or_404(region.pages, id=page_id)

    if not request.user.has_perm("cms.change_page_object", page):
        raise PermissionDenied(
            f"{request.user!r} does not have the permission to restore {page!r}"
        )

    page.restore()

    if page.implicitly_archived:
        logger.debug(
            "%r restored by %r but still implicitly archived",
            page,
            request.user,
        )
        messages.info(
            request,
            _("Page was successfully restored.")
            + " "
            + _(
                "However, it is still archived because one of its parent pages is archived."
            ),
        )
        return redirect(
            "archived_pages",
            **{
                "region_slug": region_slug,
                "language_slug": language_slug,
            },
        )

    logger.debug("%r restored by %r", page, request.user)
    messages.success(request, _("Page was successfully restored."))
    return redirect(
        "pages",
        **{
            "region_slug": region_slug,
            "language_slug": language_slug,
        },
    )


@permission_required("cms.view_page")
@json_response
# pylint: disable=unused-argument
def preview_page_ajax(
    request: HttpRequest, page_id: int, region_slug: str, language_slug: str
) -> JsonResponse:
    """
    Preview page object

    :param request: The current request
    :param page_id: The id of the page which should be viewed
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :raises ~django.http.Http404: HTTP status 404 if page translation does not exist

    :return: Significant page data as a JSON.
    """
    region = request.region
    page = get_object_or_404(region.pages, id=page_id)

    if page_translation := page.get_translation(language_slug):
        mirrored_translation = page.get_mirrored_page_translation(language_slug)
        return JsonResponse(
            data={
                "title": page_translation.title,
                "page_translation": page_translation.content,
                "mirrored_translation": (
                    mirrored_translation.content if mirrored_translation else ""
                ),
                "mirrored_page_first": page.mirrored_page_first,
                "right_to_left": (
                    page_translation.language.text_direction
                    == text_directions.RIGHT_TO_LEFT
                    if page_translation
                    else False
                ),
            }
        )
    raise Http404("Translation of the given page could not be found")


@permission_required("cms.view_page")
@json_response
# pylint: disable=unused-argument
def get_page_content_ajax(
    request: HttpRequest, region_slug: str, language_slug: str, page_id: int
) -> JsonResponse:
    """
    Get content of a page translation based on language slug

    :param request: The current request
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :param page_id: The id of the page which should be viewed
    :raises ~django.http.Http404: HTTP status 404 if page translation does not exist

    :return: Page translation content as a JSON.
    """
    region = request.region
    page = get_object_or_404(region.pages, id=page_id)
    if page_translation := page.get_translation(language_slug):
        return JsonResponse(data={"content": page_translation.content})
    raise Http404("Translation of the given page could not be found")


@require_POST
@permission_required("cms.delete_page")
@transaction.atomic
def delete_page(
    request: HttpRequest, page_id: int, region_slug: str, language_slug: str
) -> HttpResponseRedirect:
    """
    Delete page object

    :param request: The current request
    :param page_id: The id of the page which should be deleted
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :return: A redirection to the :class:`~integreat_cms.cms.views.pages.page_tree_view.PageTreeView`
    """

    region = request.region
    page = get_object_or_404(region.pages, id=page_id)

    if page.children.exists():
        messages.error(request, _("You cannot delete a page which has subpages."))
    elif page.mirroring_pages.exists():
        messages.error(
            request,
            _(
                "This page cannot be deleted because it was embedded as live content from another page."
            ),
        )
    else:
        logger.info("%r deleted by %r", page, request.user)
        page.delete()
        messages.success(request, _("Page was successfully deleted"))

    return redirect(
        "pages",
        **{
            "region_slug": region_slug,
            "language_slug": language_slug,
        },
    )


def expand_page_translation_id(
    request: HttpRequest, short_url_id: int
) -> HttpResponseRedirect:
    """
    Searches for a page translation with corresponding ID and redirects browser to web app

    :param request: The current request
    :param short_url_id: The id of the requested page
    :return: A redirection to :class:`~integreat_cms.core.settings.WEBAPP_URL`
    """

    page_translation = PageTranslation.objects.get(id=short_url_id).public_version

    if page_translation and not page_translation.page.archived:
        return redirect(settings.WEBAPP_URL + page_translation.get_absolute_url())
    return HttpResponseNotFound("<h1>Page not found</h1>")


@require_POST
@permission_required("cms.change_page")
@json_response
# pylint: disable=unused-argument
def cancel_translation_process_ajax(
    request: HttpRequest, region_slug: str, language_slug: str, page_id: int
) -> JsonResponse:
    """
    This view is called for manually unsetting the translation process

    :param request: ajax request
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :param page_id: The id of the requested page
    :return: on success returns language of updated translation
    """
    region = request.region
    page = get_object_or_404(region.pages, id=page_id)
    if not (page_translation := page.get_translation(language_slug)):
        return JsonResponse(
            {
                "error": _(
                    'Page "{}" does not have a translation for language "{}"'
                ).format(page, language_slug)
            },
            status=404,
        )
    if settings.REDIS_CACHE:
        page_translation.all_versions.invalidated_update(currently_in_translation=False)
    else:
        page_translation.all_versions.update(currently_in_translation=False)
    # Get new (respectively old) translation state
    translation_state = page.get_translation_state(language_slug)
    return JsonResponse(
        {
            "success": _(
                'Cancelled translation process for page "{}" and language "{}"'
            ).format(page, page_translation.language),
            "languageSlug": page_translation.language.slug,
            "translationState": translation_state,
        }
    )


@require_POST
@permission_required("cms.change_page")
def upload_xliff(
    request: HttpRequest, region_slug: str, language_slug: str
) -> HttpResponseRedirect:
    """
    Upload and import an XLIFF file

    :param request: The current request
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :return: A redirection to the :class:`~integreat_cms.cms.views.pages.page_tree_view.PageTreeView`
    """
    xliff_paths = []
    upload_files = request.FILES.getlist("xliff_file")
    xliff_dir_uuid = str(uuid.uuid4())
    if upload_files:
        logger.debug("Uploaded files: %r", upload_files)
        upload_dir = os.path.join(settings.XLIFF_UPLOAD_DIR, xliff_dir_uuid)
        os.makedirs(upload_dir, exist_ok=True)
        for upload_file in upload_files:
            # Check whether the file is valid
            if not upload_file.name.endswith((".zip", ".xliff", ".xlf")):
                # File type not supported
                messages.error(
                    request,
                    _('File "{}" is neither a ZIP archive nor an XLIFF file.').format(
                        upload_file.name
                    ),
                )
                logger.warning(
                    "%r tried to import the file %r which is neither a ZIP archive nor an XLIFF file",
                    request.user,
                    upload_file.name,
                )
                continue

            # Copy uploaded file from its temporary location into the upload directory
            with open(os.path.join(upload_dir, upload_file.name), "wb+") as file_write:
                # Using chunks() instead of read() ensures that large files don’t overwhelm your system’s memory
                for chunk in upload_file.chunks():
                    file_write.write(chunk)

            if upload_file.name.endswith(".zip"):
                # Extract zip archive
                xliff_paths_tmp, invalid_file_paths = extract_zip_archive(
                    os.path.join(upload_dir, upload_file.name), upload_dir
                )
                # Append contents of zip archive to total list of xliff files
                xliff_paths += xliff_paths_tmp
                if not xliff_paths_tmp:
                    messages.error(
                        request,
                        _('The ZIP archive "{}" does not contain XLIFF files.').format(
                            upload_file.name
                        ),
                    )
                elif invalid_file_paths:
                    messages.warning(
                        request,
                        _(
                            'The ZIP archive "{}" contains the following invalid files: "{}"'
                        ).format(upload_file.name, '", "'.join(invalid_file_paths)),
                    )
            else:
                # If file is an xliff file, directly append it to the paths
                xliff_paths.append(os.path.join(upload_dir, upload_file.name))
        # Check if at least one file was uploaded successfully
        if xliff_paths:
            logger.info(
                "XLIFF files %r uploaded into %r by %r",
                xliff_paths,
                upload_dir,
                request.user,
            )
            return redirect(
                "import_xliff",
                **{
                    "region_slug": region_slug,
                    "language_slug": language_slug,
                    "xliff_dir": xliff_dir_uuid,
                },
            )
    else:
        messages.error(
            request,
            _("No XLIFF file was selected for import."),
        )

    return redirect(
        "pages",
        **{
            "region_slug": region_slug,
            "language_slug": language_slug,
        },
    )


@require_POST
@permission_required("cms.change_page")
@transaction.atomic
def move_page(
    request: HttpRequest,
    region_slug: str,
    language_slug: str,
    page_id: int,
    target_id: int,
    position: str,
) -> HttpResponseRedirect:
    """
    Move a page object in the page tree

    :param request: The current request
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :param page_id: The id of the page which should be moved
    :param target_id: The id of the page which determines the new position
    :param position: The new position of the page relative to the target (choices: :mod:`~integreat_cms.cms.constants.position`)
    :return: A redirection to the :class:`~integreat_cms.cms.views.pages.page_tree_view.PageTreeView`
    """

    region = request.region
    page = get_object_or_404(region.pages, id=page_id)
    target = get_object_or_404(region.pages, id=target_id)

    try:
        page.move(target, position)
        # Call the save method on the (reloaded) node in order to trigger possible signal handlers etc.
        # (The move()-method executes raw sql which might cause problems if the instance isn't fetched again)
        page = Page.objects.get(id=page_id)
        try:
            page.save()
        except (
            DBMutexTimeoutError,
            DBMutexError,
        ):
            logger.warning(
                "First IntegrityError while moving %r – waiting 1s and trying again…",
                page,
            )
            time.sleep(1)
            try:
                page.save()
            except (
                DBMutexTimeoutError,
                DBMutexError,
            ):
                logger.warning(
                    "Second IntegrityError while moving %r – repairing tree and trying again…",
                    page,
                )
                repair_tree(page_id, commit=True)
                try:
                    page.save()
                except (
                    DBMutexTimeoutError,
                    DBMutexError,
                ) as e:
                    messages.error(
                        request,
                        _(
                            "Error while saving page. Someone else probably moved another page at the same time."
                        ),
                    )
                    logger.exception(e)
        logger.debug(
            "%r moved to %r of %r by %r",
            page,
            position,
            target,
            request.user,
        )
        messages.success(
            request,
            _('The page "{page}" was successfully moved.').format(
                page=page.best_translation.title
            ),
        )
    except (
        ValueError,
        InvalidPosition,
        InvalidMoveToDescendant,
    ) as e:
        messages.error(request, e)
        logger.exception(e)

    return redirect(
        "pages",
        **{
            "region_slug": region_slug,
            "language_slug": language_slug,
        },
    )


@permission_required("cms.view_page")
# pylint: disable=unused-argument
def get_page_order_table_ajax(
    request: HttpRequest,
    region_slug: str,
    parent_id: int | None = None,
    page_id: int | None = None,
) -> HttpResponse:
    """
    Retrieve the order table for a given page and a given parent page.
    This is used in the page form to change the order of a page relative to its siblings.

    :param request: The current request
    :param region_slug: The slug of the current region
    :param parent_id: The id of the parent page to which the order table should be returned
    :param page_id: The id of the page of the current page form
    :return: The rendered page order table
    """

    region = request.region

    page = get_object_or_404(region.pages, id=page_id) if page_id else None

    if parent_id:
        parent = get_object_or_404(region.pages, id=parent_id)
        siblings = [
            sibling
            for sibling in parent.cached_children
            if not sibling.explicitly_archived
        ]
    else:
        siblings = region.get_root_pages().filter(explicitly_archived=False)

    logger.debug(
        "Page order table for page %r and siblings %r",
        page,
        siblings,
    )

    return render(
        request,
        "pages/_page_order_table.html",
        {
            "page": page,
            "siblings": siblings,
            "can_edit_page": request.user.has_perm("cms.change_page_object", page),
        },
    )


@permission_required("cms.view_page")
# pylint: disable=unused-argument
def render_mirrored_page_field(
    request: HttpRequest, region_slug: str, language_slug: str
) -> HttpResponse:
    """
    Retrieve the rendered mirrored page field template

    :param request: The current request
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :return: The rendered mirrored page field
    """
    # Get the region from which the content should be embedded
    region = get_object_or_404(Region, id=request.GET.get("region_id"))
    # Get the page in which the content should be embedded (to exclude it from the possible selections)
    page = Page.objects.filter(id=request.GET.get("page_id")).first()
    page_form = PageForm(
        data={"mirrored_page_region": region.id},
        instance=page,
        additional_instance_attributes={
            "region": region,
        },
    )
    # Pass language to mirrored page widget to render the preview urls
    page_form.fields["mirrored_page"].widget.language_slug = language_slug
    return render(
        request,
        "pages/_mirrored_page_field.html",
        {
            "page_form": page_form,
        },
    )


@require_POST
# pylint: disable=unused-argument
def refresh_date(
    request: HttpRequest,
    page_id: int,
    region_slug: str,
    language_slug: str,
) -> HttpResponseRedirect:
    """
    Refresh the date for up-to-date translations of a corresponding page

    :param request: The current request
    :param page_id: The id of the page of the current page form
    :param region_slug: The slug of the current region
    :param language_slug: The slug of the current language
    :raises ~django.core.exceptions.PermissionDenied: If the user does not have the permission to refresh page dates

    :return: A redirection to the :class:`~integreat_cms.cms.views.pages.page_form_view.PageFormView`
    """
    region = request.region
    page = get_object_or_404(region.get_pages(archived=False), id=page_id)

    if not request.user.has_perm("cms.change_page_object", page):
        raise PermissionDenied(
            f"{request.user!r} does not have the permission mark {page!r} as up-to-date"
        )

    # Consider only the last version of each translation
    page_translations = page.translations.filter(
        language__in=region.active_languages
    ).distinct("page__pk", "language__pk")
    # Sort page translations according to the position of their languages in the
    # language tree to ensure that the translations are not considered outdated.
    page_translations = sorted(
        page_translations,
        key=lambda page_translation: region.active_languages.index(
            page_translation.language
        ),
    )
    translations_to_update = [
        page_translation
        for page_translation in page_translations
        if page_translation.language.slug == language_slug
        or page_translation.is_up_to_date
    ]
    # Update timestamps of up-to-date translations
    for page_translation in translations_to_update:
        page_translation.save()

    messages.success(request, _("Marked all translations of this page as up-to-date"))
    return redirect(
        "edit_page",
        **{
            "page_id": page_id,
            "language_slug": language_slug,
            "region_slug": request.region.slug,
        },
    )