datahuborg/datahub

View on GitHub
src/integration_tests/test_rlsmanager.py

Summary

Maintainability
F
4 days
Test Coverage
from random import shuffle

from core.db.manager import RowLevelSecurityManager
from core.db.manager import DataHubManager, PermissionDenied

from django.contrib.auth.models import User
from django.test import TestCase
from django.conf import settings


class RLSManagerIntegrationTests(TestCase):
    """Tests RLS query rewriting"""

    owner_username = "delete_me_owner_username"
    owner_password = "delete_me_owner_password"
    owner_email = "{0}@datahub.csail.mit.edu".format(owner_username)

    other_username = "delete_me_other_username"
    other_password = "delete_me_other_password"
    other_email = "{0}@datahub.csail.mit.edu".format(other_username)

    private_repo = 'private_repo'
    shared_repo = 'shared_repo'
    public_repo = 'public_repo'
    repos = [public_repo, shared_repo, private_repo]

    table1 = 'table1'
    table2 = 'table2'
    tables = [table1, table2]

    @classmethod
    def delete_all_test_users(cls):
        # When building tests, it's possible to delete some combination of the
        # django user/postgres user/postgres user database
        # This tries to catch the edge cases.
        all_users = DataHubManager.list_all_users()
        test_users = filter(lambda x: x.startswith('delete_me_'), all_users)
        for user in test_users:
            try:
                DataHubManager.remove_user(user, remove_db=True,
                                           ignore_missing_user=True)
            except:
                print('UNABLE TO DELETE USER ' + user)

        # Delete all django users whose name starts with 'delete_me_'.
        all_users = User.objects.all()
        test_users = all_users.filter(username__startswith='delete_me_')
        for user in test_users:
            user.delete()

    def setUp(self):
        # Start with a clean slate.
        self.delete_all_test_users()
        # create an owner
        self.owner = User.objects.create_user(
            self.owner_username, self.owner_email, self.owner_password)
        # create a potential collaborator
        self.other = User.objects.create_user(
            self.other_username, self.other_email, self.other_password)
        # create some repos, tables, and views
        with DataHubManager(user=self.owner_username,
                            repo_base=self.owner_username) as m:
            for repo in self.repos:
                m.create_repo(repo)

                for table in self.tables:
                    query = ("Create table %s.%s "
                             "(id integer, words text) ") % (repo, table)
                    m.execute_sql(query)

                    for i in range(1, 20):
                        chicken = ['c', 'h', 'i', 'c', 'k', 'e', 'n']
                        shuffle(chicken)
                        chicken = ''.join(chicken)

                        query = ('insert into %s.%s values(%i, \'%s\')'
                                 % (repo, table, i, chicken))
                        m.execute_sql(query)

            # make the appropriate repos shared/public
            m.add_collaborator(repo=self.shared_repo,
                               collaborator=self.other_username,
                               db_privileges=['SELECT'],
                               file_privileges=['read'])
            m.add_collaborator(repo=self.public_repo,
                               collaborator=settings.PUBLIC_ROLE,
                               db_privileges=['SELECT'],
                               file_privileges=['read'])

    def tearDown(self):
        self.delete_all_test_users()

    # Private Repo #
    def test_private_repo_rls_does_not_grant_access(self):
        """
        Allowing someone to select via RLS does not override DB Grants.
        So even if the OWNER of a repo enables a COLLABORATOR to SELECT from a
        table, if they haven't added COLLABORATOR via add_collaborator/the DB
        Their RLS share is overridden
        """
        # Grant access to the table in RLS, but not via a DB grant
        RowLevelSecurityManager.create_security_policy(
            policy="True", policy_type="Select", grantee=self.other_username,
            grantor=self.owner_username, repo_base=self.owner_username,
            repo=self.private_repo, table=self.table1, safe=True)

        # Try to access the repo
        with DataHubManager(user=self.other_username,
                            repo_base=self.owner_username) as m:
            with self.assertRaises(PermissionDenied):
                m.execute_sql('select * from private_repo.table1;')

    # Shared Repo #
    def test_shared_repo_rls_restricted_access(self):
        '''
        share a repo, and then restrict access to one of its tables, using rls
        '''

        # other_username is already granted access to shared_repo in self.setUp

        # restrict other_username's access to just the row where id=2
        RowLevelSecurityManager.create_security_policy(
            policy="id=2", policy_type="select", grantee=self.other_username,
            grantor=self.owner_username, repo_base=self.owner_username,
            repo=self.shared_repo, table=self.table1, safe=True)

        # see what other_username can access
        with DataHubManager(user=self.other_username,
                            repo_base=self.owner_username) as m:
            res = m.execute_sql('select id, words from shared_repo.table1;')

        self.assertEqual(res['tuples'][0][0], 2)

    def test_shared_repo_rls_restricted_access_with_capital_policy(self):
        '''
        same as the above test, but uses a capital 'SELECT' policy type,
        which has caused trouble in the past
        '''
        # other_username is already granted access to shared_repo in self.setUp

        # restrict other_username's access to just the row where id=2
        RowLevelSecurityManager.create_security_policy(
            policy="id=2", policy_type="SELECT", grantee=self.other_username,
            grantor=self.owner_username, repo_base=self.owner_username,
            repo=self.shared_repo, table=self.table1, safe=True)

        # see what other_username can access
        with DataHubManager(user=self.other_username,
                            repo_base=self.owner_username) as m:
            res = m.execute_sql('select id, words from shared_repo.table1;')

        self.assertEqual(res['tuples'][0][0], 2)

    # Public Repo #
    def test_public_repo_restricted_access_w_authenticated_collab(self):
        '''
        a policy applied to the public role should NOT affect a share that's
        made explicitly with a specific user
        '''
        RowLevelSecurityManager.create_security_policy(
            policy="id=2", policy_type="SELECT", grantee=settings.PUBLIC_ROLE,
            grantor=self.owner_username, repo_base=self.owner_username,
            repo=self.shared_repo, table=self.table1, safe=True)

        # see what other_username can access
        with DataHubManager(user=self.other_username,
                            repo_base=self.owner_username) as m:
            res = m.execute_sql('select id, words from shared_repo.table1;')

        # the authenticated user should see more than the one record that the
        # public user is limited to
        self.assertTrue(len(res['tuples']) > 1)

    def test_public_repo_restricted_access_w_unauthenticated_noncollab(self):
        '''
        a policy applied to the public role SHOULD affect anonymous users
        '''
        RowLevelSecurityManager.create_security_policy(
            policy="id=2", policy_type="SELECT", grantee=settings.RLS_PUBLIC,
            grantor=self.owner_username, repo_base=self.owner_username,
            repo=self.public_repo, table=self.table1, safe=True)

        # see the unauthenticated user can access
        with DataHubManager(user=settings.ANONYMOUS_ROLE,
                            repo_base=self.owner_username) as m:
            res = m.execute_sql('select id, words from public_repo.table1;')

        # the authenticated user should see more than the one record that the
        # public user is limited to
        self.assertTrue(len(res['tuples']) == 1)

    def test_public_repo_restricted_access_w_authenticated_noncollab(self):
        '''
        a policy applied to the public role SHOULD affect authenticated users
        with whom the share has not been made explicit
        '''
        RowLevelSecurityManager.create_security_policy(
            policy="id=2", policy_type="SELECT", grantee=settings.RLS_PUBLIC,
            grantor=self.owner_username, repo_base=self.owner_username,
            repo=self.public_repo, table=self.table1, safe=True)

        # see what other_username can access
        with DataHubManager(user=self.other_username,
                            repo_base=self.owner_username) as m:
            res = m.execute_sql('select id, words from public_repo.table1;')

        # the authenticated user should see more than the one record that the
        # public user is limited to
        self.assertTrue(len(res['tuples']) == 1)