# divio/django-cms
#
# View on GitHub
# cms/test_utils/testcases.py
#
# Summary
#
# Maintainability
# D
# 2 days
# Test Coverage
import json
import sys
import warnings
from urllib.parse import unquote, urljoin

from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser, Permission
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.forms.models import model_to_dict
from django.template import engines
from django.template.context import Context
from django.test import testcases
from django.test.client import RequestFactory
from django.urls import reverse
from django.utils import translation
from django.utils.http import urlencode
from django.utils.timezone import now

from cms.api import create_page
from cms.constants import PUBLISHER_STATE_DEFAULT, PUBLISHER_STATE_DIRTY, PUBLISHER_STATE_PENDING
from cms.models import Page
from cms.models.permissionmodels import GlobalPagePermission, PagePermission, PageUser
from cms.plugin_rendering import ContentRenderer, StructureRenderer
from cms.test_utils.util.context_managers import UserLoginContext
from cms.utils.conf import get_cms_setting
from cms.utils.permissions import set_current_user
from cms.utils.urlutils import admin_reverse
from menus.menu_pool import menu_pool

# Admin endpoints used by the tests.  Values containing ``%d`` / ``%s`` are
# templates meant to be completed with ``%`` formatting by the caller.

# Page admin: endpoints that are not tied to one page.
URL_CMS_PAGE = "/en/admin/cms/page/"
URL_CMS_PAGE_ADD = urljoin(URL_CMS_PAGE, "add/")
URL_CMS_PAGE_PUBLISHED = urljoin(URL_CMS_PAGE, "published-pages/")

# Page admin: endpoints below ``<page admin>/%d/``.
URL_CMS_PAGE_CHANGE_BASE = urljoin(URL_CMS_PAGE, "%d/")
URL_CMS_PAGE_CHANGE = urljoin(URL_CMS_PAGE_CHANGE_BASE, "change/")
URL_CMS_PAGE_ADVANCED_CHANGE = urljoin(URL_CMS_PAGE_CHANGE_BASE, "advanced-settings/")
URL_CMS_PAGE_PERMISSION_CHANGE = urljoin(URL_CMS_PAGE_CHANGE_BASE, "permission-settings/")
URL_CMS_PAGE_PERMISSIONS = urljoin(URL_CMS_PAGE_CHANGE_BASE, "permissions/")
URL_CMS_PAGE_MOVE = urljoin(URL_CMS_PAGE_CHANGE_BASE, "move-page/")
URL_CMS_PAGE_COPY = urljoin(URL_CMS_PAGE_CHANGE_BASE, "copy-page/")
URL_CMS_PAGE_PUBLISH = urljoin(URL_CMS_PAGE_CHANGE_BASE, "%s/publish/")
URL_CMS_PAGE_DELETE = urljoin(URL_CMS_PAGE_CHANGE_BASE, "delete/")
URL_CMS_TRANSLATION_DELETE = urljoin(URL_CMS_PAGE_CHANGE_BASE, "delete-translation/")

# Page admin: endpoints below the change view.
URL_CMS_PAGE_CHANGE_LANGUAGE = URL_CMS_PAGE_CHANGE + "?language=%s"
URL_CMS_PAGE_CHANGE_TEMPLATE = urljoin(URL_CMS_PAGE_CHANGE, "change-template/")

# Plugin endpoints, both global and below ``<page admin>/%d/``.
URL_CMS_PLUGIN_ADD = urljoin(URL_CMS_PAGE, "add-plugin/")
URL_CMS_PLUGIN_EDIT = urljoin(URL_CMS_PAGE, "edit-plugin/")
URL_CMS_PLUGIN_MOVE = urljoin(URL_CMS_PAGE, "move-plugin/")
URL_CMS_PLUGIN_REMOVE = urljoin(URL_CMS_PAGE, "delete-plugin/")
URL_CMS_PLUGIN_DELETE = urljoin(URL_CMS_PLUGIN_REMOVE, "%s/")
URL_CMS_PLUGINS_COPY = urljoin(URL_CMS_PAGE, "copy-plugins/")
URL_CMS_PLUGIN_PAGE_MOVE = urljoin(URL_CMS_PAGE_CHANGE_BASE, "move-plugin/")
URL_CMS_PLUGIN_PAGE_ADD = urljoin(URL_CMS_PAGE_CHANGE_BASE, "add-plugin/")

# User settings admin.
URL_CMS_USERSETTINGS = "/en/admin/cms/usersettings/"


class _Warning:
    """
    Plain record describing one warning captured by ``_collectWarnings``.

    ``message`` holds the first argument of the warning instance (not the
    instance itself); ``category``, ``filename`` and ``lineno`` are stored as
    received by the ``showwarning`` hook.
    """

    def __init__(self, message, category, filename, lineno):
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno


def _collectWarnings(observeWarning, f, *args, **kwargs):
    """
    Call ``f(*args, **kwargs)`` and report every warning it emits.

    While ``f`` runs, the warning filters are set to ``'always'`` and
    ``warnings.showwarning`` is replaced by a hook that wraps each warning in
    a ``_Warning`` and hands it to `observeWarning`.  Filters and the original
    hook are restored afterwards, even when ``f`` raises.

    Returns the value returned by ``f``.
    """
    def showWarning(message, category, filename, lineno, file=None, line=None):
        assert isinstance(message, Warning)
        observeWarning(_Warning(
            message.args[0], category, filename, lineno))

    # Disable the per-module cache for every module otherwise if the warning
    # which the caller is expecting us to collect was already emitted it won't
    # be re-emitted by the call to f which happens below.
    #
    # Iterate over a snapshot: setting an attribute on an unusual module
    # object may import something, and a live ``sys.modules`` view would then
    # fail with "dictionary changed size during iteration".
    for module in list(sys.modules.values()):
        if module is None:
            continue
        try:
            module.__warningregistry__ = None
        except Exception:
            # Deliberately broad: some wacky object may raise some wacky
            # exception in response to the setattr attempt.  Interpreter-exit
            # signals (KeyboardInterrupt, SystemExit) still propagate.
            pass

    origFilters = warnings.filters[:]
    origShow = warnings.showwarning
    try:
        warnings.simplefilter('always')
        warnings.showwarning = showWarning
        result = f(*args, **kwargs)
    finally:
        warnings.filters[:] = origFilters
        warnings.showwarning = origShow
    return result


class BaseCMSTestCase:
    counter = 1

    def _fixture_setup(self):
        """Run the parent fixture setup, then ``create_fixtures()``, then activate the ``en`` translation."""
        super()._fixture_setup()
        # Subclass hook; runs only after the parent class finished its setup.
        self.create_fixtures()
        translation.activate("en")

    def create_fixtures(self):
        """Hook called from ``_fixture_setup``; does nothing unless a subclass overrides it."""
        return None

    def _post_teardown(self):
        """Clear the menu pool and the cache, run the parent teardown, then reset the current user to ``None``."""
        menu_pool.clear()
        cache.clear()
        super()._post_teardown()
        set_current_user(None)

    def login_user_context(self, user):
        """Return a ``UserLoginContext`` built from this test case and `user`."""
        login_context = UserLoginContext(self, user)
        return login_context

    def get_permission(self, codename):
        """Return the ``Permission`` whose ``codename`` equals `codename`."""
        lookup = {'codename': codename}
        return Permission.objects.get(**lookup)

    def add_permission(self, user, codename):
        """Add the permission identified by `codename` to ``user.user_permissions``."""
        permission = self.get_permission(codename)
        user.user_permissions.add(permission)

    def remove_permission(self, user, codename):
        """Remove the permission identified by `codename` from ``user.user_permissions``."""
        # Resolve the permission through get_permission(), the same way
        # add_permission() does, so both helpers share one lookup.
        user.user_permissions.remove(self.get_permission(codename))

    def add_global_permission(self, user, **kwargs):
        """
        Create and return a ``GlobalPagePermission`` for `user`.

        Every ``can_*`` flag listed below starts as ``False``; keyword
        arguments override the defaults.  The new permission is attached to
        all ``Site`` objects.
        """
        flags = (
            'can_add',
            'can_change',
            'can_delete',
            'can_change_advanced_settings',
            'can_publish',
            'can_change_permissions',
            'can_move_page',
            'can_recover_page',
        )
        field_values = dict.fromkeys(flags, False)
        field_values['user'] = user
        field_values.update(kwargs)

        permission = GlobalPagePermission.objects.create(**field_values)
        permission.sites.set(Site.objects.all())
        return permission

    def add_page_permission(self, user, page, **kwargs):
        """
        Create and return a ``PagePermission`` for `user` on `page`.

        Every ``can_*`` flag listed below starts as ``False``; keyword
        arguments override the defaults.
        """
        flags = (
            'can_add',
            'can_change',
            'can_delete',
            'can_change_advanced_settings',
            'can_publish',
            'can_change_permissions',
            'can_move_page',
        )
        field_values = dict.fromkeys(flags, False)
        field_values['page'] = page
        field_values['user'] = user
        field_values.update(kwargs)

        permission = PagePermission.objects.create(**field_values)
        # NOTE(review): this is a plain attribute assignment, whereas
        # add_global_permission() uses ``sites.set(...)``.  Whether
        # PagePermission has a ``sites`` relation is not visible from this
        # file -- confirm what this line is expected to achieve.
        permission.sites = Site.objects.all()
        return permission

    def _create_user(self, username, is_staff=False, is_superuser=False,
                     is_active=True, add_default_permissions=False, permissions=None):
        """
        Create, save and return a user for tests.

        The e-mail is ``<username>@django-cms.org`` and the password is the
        value of the user model's ``USERNAME_FIELD`` on the new user.

        Permissions are only granted to staff users that are not superusers:
        the default text plugin and page permissions when
        `add_default_permissions` is set, followed by every codename in
        `permissions` (an iterable of permission codenames).
        """
        user_model = get_user_model()
        username_field = user_model.USERNAME_FIELD

        attrs = {
            'email': username + '@django-cms.org',
            'last_login': now(),
            'is_staff': is_staff,
            'is_active': is_active,
            'is_superuser': is_superuser,
        }
        # Special case: when the e-mail is used as the username there is no
        # separate username field to fill in.
        if username_field != 'email':
            attrs[username_field] = username

        user = user_model(**attrs)
        user.set_password(getattr(user, username_field))
        user.save()

        if is_staff and not is_superuser:
            def grant(codename):
                user.user_permissions.add(Permission.objects.get(codename=codename))

            if add_default_permissions:
                default_codenames = (
                    'add_text', 'delete_text', 'change_text', 'publish_page',
                    'add_page', 'change_page', 'delete_page',
                )
                for codename in default_codenames:
                    grant(codename)
            if permissions:
                for codename in permissions:
                    grant(codename)
        return user

    def get_superuser(self):
        """
        Return the "admin" superuser, creating it on first use.

        The lookup uses the user model's ``USERNAME_FIELD``: the value is
        ``admin@django-cms.org`` when that field is the e-mail, ``admin``
        otherwise.
        """
        user_model = get_user_model()
        username_field = user_model.USERNAME_FIELD
        identifier = "admin@django-cms.org" if username_field == "email" else "admin"
        try:
            return user_model.objects.get(**{username_field: identifier})
        except user_model.DoesNotExist:
            return self._create_user("admin", is_staff=True, is_superuser=True)

    def get_staff_user_with_no_permissions(self):
        """
        Create and return a staff, non-superuser account named "staff"
        without requesting any permissions.

        Used in security tests
        """
        return self._create_user("staff", is_staff=True, is_superuser=False)

    def get_staff_user_with_std_permissions(self):
        """
        Create and return a staff, non-superuser account named "staff" with
        the default permissions added by ``_create_user``.
        """
        account_flags = {
            'is_staff': True,
            'is_superuser': False,
            'add_default_permissions': True,
        }
        return self._create_user("staff", **account_flags)

    def get_standard_user(self):
        """
        Create and return a non-staff, non-superuser account named "standard".

        Used in security tests
        """
        return self._create_user("standard", is_staff=False, is_superuser=False)

    def get_staff_page_user(self, created_by=None):
        """
        Create the staff user "perms-testuser" and return a ``PageUser``
        created from its field values.

        `created_by` defaults to the user returned by ``get_superuser()``.
        """
        creator = created_by if created_by else self.get_superuser()

        # First entry of the model's parent links; used as the field that
        # points the PageUser at the freshly created user.
        parent_link = list(PageUser._meta.parents.values())[0]
        base_user = self._create_user('perms-testuser', is_staff=True, is_superuser=False)

        field_values = model_to_dict(base_user, exclude=['groups', 'user_permissions'])
        field_values.update({
            parent_link.name: base_user,
            'created_by': creator,
        })
        return PageUser.objects.create(**field_values)

    def get_new_page_data(self, parent_id=''):
        page_data = {
            'title': 'test page %d' % self.counter,
            'slug': 'test-page-%d' % self.counter,
            'parent_node': parent_id,
        }
        # required only if user haves can_change_permission
        self.counter += 1
        return page_data

    def get_new_page_data_dbfields(self, parent=None, site=None,
                                   language=None,
                                   template='nav_playground.html', ):
        page_data = {
            'title': 'test page %d' % self.counter,
            'slug': 'test-page-%d' % self.counter,
            'language': settings.LANGUAGES[0][0] if not language else language,
            'template': template,
            'parent': parent if parent else None,
            'site': site if site else Site.objects.get_current(),
        }
        self.counter = self.counter + 1
        return page_data

    def get_pagedata_from_dbfields(self, page_data):
        """Converts data created by get_new_page_data_dbfields to data
        created from get_new_page_data so you can switch between test cases
        in api.create_page and client.post.

        `page_data` is modified in place and returned: ``site`` and ``parent``
        are replaced by their ids (``''`` without a parent) and the permission
        formset management keys are added with a value of 0."""
        parent = page_data['parent']
        page_data['site'] = page_data['site'].id
        page_data['parent'] = parent.id if parent else ''
        # Management-form counters of the two permission formsets.
        for prefix in ('pagepermission_set', 'pagepermission_set-2'):
            for counter in ('TOTAL_FORMS', 'INITIAL_FORMS', 'MAX_NUM_FORMS'):
                page_data[f'{prefix}-{counter}'] = 0
        return page_data

    def print_page_structure(self, qs):
        """Debug helper: print one line per page of `qs` ordered by ``path``,
        indented by two spaces per ``level``.
        """
        line_template = "{}{} ({}), path: {}, depth: {}, numchild: {}"
        for page in qs.order_by('path'):
            indent = page.level * "  "
            line = line_template.format(
                indent, page, page.pk, page.path, page.depth, page.numchild,
            )
            print(line)

    def print_node_structure(self, nodes, *extra):
        """
        Debug helper: print the tree of `nodes`, one title per line and two
        spaces of indentation per level.

        For each name in `extra` the line also shows the node attribute of
        that name, falling back to ``node.attr[name]`` and finally to
        ``"unknown"``.
        """
        def describe(node):
            parts = []
            for name in extra:
                fallback = node.attr.get(name, "unknown")
                parts.append('%s: %r' % (name, getattr(node, name, fallback)))
            return ', '.join(parts)

        def walk(siblings, depth):
            indent = depth * '  '
            for node in siblings:
                summary = describe(node)
                print(f"{indent}{node.title}: {summary}")
                walk(node.children, depth + 1)

        walk(nodes, 0)

    def assertObjectExist(self, qs, **filter):
        """
        Return ``qs.get(**filter)``; fail the test when that lookup raises
        ``ObjectDoesNotExist``.
        """
        missing = False
        try:
            found = qs.get(**filter)
        except ObjectDoesNotExist:
            missing = True
        if missing:
            raise self.failureException("ObjectDoesNotExist raised for filter %s" % filter)
        return found

    def assertObjectDoesNotExist(self, qs, **filter):
        try:
            qs.get(**filter)
        except ObjectDoesNotExist:
            return
        raise self.failureException("ObjectDoesNotExist not raised for filter %s" % filter)

    def copy_page(self, page, target_page, position=0, target_site=None):
        """
        Copy `page` by posting to the admin copy-page endpoint and return the
        copied ``Page``.

        `target_site` defaults to the site of `target_page`'s node and is
        applied as ``SITE_ID`` for the request.  The call asserts a 200
        response, that the page id from the JSON response exists, and that
        the copy has a title with the slug computed up front by
        ``get_available_slug``.
        """
        from cms.utils.page import get_available_slug

        site = target_page.node.site if target_site is None else target_site

        post_data = {
            'position': position,
            'target': target_page.pk,
            'source_site': page.node.site_id,
            'copy_permissions': 'on',
            'copy_moderation': 'on',
        }

        # Work out which slug the copy is expected to receive.
        source_title = page.title_set.all()[0]
        target_title = target_page.title_set.all()[0]
        language = source_title.language
        expected_path = source_title.get_path_for_base(target_title.path)
        expected_slug = get_available_slug(site, expected_path, language)

        copy_url = URL_CMS_PAGE + "%d/copy-page/" % page.pk
        with self.settings(SITE_ID=site.pk):
            response = self.client.post(copy_url, post_data)
        self.assertEqual(response.status_code, 200)

        payload = json.loads(response.content.decode('utf8'))
        copied_page = self.assertObjectExist(Page.objects.all(), pk=payload['id'])
        copied_titles = copied_page.title_set.filter(language=language)
        self.assertObjectExist(copied_titles, slug=expected_slug)

        for stale in (page, target_page):
            stale._clear_node_cache()
        return copied_page

    def create_homepage(self, *args, **kwargs):
        """
        Create a page with ``create_page(*args, **kwargs)``, mark it as the
        homepage and return the result of its ``reload()``.
        """
        page = create_page(*args, **kwargs)
        page.set_as_homepage()
        reloaded = page.reload()
        return reloaded

    def move_page(self, page, target_page, position="first-child"):
        page.move_page(target_page.node, position)
        return self.reload_page(page)

    def reload_page(self, page):
        """
        Returns a fresh instance of the page from the database
        """
        return self.reload(page)

    def reload(self, obj):
        return obj.__class__.objects.get(pk=obj.pk)

    def get_pages_root(self):
        """Return the unquoted URL reversed from the ``pages-root`` URL name."""
        root_url = reverse("pages-root")
        return unquote(root_url)

    def get_context(self, path=None, page=None):
        """
        Return a template ``Context`` whose only entry is ``request``.

        The request comes from ``get_request`` for `path` (the pages root
        when `path` is falsy) and `page`.
        """
        request_path = path if path else self.get_pages_root()
        request = self.get_request(request_path, page=page)
        return Context({'request': request})

    def get_content_renderer(self, request=None):
        """Return a ``ContentRenderer`` for `request`, or for a default ``get_request()`` when `request` is falsy."""
        if not request:
            request = self.get_request()
        return ContentRenderer(request)

    def get_structure_renderer(self, request=None):
        """Return a ``StructureRenderer`` for `request`, or for a default ``get_request()`` when `request` is falsy."""
        if not request:
            request = self.get_request()
        return StructureRenderer(request)

    def get_request(self, path=None, language=None, post_data=None, enforce_csrf_checks=False, page=None, domain=None):
        """
        Build a request with ``RequestFactory`` and decorate it for CMS tests.

        A POST request is built when `post_data` is truthy, a GET request
        otherwise.  `path` defaults to the pages root and `language` to the
        active translation.  The request receives the test client's session,
        ``self.user`` (or an ``AnonymousUser``), ``LANGUAGE_CODE``,
        ``current_page`` (`page` or ``None``) and an inert ``_messages``
        object.  When `domain` is given it is stored as ``SERVER_NAME`` both
        in ``META`` and on the request.  CSRF checks are skipped unless
        `enforce_csrf_checks` is true.
        """
        class MockStorage:
            """Object assigned to ``request._messages``: always empty, ignores add/update."""

            def __len__(self):
                return 0

            def __iter__(self):
                return iter([])

            def add(self, level, message, extra_tags=''):
                pass

            def update(self, response):
                pass

        factory = RequestFactory()
        path = path or self.get_pages_root()
        language = language or translation.get_language()

        request = factory.post(path, post_data) if post_data else factory.get(path)

        request.session = self.client.session
        request.user = getattr(self, 'user', AnonymousUser())
        request.LANGUAGE_CODE = language
        if domain:
            request.META["SERVER_NAME"] = domain
            request.SERVER_NAME = domain
        request._dont_enforce_csrf_checks = not enforce_csrf_checks
        request.current_page = page if page else None
        request._messages = MockStorage()
        return request

    def failUnlessWarns(self, category, message, f, *args, **kwargs):
        """
        Call ``f(*args, **kwargs)`` and assert that it emits the expected
        warning.

        All warnings emitted during the call are collected.  The
        ``DeprecationWarning`` with the message "There is no current event
        loop" is ignored; every remaining warning must have the same message
        and category, and those must equal `message` and be `category`.

        Returns the value returned by ``f``.  Fails the test when no relevant
        warning was emitted, when different warnings were emitted, or when
        the warning does not match the expectation.
        """
        warningsShown = []
        result = _collectWarnings(warningsShown.append, f, *args, **kwargs)

        if not warningsShown:
            self.fail("No warnings emitted")

        # this specific warning is present due to the way Django
        # handle asyncio
        # https://stackoverflow.com/questions/70303895/python-3-10-asyncio-gather-shows-deprecationwarning-there-is-no-current-event
        # Only the exact category/message pair is noise: other
        # DeprecationWarnings must still reach the checks below.
        cleanwarningsShown = [
            warning for warning in warningsShown
            if not (
                warning.category == DeprecationWarning
                and warning.message == 'There is no current event loop'
            )
        ]

        # Report a regular test failure (instead of an IndexError below) when
        # nothing but the ignored warning was emitted.
        if not cleanwarningsShown:
            self.fail("No warnings emitted")

        first = cleanwarningsShown[0]
        for other in cleanwarningsShown[1:]:
            if (other.message, other.category) != (first.message, first.category):
                self.fail("Can't handle different warnings")

        self.assertEqual(first.message, message)
        self.assertIs(first.category, category)

        return result

    assertWarns = failUnlessWarns

    def load_template_from_string(self, template):
        """Compile the template source `template` with the ``django`` template engine."""
        django_engine = engines['django']
        return django_engine.from_string(template)

    def get_template(self, template):
        """Load the template named `template` through the ``django`` template engine."""
        django_engine = engines['django']
        return django_engine.get_template(template)

    def render_template_obj(self, template, context, request):
        """Compile the template source `template` and render it with `context` and `request`."""
        compiled = self.load_template_from_string(template)
        rendered = compiled.render(context, request)
        return rendered

    def apphook_clear(self):
        """
        Remove the module of every registered apphook from ``sys.modules``
        and clear the apphook pool.
        """
        from cms.apphook_pool import apphook_pool

        for name, _label in list(apphook_pool.get_apphooks()):
            module_name = apphook_pool.apps[name].__class__.__module__
            # The module may already be gone (e.g. shared by two apphooks).
            sys.modules.pop(module_name, None)
        apphook_pool.clear()

    def get_admin_url(self, model, action, *args):
        """
        Reverse the admin URL named ``<app_label>_<model_name>_<action>`` for
        `model`, passing `args` as positional URL arguments.
        """
        meta = model._meta
        url_name = f"{meta.app_label}_{meta.model_name}_{action}"
        return admin_reverse(url_name, args=args)

    def get_permissions_test_page(self):
        """
        Create two published English pages owned by the superuser -- "home"
        and "permissions" -- and return the latter, whose ``reverse_id`` is
        ``permissions``.
        """
        admin = self.get_superuser()
        shared_options = {'created_by': admin, 'published': True}

        create_page("home", "nav_playground.html", "en", **shared_options)
        return create_page(
            "permissions",
            "nav_playground.html",
            "en",
            reverse_id='permissions',
            **shared_options,
        )

    def get_plugin_model(self, plugin_type):
        """Return the ``model`` of the plugin registered as `plugin_type` in the plugin pool."""
        from cms.plugin_pool import plugin_pool

        plugin_class = plugin_pool.get_plugin(plugin_type)
        return plugin_class.model

    def get_add_plugin_uri(self, placeholder, plugin_type, language='en', parent=None):
        """
        Return ``placeholder.get_add_url()`` followed by the query string
        needed to add a plugin of `plugin_type`.

        ``cms_path`` is the placeholder page's absolute URL in `language` or,
        without a page, ``/<language>/``.  ``plugin_parent`` is only included
        when `parent` is truthy.
        """
        page = placeholder.page
        cms_path = placeholder.page.get_absolute_url(language) if page else f'/{language}/'

        base_url = placeholder.get_add_url()
        query = dict(
            plugin_type=plugin_type,
            placeholder_id=placeholder.pk,
            plugin_language=language,
            cms_path=cms_path,
        )
        if parent:
            query['plugin_parent'] = parent.pk
        return base_url + '?' + urlencode(query)

    def get_change_plugin_uri(self, plugin, container=None, language=None):
        """
        Return the ``edit_plugin`` admin URL of `container` for `plugin`,
        with a ``cms_path`` query parameter.

        `container` defaults to ``Page`` and `language` to ``'en'``.
        ``cms_path`` is the plugin page's absolute URL in that language or,
        without a page, ``/<language>/``.
        """
        admin_model = container if container else Page
        lang = language if language else 'en'
        cms_path = plugin.page.get_absolute_url(lang) if plugin.page else f'/{lang}/'

        base_url = self.get_admin_url(admin_model, 'edit_plugin', plugin.pk)
        return base_url + '?' + urlencode({'cms_path': cms_path})

    def get_move_plugin_uri(self, plugin, container=None, language=None):
        """
        Return the ``move_plugin`` admin URL of `container`, with a
        ``cms_path`` query parameter derived from `plugin`.

        `container` defaults to ``Page`` and `language` to ``'en'``.
        ``cms_path`` is the plugin page's absolute URL in that language or,
        without a page, ``/<language>/``.
        """
        admin_model = container if container else Page
        lang = language if language else 'en'
        cms_path = plugin.page.get_absolute_url(lang) if plugin.page else f'/{lang}/'

        base_url = self.get_admin_url(admin_model, 'move_plugin')
        return base_url + '?' + urlencode({'cms_path': cms_path})

    def get_copy_plugin_uri(self, plugin, container=None, language=None):
        """
        Return the ``copy_plugins`` admin URL of `container`, with a
        ``cms_path`` query parameter derived from `plugin`.

        `container` defaults to ``Page`` and `language` to ``'en'``.
        ``cms_path`` is the plugin page's absolute URL in that language or,
        without a page, ``/<language>/``.
        """
        admin_model = container if container else Page
        lang = language if language else 'en'
        cms_path = plugin.page.get_absolute_url(lang) if plugin.page else f'/{lang}/'

        base_url = self.get_admin_url(admin_model, 'copy_plugins')
        return base_url + '?' + urlencode({'cms_path': cms_path})

    def get_copy_placeholder_uri(self, placeholder, container=None, language=None):
        """
        Return the ``copy_plugins`` admin URL of `container`, with a
        ``cms_path`` query parameter derived from `placeholder`.

        `container` defaults to ``Page`` and `language` to ``'en'``.
        ``cms_path`` is the placeholder page's absolute URL in that language
        or, without a page, ``/<language>/``.
        """
        admin_model = container if container else Page
        lang = language if language else 'en'
        cms_path = placeholder.page.get_absolute_url(lang) if placeholder.page else f'/{lang}/'

        base_url = self.get_admin_url(admin_model, 'copy_plugins')
        return base_url + '?' + urlencode({'cms_path': cms_path})

    def get_delete_plugin_uri(self, plugin, container=None, language=None):
        """
        Return the ``delete_plugin`` admin URL of `container` for `plugin`,
        with a ``cms_path`` query parameter.

        `container` defaults to ``Page`` and `language` to ``'en'``.
        ``cms_path`` is the plugin page's absolute URL in that language or,
        without a page, ``/<language>/``.
        """
        admin_model = container if container else Page
        lang = language if language else 'en'
        cms_path = plugin.page.get_absolute_url(lang) if plugin.page else f'/{lang}/'

        base_url = self.get_admin_url(admin_model, 'delete_plugin', plugin.pk)
        return base_url + '?' + urlencode({'cms_path': cms_path})

    def get_clear_placeholder_url(self, placeholder, container=None, language=None):
        """
        Return the ``clear_placeholder`` admin URL of `container` for
        `placeholder`, with ``language`` and ``cms_path`` query parameters.

        `container` defaults to ``Page`` and `language` to ``'en'``.
        ``cms_path`` is the placeholder page's absolute URL in that language
        or, without a page, ``/<language>/``.
        """
        admin_model = container if container else Page
        lang = language if language else 'en'
        cms_path = placeholder.page.get_absolute_url(lang) if placeholder.page else f'/{lang}/'

        base_url = self.get_admin_url(admin_model, 'clear_placeholder', placeholder.pk)
        query = {'language': lang, 'cms_path': cms_path}
        return base_url + '?' + urlencode(query)

    def get_edit_on_url(self, url):
        """Return `url` with the ``CMS_TOOLBAR_URL__EDIT_ON`` CMS setting appended as its query string."""
        query = get_cms_setting('CMS_TOOLBAR_URL__EDIT_ON')
        return '{}?{}'.format(url, query)

    def get_edit_off_url(self, url):
        """Return `url` with the ``CMS_TOOLBAR_URL__EDIT_OFF`` CMS setting appended as its query string."""
        query = get_cms_setting('CMS_TOOLBAR_URL__EDIT_OFF')
        return '{}?{}'.format(url, query)

    def get_obj_structure_url(self, url):
        """Return `url` with the ``TOOLBAR_URL__BUILD`` CMS setting appended as its query string."""
        query = get_cms_setting('TOOLBAR_URL__BUILD')
        return '{}?{}'.format(url, query)

    def get_toolbar_disable_url(self, url):
        """Return `url` with the ``TOOLBAR_URL__DISABLE`` CMS setting appended as its query string."""
        query = get_cms_setting('TOOLBAR_URL__DISABLE')
        return '{}?{}'.format(url, query)


class CMSTestCase(BaseCMSTestCase, testcases.TestCase):
    """
    ``BaseCMSTestCase`` helpers combined with Django's ``TestCase``, plus
    assertions about the publishing state of a page's English translation.
    """

    def assertPending(self, page):
        """
        Assert that `page` is in the pending state.

        For a draft this also checks the ``PUBLISHER_STATE_PENDING`` state
        and then applies the same assertion to its public counterpart.
        """
        is_draft = page.publisher_is_draft
        self.assertFalse(page.is_published('en'))
        self.assertTrue(bool(page.publisher_public_id))
        if not is_draft:
            # public
            self.assertFalse(page.get_title_obj('en').published)
            return
        # draft
        self.assertTrue(page.get_title_obj('en').published)
        self.assertEqual(page.get_publisher_state("en"), PUBLISHER_STATE_PENDING)
        self.assertPending(page.publisher_public)

    def assertPublished(self, page):
        """
        Assert that `page` is published.

        For a draft this also checks the ``PUBLISHER_STATE_DEFAULT`` state
        and then applies the same assertion to its public counterpart.
        """
        is_draft = page.publisher_is_draft
        self.assertTrue(page.is_published('en'))
        self.assertTrue(page.get_title_obj('en').published)
        self.assertTrue(bool(page.publisher_public_id))
        if is_draft:
            # draft
            self.assertEqual(page.get_publisher_state("en"), PUBLISHER_STATE_DEFAULT)
            self.assertPublished(page.publisher_public)

    def assertUnpublished(self, page):
        """
        Assert that `page` is unpublished but has a public counterpart.

        For a draft this also checks the ``PUBLISHER_STATE_DIRTY`` state and
        then applies the same assertion to its public counterpart.
        """
        is_draft = page.publisher_is_draft
        self.assertFalse(page.is_published('en'))
        self.assertTrue(bool(page.publisher_public_id))
        self.assertFalse(page.get_title_obj('en').published)
        if is_draft:
            # draft
            self.assertEqual(page.get_publisher_state("en"), PUBLISHER_STATE_DIRTY)
            self.assertUnpublished(page.publisher_public)

    def assertNeverPublished(self, page):
        """
        Assert that `page` is a draft without a public counterpart, is not
        published and is in the ``PUBLISHER_STATE_DIRTY`` state.
        """
        self.assertTrue(page.publisher_is_draft)
        self.assertFalse(page.is_published('en'))
        public_page = page.publisher_public
        self.assertIsNone(public_page)
        publisher_state = page.get_publisher_state("en")
        self.assertEqual(publisher_state, PUBLISHER_STATE_DIRTY)


class TransactionCMSTestCase(CMSTestCase, testcases.TransactionTestCase):
    """
    ``CMSTestCase`` that additionally lists Django's ``TransactionTestCase``
    as a base class; it adds no members of its own.

    NOTE(review): ``CMSTestCase`` already derives from ``testcases.TestCase``.
    If that class itself extends ``TransactionTestCase`` (as in stock Django),
    it comes first in the MRO -- confirm this class really gets transactional
    test behaviour.
    """