svthalia/concrexit

View on GitHub
website/photos/services.py

Summary

Maintainability
A
3 hrs
Test Coverage
import logging
import os
import tarfile
from zipfile import ZipFile, is_zipfile

from django.core.files import File
from django.db import transaction
from django.db.models import BooleanField, Case, ExpressionWrapper, Q, Value, When
from django.forms import ValidationError
from django.http import Http404
from django.utils.translation import gettext_lazy as _

from PIL import UnidentifiedImageError

from photos.models import Photo

logger = logging.getLogger(__name__)


def check_shared_album_token(album, token):
    """Raise Http404 unless ``token`` equals the access token of ``album``."""
    expected_token = album.access_token
    if token != expected_token:
        raise Http404("Invalid token.")


def is_album_accessible(request, album):
    """Tell whether the member making the request may view ``album``.

    Requests without a member never have access and members with a current
    membership always do. A member without a current membership only has
    access when one of their memberships covers the album's date.
    """
    member = request.member
    if not member:
        return False

    if member.current_membership is not None:
        return True

    # No current membership: look for a membership that started on or
    # before the album date and ended on or after it.
    covers_album_date = Q(since__lte=album.date) & Q(until__gte=album.date)
    return member.membership_set.filter(covers_album_date).exists()


def get_annotated_accessible_albums(request, albums):
    """Annotate every album in ``albums`` with a boolean ``accessible`` field.

    The annotation is True for all albums when the request's member has a
    current membership and False for all albums when the request has no
    member. For a member without a current membership it is True only for
    albums whose date lies between the ``since`` and ``until`` of one of
    their memberships.
    """
    member = request.member

    if member and member.current_membership is not None:
        accessible = ExpressionWrapper(Value(True), output_field=BooleanField())
    elif member:
        # Start from a filter that matches nothing and widen it with the
        # date range of each membership the member has had.
        within_any_membership = Q(pk__in=[])
        for membership in member.membership_set.all():
            within_membership = Q(date__gte=membership.since) & Q(
                date__lte=membership.until
            )
            within_any_membership |= within_membership

        accessible = Case(
            When(within_any_membership, then=Value(True)),
            default=Value(False),
            output_field=BooleanField(),
        )
    else:
        accessible = ExpressionWrapper(Value(False), output_field=BooleanField())

    return albums.annotate(accessible=accessible)


def extract_archive(album, archive) -> tuple[dict[str, str], int]:
    """Extract the photos from a zip or tar archive into an album.

    Entries are handled in sorted name order. Entries without a photo
    extension, and entries that cannot be read or saved, are skipped and
    reported in the returned warnings instead of aborting the extraction.

    ``archive`` must be a seekable binary file object.

    Returns a tuple of a dict mapping each skipped entry name to the reason
    it was skipped, and the number of photos that were saved.

    Raises ValueError if the file is neither a zip nor a tar archive.
    """
    warnings, count = {}, 0
    if is_zipfile(archive):
        # is_zipfile moved the file position, so rewind before reading.
        archive.seek(0)
        with ZipFile(archive) as zip_file:
            for photo in sorted(zip_file.namelist()):
                if not _has_photo_extension(photo):
                    warnings[photo] = "has an unknown extension."
                    continue

                with zip_file.open(photo) as file:
                    if warning := _try_save_photo(album, file, photo):
                        warnings[photo] = warning
                    else:
                        count += 1
        return warnings, count

    archive.seek(0)
    # is_tarfile only supports filenames, so we cannot use that
    try:
        with tarfile.open(fileobj=archive) as tar_file:
            for photo in sorted(tar_file.getnames()):
                if not _has_photo_extension(photo):
                    warnings[photo] = "has an unknown extension."
                    continue

                try:
                    file = tar_file.extractfile(photo)
                except KeyError:
                    # Raised when the member (or the target of a link
                    # member) cannot be found in the archive.
                    file = None

                if file is None:
                    # extractfile() returns None for members that are not
                    # regular files or links (e.g. a directory whose name
                    # ends in a photo extension); skip them with a warning
                    # rather than crashing on `with None`.
                    warnings[photo] = "could not be read."
                    continue

                with file:
                    if warning := _try_save_photo(album, file, photo):
                        warnings[photo] = warning
                    else:
                        count += 1
    except tarfile.ReadError as e:
        raise ValueError(_("The uploaded file is not a zip or tar file.")) from e

    return warnings, count


def _has_photo_extension(filename):
    """Tell whether ``filename`` has one of the accepted photo extensions.

    The extension is compared case-insensitively.
    """
    accepted_extensions = (".jpg", ".jpeg", ".png", ".webp")
    suffix = os.path.splitext(filename)[1]
    return suffix.lower() in accepted_extensions


def _try_save_photo(album, file, filename) -> str | None:
    """Attempt to store ``file`` as a new photo in ``album``.

    The photo is validated and saved inside a single transaction. Any
    validation, image identification or OS error is logged as a warning.

    Returns None on success, or a string describing why saving failed.
    """
    photo = Photo(album=album)
    photo.file = File(file, filename)
    try:
        with transaction.atomic():
            photo.full_clean()
            photo.save()
    except (ValidationError, UnidentifiedImageError, OSError) as error:
        logger.warning(
            f"Photo '{filename}' could not be read: {error}", exc_info=error
        )
        return "could not be read."
    return None