biblib/views/permission_view.py
"""
Perimssion view
"""
from flask import request, current_app
from flask_discoverer import advertise
from biblib.models import User, Library, Permissions
from biblib.client import client
from biblib.views.base_view import BaseView
from sqlalchemy.orm.exc import NoResultFound
from biblib.utils import get_post_data, err
from biblib.views.http_errors import MISSING_USERNAME_ERROR, NO_PERMISSION_ERROR, \
WRONG_TYPE_ERROR, API_MISSING_USER_EMAIL, BAD_LIBRARY_ID_ERROR
from biblib.biblib_exceptions import PermissionDeniedError
from biblib.emails import PermissionsChangedEmail
from jinja2 import Environment, PackageLoader, select_autoescape
env = Environment(
loader=PackageLoader('biblib', 'templates'),
autoescape=select_autoescape(enabled_extensions=('html', 'xml'),
default_for_string=True)
)
class PermissionView(BaseView):
"""
End point to manipulate the permissions between a user and a library
"""
# TODO: Users that do not have an account, cannot be added to permissions
# TODO: - send invitation?
decorators = [advertise('scopes', 'rate_limit')]
scopes = ['user']
rate_limit = [1000, 60*60*24]
# Some permissions for this View
read_permission = ['admin', 'owner']
@staticmethod
def has_permission(service_uid_editor,
service_uid_modify,
library_id):
"""
Check if the user wanting to change the library has the correct
permissions to do so, and the user to be changed is not the owner.
:param service_uid_editor: the user ID of the editor
:param service_uid_modify: the user ID of the user to be edited
:param library_id: the library id
:return: boolean
"""
if service_uid_editor == service_uid_modify:
current_app.logger.error('Editing user: {0} and user to edit: {1}'
' are the same. This is not allowed.'
.format(service_uid_modify,
service_uid_editor))
return False
current_app.logger.info('Checking if user: {0}, can edit the '
'permissions of user: {1}'
.format(
service_uid_editor,
service_uid_modify
))
# Check if the editor has permissions
with current_app.session_scope() as session:
try:
editor_permissions = session.query(Permissions).filter(
Permissions.user_id == service_uid_editor,
Permissions.library_id == library_id
).one()
except NoResultFound as error:
current_app.logger.error(
'User: {0} has no permissions for this library: {1}'
.format(service_uid_editor, error)
)
return False
if editor_permissions.permissions['owner']:
current_app.logger.info('User: {0} is owner, so is allowed to '
'change permissions'
.format(service_uid_editor))
return True
# Check if the user to be modified has permissions
try:
modify_permissions = session.query(Permissions).filter(
Permissions.user_id == service_uid_modify,
Permissions.library_id == library_id
).one()
except NoResultFound:
modify_permissions = False
# if the editor is admin, and the modifier has no permissions
if editor_permissions.permissions['admin']:
# user to be modified has no permissions
if not modify_permissions:
return True
# user to be modified is not owner
if not modify_permissions.permissions['owner']:
return True
# otherwise the user to be modified is the owner, so not allowed
return False
else:
return False
@staticmethod
def add_permission(service_uid, library_id, permission):
"""
Adds a permission for a user to a specific library
:param service_uid: the user ID within this microservice
:param library_id: the library id to update
:param permission: dict of the permissions to be added/modified
:return: no return
"""
to_set = [k for k, v in permission.items() if (type(v) == bool)]
if not set(to_set).issubset(set(['read', 'write', 'admin', 'owner'])):
raise PermissionDeniedError('Permission Error')
with current_app.session_scope() as session:
try:
# If the user has permission for this already
new_permission = session.query(Permissions).filter_by(
user_id = service_uid,
library_id = library_id
).one()
# can't change owner permission this way - must go through TransferView
if permission.get('owner') and \
(getattr(new_permission, 'permissions').get('owner',False) != permission['owner']):
raise PermissionDeniedError('Permission Error')
current_app.logger.info(
'User: {0} has permission already for '
'library: {1}. Modifying: "{2}" from [{3}] '
'to [{4}]'
.format(service_uid,
library_id,
list(permission.keys()),
getattr(new_permission, 'permissions'),
list(permission.values()))
)
for p, value in permission.items():
getattr(new_permission, 'permissions')[p] = value
# Check if all permission are False, then remove completely
if not (new_permission.permissions['read'] |
new_permission.permissions['write'] |
new_permission.permissions['admin'] |
new_permission.permissions['owner']):
current_app.logger.info('Deleting permission for {0} and '
'library {1} as all permission are '
'False. {2}'
.format(service_uid,
library_id,
new_permission))
session.delete(new_permission)
else:
session.add(new_permission)
except NoResultFound:
# If no permission set yet for user and library
current_app.logger.info('No permission yet set for user: {0} for '
'library: {1}. Using defaults for setup'
' and allocating "{2}"'
.format(service_uid,
library_id,
permission))
user = session.query(User).filter_by(id = service_uid).one()
library = session.query(Library).filter_by(id = library_id).one()
new_permission = Permissions(permissions = {'read': False,
'write': False,
'admin': False,
'owner': False})
# can't set owner permission this way
if permission.get('owner',False) is not False:
raise PermissionDeniedError('Permission Error')
for p, value in permission.items():
getattr(new_permission, 'permissions')[p] = value
# Check if all permission are False, then remove completely
if not (new_permission.permissions['read'] |
new_permission.permissions['write'] |
new_permission.permissions['admin'] |
new_permission.permissions['owner']):
current_app.logger.info('Not adding permissions for {0} and '
'library {1} as all permission are '
'False. {2}'
.format(service_uid,
library_id,
new_permission))
else:
user.permissions.append(new_permission)
library.permissions.append(new_permission)
session.add_all([user, library, new_permission])
session.commit()
@staticmethod
def api_uid_email_lookup(user_info):
"""
Queries the API service that converts uid to email or email to uid,
dependent upon the type passed by the user
:param user_info: <int> is userID, and <unicode>/<str> is email
:return: the API userID or API e-mail
"""
if isinstance(user_info, int):
service = '{api}/{uid}'.format(
api=current_app.config['BIBLIB_USER_EMAIL_ADSWS_API_URL'],
uid=user_info
)
current_app.logger.info('Obtaining e-mail of user: {0} [API UID]'
.format(user_info))
response = client().get(
service
)
return response.json()['email']
else:
return None
@classmethod
def get_permissions(cls, library_id):
"""
Looks up and returns the permissions for all of the users that have
any permissions.
:param library_id: the library ID to look up permissions
:return: list of dictionaries containing the user email and permission
"""
with current_app.session_scope() as session:
# Find the permissions for the library
result = session.query(Permissions, User)\
.join(Permissions.user)\
.filter(Permissions.library_id == library_id)\
.all()
# Formulate the return content
permission_list = []
for permission, user in result:
# Convert the user id into
user = cls.api_uid_email_lookup(user_info=user.absolute_uid)
all_permissions = [key for key in ['read', 'write', 'admin', 'owner'] if permission.permissions[key]]
permission_list.append(
{user: all_permissions}
)
return permission_list
@classmethod
def read_access(cls, service_uid, library_id):
"""
Checks if the user has access to read the permissions on a given
library
:param service_uid: the user ID within this microservice
:param library_id: the library ID to look up permissions
:return: has access (True), does not have access (False)
"""
for access_type in cls.read_permission:
if BaseView.helper_access_allowed(service_uid=service_uid,
library_id=library_id,
access_type=access_type):
return True
return False
@staticmethod
def format_permission_payload(library_name, library_id, permission_data):
"""
Format the permission info into plain text and HTML payloads for the email
:param library_name: string; name of library
:param library_id: string; URL-safe version of library ID, used to create link to library
:param permission_data: dict; keys:
permission_data: dict w/ permissions (keys) and Boolean values
email: email address of user receiving email
:return: payload_plain, payload_html
"""
email = permission_data.get('email', None)
permissions = permission_data.get('permission', None)
if not email or not permissions:
current_app.logger.error('Must pass email and permissions in permission data. '
'Library ID: {0}, permission data: {1}'.format(library_id, permission_data))
raise RuntimeError('Insufficient permission data passed')
readable_permissions = {'read': 'read only',
'write': 'read and write only',
'admin': 'admin (includes read and write)',
'owner': 'owner'}
payload_plain_info = []
payload_html_info = {}
for p, value in permissions.items():
readable_permission = readable_permissions.get(p, None)
if readable_permission:
tmp = u'Library: {0} (ID: {1}) \n Permission: {2} \n Have permission? {3} \n'.\
format(library_name, library_id, readable_permission, value)
payload_html_info[readable_permission] = value
else:
current_app.logger.error('Permission {0} not allowed; part of payload {1}. Exiting.'.format(p, permission_data))
raise ValueError('Wrong permission type passed')
payload_plain_info.append(tmp)
payload_plain = '''
Hi,
Another user has recently updated your library permissions for the following libraries:
{payload}
If this is a mistake, please contact ADS Help (adshelp@cfa.harvard.edu).
- the ADS team
'''.format(payload='\n '.join(payload_plain_info))
template = env.get_template('permission_email.html')
payload_html = template.render(email_address=email,
payload=payload_html_info,
lib_name=library_name,
lib_id=library_id)
return payload_plain, payload_html
# Methods
def get(self, library):
"""
HTTP GET request that returns the permissions for a given library
:param library: library ID
:return: list of permissions
Header:
-------
Must contain the API forwarded user ID of the user accessing the end
point
Return data:
------------
JSON with a list containing dictionary elements
[{<user-email>: [<permission1>, ...., <permissionN>]},
...., {<user-email>: [<permission>1, ...., <permissionN>]}]
Permissions:
-----------
The following type of user can access permission:
- owner
- admin
"""
# Get the user requesting this from the header
try:
user_api = self.helper_get_user_id()
except KeyError:
return err(MISSING_USERNAME_ERROR)
# URL safe base64 string to UUID
try:
library = self.helper_slug_to_uuid(library)
except TypeError:
return err(BAD_LIBRARY_ID_ERROR)
# Get the service ID from the API resolver
service_uid = \
self.helper_absolute_uid_to_service_uid(absolute_uid=user_api)
# Check permissions
if not self.read_access(service_uid=service_uid,
library_id=library):
current_app.logger.error(
'User {0} has the wrong permissions to get the '
'permission list for library {1}'
.format(service_uid, library)
)
return err(NO_PERMISSION_ERROR)
# Get permissions
permissions = self.get_permissions(library_id=library)
# Return data
return permissions, 200
def post(self, library):
"""
HTTP POST request that modifies the permissions of a library
:param library: library ID
:return: the response for if the library was successfully created
Header:
-------
Must contain the API forwarded user ID of the user accessing the end
point
Post data:
----------
KEYWORD, VALUE
email: <e-mail@address>, specifies which user's permissions to be
modified
# TODO fix this
permissions: read, write, specifies which permission to change
admin
value: boolean, whether the user has this permission
Return data:
-----------
No data
Permissions:
-----------
The following type of user can update a permission:
- owner
- admin
Notes:
Currently, the posts are per user, per permission. If its wanted that
lists can be passed, then open an issue. In my mind, it made more
sense that you can retrieve the correct errors in a request/response
cycle, rather than complicating the response with a mixture of success
and failures.
For example, if an admin tries to modify the access for a random person
without permissions, and the owner, the admin is not allowed to modify
the owner. This would be both a success 200, and a forbidden, 404, so
I do not think that makes sense. However, if there are strong arguments
for a list input and the backend handling it, then open an issue on the
repository.
"""
# Get the user requesting this from the header
try:
user_editing = self.helper_get_user_id()
except KeyError:
return err(MISSING_USERNAME_ERROR)
# URL safe base64 string to UUID
try:
library_uuid = self.helper_slug_to_uuid(library)
except TypeError:
return err(BAD_LIBRARY_ID_ERROR)
user_editing_uid = \
self.helper_absolute_uid_to_service_uid(absolute_uid=user_editing)
try:
permission_data = get_post_data(
request,
types=dict(
email=str,
permission=dict
)
)
except TypeError as error:
current_app.logger.error('Wrong type passed for POST: {0} [{1}]'
.format(request.data, error))
return err(WRONG_TYPE_ERROR)
bad_vals = [type(v) for k,v in permission_data['permission'].items() if (type(v)!=bool)]
if len(bad_vals) > 0:
current_app.logger.error('Wrong values passed for permissions for POST: {0} [{1}]'
.format(request.data, bad_vals))
return err(WRONG_TYPE_ERROR)
current_app.logger.info('Requested permission changes for user {0}:'
' {1} for library {2}, by user: {3}'
.format(permission_data['email'],
permission_data,
library_uuid,
user_editing_uid)
)
try:
secondary_user = self.helper_email_to_api_uid(permission_data)
current_app.logger.info('User: {0} corresponds to: {1}'
.format(permission_data['email'],
secondary_user))
except NoResultFound:
return err(API_MISSING_USER_EMAIL)
secondary_service_uid = \
self.helper_absolute_uid_to_service_uid(
absolute_uid=secondary_user)
current_app.logger.info('User: {0} is internally: {1}'
.format(secondary_user, secondary_service_uid))
current_app.logger.info('Modifying permissions STARTING....')
if not self.has_permission(service_uid_editor=user_editing_uid,
service_uid_modify=secondary_service_uid,
library_id=library_uuid):
current_app.logger.error(
'User: {0} does not have permissions to edit: {1}'
.format(user_editing_uid, library_uuid)
)
return err(NO_PERMISSION_ERROR)
try:
self.add_permission(service_uid=secondary_service_uid,
library_id=library_uuid,
permission=permission_data['permission'])
except PermissionDeniedError:
current_app.logger.error('User: {0} does not have permissions to '
'modify the value of: {1}'
.format(user_editing_uid,
permission_data['permission']))
return err(NO_PERMISSION_ERROR)
current_app.logger.info('...SUCCESS.')
name = self.helper_library_name(library_uuid)
try:
payload_plain, payload_html = self.format_permission_payload(library_name=name,
library_id=library,
permission_data=permission_data)
except (RuntimeError, ValueError) as e:
current_app.logger.warning('Error building payload for permission data {0}, library {1}. ' +
'Error message: {2}. Not sending email to {3}'.
format(permission_data, name, e, permission_data['email']))
payload_plain = None
if payload_plain:
current_app.logger.info('Sending email to {0} with payload: {1}'.format(permission_data['email'], payload_plain))
try:
msg = self.send_email(email_addr=permission_data['email'],
payload_plain=payload_plain,
payload_html=payload_html,
email_template=PermissionsChangedEmail)
except:
current_app.logger.warning('Sending email to {0} failed'.format(permission_data['email']))
return {}, 200