cosmos/dataservice/objectservice.py
"""
Copyright (C) 2014 Maruf Maniruzzaman
Website: http://cosmosframework.com
Author: Maruf Maniruzzaman
License :: OSI Approved :: MIT License
"""
from argparse import ArgumentError
import hashlib
import datetime
import gridfs
import motor
import tornado.web
from bson import ObjectId
from cosmos.rbac.service import *
#TODO: May be use observers to check access - initialize in the __init__ of the rbac package
ACCESS_TYPE_ROLE = 1
ACCESS_TYPE_OWNER_ONLY = 2
class ObjectService():
def __init__(self, *args, **kwargs):
self.rbac_service = kwargs.get("rbac_service", RbacService())
self.db = kwargs.get("db", None)
if not self.db:
raise ArgumentError("db", "Database object must be set while creating ObjectService.")
self._preprocessors = {}
self._processors = {}
self._postprocessors = {}
def check_access(self, user, object_name, properties, access, check_owner=False):
roles = self.rbac_service.get_roles(user)
# We must check all roles for possible access before checking owner access, so we need to loop twice.
# Since owner access is suggested rare optimizing for role access here,
for role in roles:
has_access = self.rbac_service.has_access(role, object_name, properties, access)
if has_access:
logging.debug("ObjectService:: check _access {0} is granted to {1} as role accessible for properties {2}.".format(object_name, user, properties))
return ACCESS_TYPE_ROLE
for role in roles:
if check_owner:
has_owner_access = self.rbac_service.has_owner_access(role, object_name, properties, access)
if has_owner_access:
logging.debug("ObjectService:: check _access {0} is granted to {1} as owner accessible for properties {2}.".format(object_name, user, properties))
return ACCESS_TYPE_OWNER_ONLY
logging.warn("ObjectService:: check _access {0} is DENIED to {1} for properties {2}.".format(object_name, user, properties))
raise tornado.web.HTTPError(401, "Unauthorized")
def save(self, user, object_name, data):
logging.debug("ObjectService::save::{0}".format(object_name))
assert isinstance(data, dict)
properties = self.get_properties(data)
self.check_access(user, object_name, properties, AccessType.INSERT, True)
self.create_access_log(user, object_name, AccessType.INSERT)
data['createtime'] = str(datetime.datetime.now())
if user:
data['owner'] = str(user.get("_id"))
else:
data['owner'] = SYSTEM_USER
if data.get("_id"):
data["_id"] = ObjectId(str(data["_id"]))
#TODO: Make sure user can not insert data for object owned by other user. With _id it may be possible.
result = self.db[object_name].save(data)
return result
def insert(self, user, object_name, data):
logging.debug("ObjectService::insert::{0}".format(object_name))
assert isinstance(data, dict)
properties = self.get_properties(data)
self.check_access(user, object_name, properties, AccessType.INSERT, True)
self.create_access_log( user, object_name, AccessType.INSERT)
data['createtime'] = str(datetime.datetime.now())
if user:
data['owner'] = str(user.get("_id"))
else:
data['owner'] = SYSTEM_USER
result = self.db[object_name].insert(data)
return result
def find(self, user, object_name, query, columns, limit=5000):
logging.debug("ObjectService::find::{0}".format(object_name))
#assert inspect.ismethod(callback)
allowed_access_type = self.check_access(user, object_name, columns, AccessType.READ, True)
if allowed_access_type == ACCESS_TYPE_OWNER_ONLY:
if query:
assert isinstance(query, dict)
query["owner"] = str(user.get("_id"))
else:
query = {"owner": str(user.get("_id"))}
self.create_access_log(user, object_name, AccessType.READ)
if len(columns) > 0:
columns_dict = {column:1 for column in columns}
else:
columns_dict = None
result = self.db[object_name].find(query, columns_dict).limit(limit)
return result
def text_search(self, user, object_name, query, columns, limit=5000):
logging.debug("ObjectService::text_search::{0}".format(object_name))
allowed_access_type = self.check_access(user, object_name, columns, AccessType.SEARCH, True)
if allowed_access_type == ACCESS_TYPE_OWNER_ONLY:
if query:
assert isinstance(query, dict)
query["owner"] = str(user.get("_id"))
else:
query = {"owner": str(user.get("_id"))}
self.create_access_log(user, object_name, AccessType.SEARCH)
if len(columns) > 0:
columns_dict = {column:1 for column in columns}
else:
columns_dict = None
result = self.db[object_name].find(query, columns_dict).limit(limit)
return result
def load(self, user, object_name, id, columns):
logging.debug("ObjectService::load::{0}".format(object_name))
allowed_access_type = self.check_access(user, object_name, columns, AccessType.READ, True)
if len(columns) > 0:
columns_dict = {column:1 for column in columns}
else:
columns_dict = None
query = {'_id': ObjectId(id)}
if allowed_access_type == ACCESS_TYPE_OWNER_ONLY:
query["owner"] = str(user.get("_id"))
self.create_access_log(user, object_name, AccessType.READ)
result = self.db[object_name].find_one(query, columns_dict)
return result
# TODO: SECURITY_ISSUE: escape $ sign in values
# http://docs.mongodb.org/manual/faq/developers/#dollar-sign-operator-escaping
def update(self, user, object_name, id, data):
logging.debug("ObjectService::update::{0}".format(object_name))
logging.debug("Data: {0}".format(data))
assert len(id) > 0
assert isinstance(data, dict)
properties = self.get_properties(data)
allowed_access_type = self.check_access(user, object_name, properties, AccessType.UPDATE, True)
query = {'_id': ObjectId(id)}
if allowed_access_type == ACCESS_TYPE_OWNER_ONLY:
query["owner"] = str(user.get("_id"))
self.create_access_log(user, object_name, AccessType.UPDATE)
result = self.db[object_name].update(query, {'$set': data})
return result
def delete(self, user, object_name, id):
logging.debug("ObjectService::delete::{0}".format(object_name))
assert len(id) > 0
allowed_access_type = self.check_access(user, object_name, [], AccessType.DELETE, True)
query = {'_id': ObjectId(id)}
if allowed_access_type == ACCESS_TYPE_OWNER_ONLY:
query["owner"] = str(user.get("_id"))
self.create_access_log(user, object_name, AccessType.DELETE)
result = self.db[object_name].remove(query)
return result
def save_file(self, user, collection_name, file_object, file_id=None):
logging.debug("ObjectService::save_file::{0}".format(collection_name))
properties = ['body', 'content_type', 'filename', 'collection_name', 'createtime', 'owner']
if file_id:
self.check_access(user, collection_name, properties, AccessType.UPDATE, True)
else:
self.check_access(user, collection_name, properties, AccessType.INSERT, True)
if file_id:
self.create_access_log(user, collection_name, AccessType.UPDATE)
else:
self.create_access_log(user, collection_name, AccessType.INSERT)
return save_file_in_gridfs(self.db, user, collection_name, file_object, properties, file_id)
def load_file(self, user, collection_name, file_id, ignore_col_name=False):
logging.debug("ObjectService::load_file::{0}".format(collection_name))
properties = ['body', 'content_type']
if not ignore_col_name:
allowed_access_type = self.check_access(user, collection_name, properties, AccessType.READ, True)
if allowed_access_type == ACCESS_TYPE_OWNER_ONLY:
return read_gridfs_owned_file(user, self.db, collection_name, file_id, properties)
self.create_access_log(user, collection_name, AccessType.READ)
else:
self.create_access_log(user, "gridfile:"+file_id, AccessType.READ)
return read_gridfs_file(self.db, collection_name, file_id, properties)
@gen.coroutine
def list_file(self, user, collection_name):
logging.debug("ObjectService::save_file::{0}".format(collection_name))
properties = ['body', 'content_type', 'filename', 'collection_name', 'createtime', 'owner', 'md5', 'length']
allowed_access_type = self.check_access(user, collection_name, properties, AccessType.READ, True)
query = {'collection_name': collection_name}
if allowed_access_type == ACCESS_TYPE_OWNER_ONLY:
query["owner"] = str(user.get("_id"))
file_list = []
fs = motor.MotorGridFS(self.db)
cursor = fs.find(query, timeout=False)
while (yield cursor.fetch_next):
grid_out = cursor.next_object()
file_list.append({"file_id": grid_out._id, "filename":grid_out.filename,
'content_type':grid_out.content_type, 'owner':grid_out.owner, 'md5':grid_out.md5,
'createtime':grid_out.createtime, "length": grid_out.length,
"collection_name":collection_name})
grid_out.close()
raise gen.Return(file_list)
def delete_file(self, user, collection_name, file_id):
logging.debug("ObjectService::delete_file::{0}".format(collection_name))
assert len(file_id) > 0
allowed_access_type = self.check_access(user, collection_name, [], AccessType.DELETE, True)
self.create_access_log(user, collection_name, AccessType.DELETE)
if allowed_access_type == ACCESS_TYPE_OWNER_ONLY:
return delete_gridfs_owned_file(user, self.db, file_id)
return delete_gridfs_file(self.db, file_id)
def get_properties(self, data, namespace=None):
properties = list(data.keys())
child_props = []
for prop in properties:
prop_data = data.get(prop, None)
if prop_data and isinstance(prop_data, dict):
child_namespace = (namespace + "." + prop) if namespace else prop
child_props = child_props + self.get_properties(prop_data, child_namespace)
if child_props:
properties = properties + child_props
properties = [p if not namespace else namespace + "." + p for p in properties]
return properties
def create_access_log(self, user, module, function):
username = None
access_log = {"username":username, 'module': module, 'function': function}
access_log['createtime'] = str(datetime.datetime.now())
#db.audit.access_log.insert(access_log, callback=None)
def add_operation_preprocessor(self, preprocessor, object_name, access_types):
assert isinstance(access_types, collections.Iterable)
assert len(object_name) > 0
object_preprocessor =self._preprocessors.get(object_name)
if not object_preprocessor:
object_preprocessor = {}
self._preprocessors[object_name] = object_preprocessor
for access_type in access_types:
object_acctyp_preprocessor = object_preprocessor.get(access_type)
if not object_acctyp_preprocessor:
object_acctyp_preprocessor = []
object_preprocessor[access_type] = object_acctyp_preprocessor
if not preprocessor in object_acctyp_preprocessor:
object_acctyp_preprocessor.append(preprocessor)
def add_operation_processor(self, processor, object_name, access_types):
assert isinstance(access_types, collections.Iterable)
assert len(object_name) > 0
object_processor =self._processors.get(object_name)
if not object_processor:
object_processor = {}
self._processors[object_name] = object_processor
for access_type in access_types:
object_acctyp_processor = object_processor.get(access_type)
if not object_acctyp_processor:
object_acctyp_processor = []
object_processor[access_type] = object_acctyp_processor
if not processor in object_acctyp_processor:
object_acctyp_processor.append(processor)
def add_operation_postprocessor(self, postprocessor, object_name, access_types):
#assert inspect.ismethod(preprocessor)
assert isinstance(access_types, collections.Iterable)
assert len(object_name) > 0
object_postprocessor =self._postprocessors.get(object_name)
if not object_postprocessor:
object_postprocessor = {}
self._postprocessors[object_name] = object_postprocessor
for access_type in access_types:
object_acctype_postprocessor = object_postprocessor.get(access_type)
if not object_acctype_postprocessor:
object_acctype_postprocessor = []
object_postprocessor[access_type] = object_acctype_postprocessor
if not postprocessor in object_acctype_postprocessor:
object_acctype_postprocessor.append(postprocessor)
def get_operation_preprocessor(self, object_name, access_type):
assert len(object_name) > 0
object_preprocessor = self._preprocessors.get(object_name)
if not object_preprocessor:
return []
preprocessor_list = object_preprocessor.get(access_type)
if not preprocessor_list:
return []
return preprocessor_list
def get_operation_processor(self, object_name, access_type):
assert len(object_name) > 0
object_processor = self._processors.get(object_name)
if not object_processor:
return []
processor_list = object_processor.get(access_type)
if not processor_list:
return []
return processor_list
def get_operation_postprocessor(self, object_name, access_type):
assert len(object_name) > 0
object_postprocessor = self._postprocessors.get(object_name)
if not object_postprocessor:
return []
post_processor_list = object_postprocessor.get(access_type)
if not post_processor_list:
return []
return post_processor_list
@gen.coroutine
def save_file_in_gridfs(db, user, collection_name, file_object, properties, file_id=None):
fs = motor.MotorGridFS(db)
if file_id:
#TODO: Verify: As of today mongodb does not allow replacing file- so delete and create
#TODO: non atomic operation
yield delete_gridfs_file(db, file_id)
oid = ObjectId(str(file_id))
gridin = yield fs.new_file(_id=oid)
else:
gridin = yield fs.new_file()
file_body = file_object.get("body")
length = len(file_body)
md5_dig = hashlib.md5(file_body).hexdigest()
result = yield gridin.write(file_body)
# TODO: we can write another chunk- as many times we want-
# When support for streaming file comes in mainstram tornado branch
# we should use that
if 'content_type' in properties:
yield gridin.set('content_type', file_object.get("content_type"))
yield gridin.set('filename', file_object.get("filename"))
yield gridin.set('collection_name', collection_name)
current_time = str(datetime.datetime.now())
yield gridin.set('createtime', current_time)
yield gridin.set('owner',str(user.get("_id")))
yield gridin.close()
file_id = gridin._id
filename = gridin.filename
result = {"md5":md5_dig, "file_id":str(file_id),"length": length, 'owner': str(user.get("_id")),
"collection_name":collection_name, "filename": filename,'createtime': current_time }
raise gen.Return(result)
@gen.coroutine
def read_gridfs_owned_file(user, db, collection_name, file_id, properties):
owner_id = user.get("_id")
if len(owner_id) < 1:
raise tornado.web.HTTPError(401, "Unauthorized")
fs = motor.MotorGridFS(db)
try:
gridout = yield fs.get(ObjectId(file_id))
if not gridout:
raise tornado.web.HTTPError(404, "File not found")
owner = gridout.owner
if owner != owner_id:
raise tornado.web.HTTPError(401, "Unauthorized")
content_type = gridout.content_type
got_col_name = gridout.collection_name
if got_col_name != collection_name:
raise tornado.web.HTTPError(404, "File not found")
content = yield gridout.read()
data = {"body": content }
if "content_type" in properties:
data["content_type"] = content_type
raise gen.Return(data)
except gridfs.NoFile:
raise tornado.web.HTTPError(404, "File not found")
@gen.coroutine
def read_gridfs_file(db, collection_name, file_id, properties, ignore_col_name=False):
fs = motor.MotorGridFS(db)
try:
gridout = yield fs.get(ObjectId(file_id))
if not gridout:
raise tornado.web.HTTPError(404, "File not found")
content_type = gridout.content_type
if not ignore_col_name:
got_col_name = gridout.collection_name
if got_col_name != collection_name:
raise tornado.web.HTTPError(404, "File not found")
content = yield gridout.read()
data = {"body": content }
if "content_type" in properties:
data["content_type"] = content_type
raise gen.Return(data)
except gridfs.NoFile:
raise tornado.web.HTTPError(404, "File not found")
@gen.coroutine
def delete_gridfs_file(db, file_id):
fs = motor.MotorGridFS(db)
result = yield fs.delete(ObjectId(file_id))
raise gen.Return(result)
@gen.coroutine
def delete_gridfs_owned_file(user, db, file_id):
owner_id = user.get("_id")
if len(owner_id) < 1:
raise tornado.web.HTTPError(401, "Unauthorized")
fs = motor.MotorGridFS(db)
gridout = yield fs.get(ObjectId(file_id))
if not gridout:
raise tornado.web.HTTPError(404, "File not found")
owner = gridout.owner
gridout.close()
if owner == owner_id:
result = yield fs.delete(ObjectId(file_id))
raise gen.Return(result)
else:
raise tornado.web.HTTPError(401, "Unauthorized")