# datanator/data_source/uniprot_nosql.py
'''
Generates UniProt Swiss-Prot (reviewed) NoSQL documents and loads
them into MongoDB collections.
:Author: Zhouyang Lian <zhouyang.lian@familian.life>
:Author: Jonathan <jonrkarr@gmail.com>
:Date: 2019-04-02
:Copyright: 2019, Karr Lab
:License: MIT
'''
import io
import math
import json
import pandas
import requests
import pymongo.errors
from datanator_query_python.util import mongo_util
from pymongo.collation import Collation, CollationStrength
from datanator_query_python.query import query_taxon_tree, query_kegg_orthology, query_sabiork
import datanator.config.core
class UniprotNoSQL(mongo_util.MongoUtil):
    """Download reviewed UniProt (Swiss-Prot) records into a MongoDB collection
    and enrich them with taxon names/lineage, KEGG orthology names, protein
    abundances (pax collection) and SabioRK reaction ids.
    """

    def __init__(self, MongoDB=None, db=None, max_entries=float('inf'), verbose=False,
                 username=None, password=None, authSource='admin', replicaSet=None,
                 collection_str='uniprot'):
        """
        Args:
            MongoDB (:obj:`str`, optional): MongoDB server address.
            db (:obj:`str`, optional): name of the target database.
            max_entries (:obj:`float`, optional): maximum number of entries to process.
            verbose (:obj:`bool`, optional): whether to print progress messages.
            username (:obj:`str`, optional): MongoDB username.
            password (:obj:`str`, optional): MongoDB password.
            authSource (:obj:`str`, optional): MongoDB authentication database.
            replicaSet (:obj:`str`, optional): MongoDB replica set name.
            collection_str (:obj:`str`, optional): destination collection name.
        """
        # Bulk download of every reviewed entry vs. endpoint for ad-hoc queries.
        self.url = 'http://www.uniprot.org/uniprot/?fil=reviewed:yes'
        self.query_url = 'https://www.uniprot.org/uniprot/?query='
        self.MongoDB = MongoDB
        self.db = db
        self.max_entries = max_entries
        self.collection_str = collection_str
        super(UniprotNoSQL, self).__init__(MongoDB=MongoDB, db=db, username=username,
                                           password=password, authSource=authSource,
                                           replicaSet=replicaSet, verbose=verbose,
                                           max_entries=max_entries)
        # Query managers for the auxiliary collections used during enrichment.
        self.taxon_manager = query_taxon_tree.QueryTaxonTree(username=username, MongoDB=MongoDB,
                                                             password=password, authSource=authSource)
        self.ko_manager = query_kegg_orthology.QueryKO(username=username, password=password,
                                                       server=MongoDB, authSource=authSource,
                                                       verbose=verbose, max_entries=max_entries)
        self.sabio_manager = query_sabiork.QuerySabio(MongoDB=MongoDB, username=username,
                                                      password=password, authSource=authSource)
        # Case-insensitive matching for uniprot_id lookups.
        self.collation = Collation(locale='en', strength=CollationStrength.SECONDARY)
        self.collection = self.db_obj[collection_str]
        self.verbose = verbose

    # build dataframe for uniprot_swiss for loading into mongodb
    def load_uniprot(self, query=False, msg='', species=None):
        """Download UniProt records as a tab-separated table and load them
        into MongoDB.

        Args:
            query (:obj:`bool`, optional): Whether to download all reviewed entries
                or perform individual queries. Defaults to False.
            msg (:obj:`str`, optional): Query message. Defaults to ''.
            species (:obj:`list`, optional): species information to extract from df
                and loaded into uniprot. Defaults to None.
        """
        fields = '&columns=id,entry name,genes(PREFERRED),protein names,sequence,length,mass,ec,database(GeneID),reviewed,organism-id,database(KO),genes(ALTERNATIVE),genes(ORF),genes(OLN),database(EMBL),database(RefSeq),database(KEGG)'
        if not query:
            url = self.url + fields
        else:
            query_msg = msg
            if isinstance(species, list):
                # Append each species to the free-text query string.
                for specie in species:
                    query_msg += '+' + str(specie)
            url = self.query_url + query_msg + '&sort=score' + fields
        url += '&format=tab'
        url += '&compress=no'
        if not math.isnan(self.max_entries):
            url += '&limit={}'.format(self.max_entries)
        try:
            response = requests.get(url, stream=False)
            response.raise_for_status()
        except requests.exceptions.ConnectionError:
            # Original code did ``pass`` here and then dereferenced an unbound
            # ``response``; return early instead (same net effect: nothing loaded).
            return
        try:
            data = pandas.read_csv(io.BytesIO(response.content), delimiter='\t',
                                   encoding='utf-8', low_memory=False)
        except pandas.errors.EmptyDataError:
            # Empty download: nothing to load.
            return
        data.columns = [
            'uniprot_id', 'entry_name', 'gene_name', 'protein_name', 'canonical_sequence', 'length', 'mass',
            'ec_number', 'entrez_id', 'status', 'ncbi_taxonomy_id', 'ko_number', 'gene_name_alt',
            'gene_name_orf', 'gene_name_oln', 'sequence_embl', 'sequence_refseq', 'kegg_org_gene'
        ]
        # Normalize UniProt's trailing-';' conventions and comma-grouped masses.
        data['entrez_id'] = data['entrez_id'].astype(str).str.replace(';', '')
        data['kegg_org_gene'] = data['kegg_org_gene'].astype(str).str.replace(';', '')
        try:
            data['mass'] = data['mass'].str.replace(',', '')
        except AttributeError:
            # Column already numeric (no .str accessor); leave it alone.
            pass
        data['ko_number'] = data['ko_number'].astype(str).str.replace(';', '')
        data['gene_name_oln'] = data['gene_name_oln'].astype(str).str.split(' ')
        data['gene_name_orf'] = data['gene_name_orf'].astype(str).str.split(' ')
        data['gene_name_alt'] = data['gene_name_alt'].astype(str).str.split(' ')
        data['sequence_embl'] = self.embl_helper(data['sequence_embl'].astype(str).str)
        data['sequence_refseq'] = self.embl_helper(data['sequence_refseq'].astype(str).str)
        if species is None:
            self.load_df(data)
        else:
            # Keep only the rows belonging to the requested taxa.
            self.load_df(data.loc[data['ncbi_taxonomy_id'].isin(species)])

    def embl_helper(self, s):
        """Process EMBL or RefSeq cross-reference strings into lists of
        accessions, e.g. "NP_796298.2 [E9PXF8-1];XP_006507989.1 [E9PXF8-2];" ->
        ['NP_796298.2', 'XP_006507989.1'].

        Args:
            s (:obj:`pandas.core.strings.StringMethods`): ``.str`` accessor of
                the column to be processed (callers pass ``series.astype(str).str``).

        Return:
            (:obj:`pandas.Series` of :obj:`list` of :obj:`str`): processed accession lists
        """
        # Strip brackets, turn ';' separators into spaces, split, and drop the
        # trailing empty token produced by the final ';'.
        return s.replace('[', '').str.replace(';', ' ').str.replace(']', '').str.split(' ').str[:-1]

    # load pandas.DataFrame into MongoDB
    def load_df(self, df):
        """Insert the rows of ``df`` into the uniprot collection.

        Args:
            df (:obj:`pandas.DataFrame`): table of UniProt records.

        Returns:
            (:obj:`str`): error message when the insert is invalid (e.g. the
            dataframe is empty); ``None`` on success.
        """
        df_json = json.loads(df.to_json(orient='records'))
        try:
            # ``Collection.insert`` was removed in PyMongo 3.x/4.x;
            # ``insert_many`` is the supported bulk-insert API.
            self.collection.insert_many(df_json)
        except pymongo.errors.InvalidOperation as e:
            # Raised e.g. when ``df_json`` is an empty list.
            return str(e)

    def fill_species_name(self):
        """Add ``species_name`` (resolved via the taxon tree) to every
        document, one ``update_many`` per distinct NCBI taxonomy id.
        """
        ncbi_taxon_ids = self.collection.distinct('ncbi_taxonomy_id')
        start = 0
        for i, _id in enumerate(ncbi_taxon_ids[start:]):
            if i == self.max_entries:
                break
            if i % 50 == 0 and self.verbose:
                print('Adding taxon name to {} out of {} records.'.format(i + start, len(ncbi_taxon_ids)))
            names = self.taxon_manager.get_name_by_id([_id])
            self.collection.update_many({'ncbi_taxonomy_id': _id},
                                        {'$set': {'species_name': names.get(_id)}},
                                        upsert=False)

    def fill_ko_name(self):
        """Add ``ko_name`` (KEGG orthology definition) to every document,
        one ``update_many`` per distinct KO number.
        """
        ko_numbers = self.collection.distinct('ko_number')
        count = len(ko_numbers)
        start = 0
        for i, number in enumerate(ko_numbers[start:]):
            if i == self.max_entries:
                break
            if i % 50 == 0 and self.verbose:
                print('Adding ko name to {} out of {} records.'.format(i + start, count))
            kegg_name = self.ko_manager.get_def_by_kegg_id(number)
            self.collection.update_many({'ko_number': number},
                                        {'$set': {'ko_name': kegg_name}}, upsert=False)

    def fill_species_info(self):
        """Fill ancestor information.

        Adds ``ancestor_taxon_id`` and ``ancestor_name`` lists per distinct
        NCBI taxonomy id.
        """
        ncbi_taxon_ids = self.collection.distinct('ncbi_taxonomy_id')
        start = 0
        count = len(ncbi_taxon_ids)
        for i, _id in enumerate(ncbi_taxon_ids[start:]):
            if i == self.max_entries:
                break
            if i % 50 == 0 and self.verbose:
                print('Adding ancestor information to {} out of {} records.'.format(i + start, count))
            ancestor_taxon_id, ancestor_name = self.taxon_manager.get_anc_by_id([_id])
            self.collection.update_many({'ncbi_taxonomy_id': _id},
                                        {'$set': {'ancestor_taxon_id': ancestor_taxon_id[0],
                                                  'ancestor_name': ancestor_name[0]}},
                                        upsert=False)

    def load_abundance_from_pax(self):
        '''Load protein abundance data by iterating over the pax collection.
        '''
        _, _, col_pax = self.con_db('pax')
        query = {}
        projection = {'ncbi_id': 1, 'species_name': 1,
                      'observation': 1, 'organ': 1}
        # NOTE(review): resume offset hard-coded from a previous run.
        progress = 285
        # ``skip`` replaces the original cursor slicing (``docs[progress:]``),
        # which PyMongo 4 no longer supports.
        docs = col_pax.find(filter=query, projection=projection, batch_size=5,
                            skip=progress)
        count = col_pax.count_documents(query)
        for i, doc in enumerate(docs):
            organ = doc['organ']
            if self.verbose and i % 1 == 0:  # % 1: print on every doc
                print('Loading abundance info {} of {} ...'.format(
                    i + progress, min(count, self.max_entries)))
            for j, obs in enumerate(doc['observation']):
                if j == self.max_entries:
                    break
                if self.verbose and j % 100 == 0 and i % 1 == 0:
                    print(' Loading observation info {} of {} ...'.format(
                        j, len(doc['observation'])))
                try:
                    uniprot_id = obs['protein_id']['uniprot_id']
                    abundance = obs['abundance']
                except TypeError:
                    # Malformed observation (e.g. protein_id is None); skip it.
                    continue
                dic = {'organ': organ, 'abundance': abundance}
                self.collection.update_one({'uniprot_id': uniprot_id},
                                           {'$push': {'abundances': dic}}, upsert=False,
                                           collation=self.collation)

    def remove_redudant_id(self):
        """Remove redundant entries in uniprot collection.

        Priority when choosing which duplicate to keep:
            1. Has 'abundances' field
            2. Has 'ko_name' field
            3. Has 'kegg_org_gene' field
            4. Has 'orthologs' field

        .. note:: the de-duplication itself is not implemented yet; the loop
            below only iterates the duplicates (TODO).
        """
        ids = self.collection.distinct('uniprot_id', collation=self.collation)
        projection = {'abundances': 1, 'ko_name': 1, 'kegg_org_gene': 1, 'orthologs': 1}
        for _id in ids:
            query = {'uniprot_id': _id}
            # ``count_documents`` does not accept a ``projection`` argument;
            # the original passed one, which PyMongo rejects.
            count = self.collection.count_documents(query, collation=self.collation)
            if count == 1:
                continue
            docs = self.collection.find(filter=query, projection=projection,
                                        collation=self.collation)
            to_be_removed = []
            to_be_saved = []
            for doc in docs:
                # TODO: rank duplicates by the priority above, keep the best
                # one (to_be_saved) and delete the rest (to_be_removed).
                pass

    def fill_reactions(self, start=0):
        """Fill reactions in which the protein acts as a catalyst.

        Args:
            start (:obj:`int`, optional): Starting document in sabio_rk. Defaults to 0.
        """
        # ``skip`` replaces the original cursor slicing (``docs[start:]``),
        # which PyMongo 4 no longer supports.
        docs = self.sabio_manager.collection.find({}, projection={'enzyme': 1, 'kinlaw_id': 1},
                                                  skip=start)
        count = self.sabio_manager.collection.count_documents({})
        for i, doc in enumerate(docs):
            if i == self.max_entries:
                break
            if i % 100 == 0 and self.verbose:
                print("Processing reaction {} out of {}...".format(i + start, count))
            enzyme = doc['enzyme']
            kinlaw_id = doc['kinlaw_id']
            # Check None before len() (the original tested len(enzyme) first,
            # which would raise TypeError on a None enzyme).
            if not enzyme or enzyme[0].get('subunits') is None:
                continue
            for subunit in enzyme[0]['subunits']:
                if subunit is None:
                    continue
                uniprot_id = subunit['uniprot']
                if i % 100 == 0 and self.verbose:
                    print(" Adding kinlaw_id {} into uniprot collection.".format(kinlaw_id))
                self.collection.update_one({'uniprot_id': uniprot_id},
                                           {'$addToSet': {'sabio_kinlaw_id': kinlaw_id}},
                                           collation=self.collation, upsert=False)

    def fill_abundance_publication(self, start=0):
        """Attach ``file_name``/``publication`` provenance from the pax
        collection to each abundance entry
        (https://github.com/KarrLab/datanator/issues/51).

        Args:
            start(:obj:`int`, optional): beginning of documents.
        """
        query = {'abundances.file_name': {"$exists": False}}
        projection = {'abundances': 1, 'ncbi_taxonomy_id': 1, "uniprot_id": 1}
        count = self.collection.count_documents(query)
        docs = self.collection.find(filter=query, projection=projection,
                                    no_cursor_timeout=True, batch_size=20,
                                    skip=start, collation=self.collation)
        pax_collection = self.client['datanator']['pax']
        for i, doc in enumerate(docs):
            if i == self.max_entries:
                break
            if i % 20 == 0 and self.verbose:
                print("Processing doc {} out of {}...".format(i + start, count))
            uniprot_id = doc['uniprot_id']
            abundances = doc['abundances']
            taxon = doc['ncbi_taxonomy_id']
            for j, abundance in enumerate(abundances):  # {organ: xxx, abundance: xxx}
                if abundance.get("file_name") is not None:
                    continue
                value = abundance['abundance']
                # Find the pax file that produced this (taxon, organ, value) triple.
                con_0 = {'ncbi_id': taxon}
                con_1 = {'organ': abundance['organ']}
                con_2 = {'observation': {"$elemMatch": {"protein_id.uniprot_id": uniprot_id,
                                                        "abundance": value}}}
                q_pax = {"$and": [con_0, con_1, con_2]}
                p_doc = pax_collection.find_one(filter=q_pax,
                                                projection={"file_name": 1, "publication": 1})
                if p_doc is None:
                    # Give up on this document as soon as one entry has no provenance.
                    print("No pax doc found for uniprot doc {}, abundance entry {}; skip is at {}".format(doc["_id"], j, i))
                    break
                abundance['file_name'] = p_doc['file_name']
                abundance['publication'] = p_doc['publication']
            self.collection.update_many({"uniprot_id": doc["uniprot_id"]},
                                        {"$set": {"abundances": abundances}},
                                        collation=self.collation)

    def fill_species_name_new(self, start=0):
        """Backfill ``species_name`` for documents that lack the field.

        Args:
            start (:obj:`int`, optional): offset used in progress messages.
        """
        query = {"species_name": {"$exists": False}}
        count = self.collection.count_documents(query)
        taxon_col = self.client['datanator']['taxon_tree']
        docs = self.collection.find(filter=query, projection={"ncbi_taxonomy_id": 1},
                                    no_cursor_timeout=True, batch_size=100)
        for i, doc in enumerate(docs):
            if i == self.max_entries:
                break
            if i % 50 == 0 and self.verbose:
                print("Processing doc {} out of {}...".format(i + start, count))
            tax_id = doc["ncbi_taxonomy_id"]
            taxon_doc = taxon_col.find_one(filter={"tax_id": tax_id},
                                           projection={"tax_name": 1})
            if taxon_doc is None:
                # Unknown taxon: leave the document unchanged instead of
                # crashing with TypeError (the original indexed None).
                continue
            self.collection.update_one({"_id": doc["_id"]},
                                       {"$set": {"species_name": taxon_doc["tax_name"]}},
                                       upsert=False)
from multiprocessing import Pool, Process
def main():
    """Entry point: populate and/or enrich the uniprot collection.

    Reads MongoDB credentials from the datanator config, builds the manager,
    and runs the currently enabled pipeline step in a child process.  The
    other steps are kept commented out so they can be re-enabled one at a
    time for long-running backfills.
    """
    db = 'datanator'
    collection_str = 'uniprot'
    conf = datanator.config.core.get_config()['datanator']['mongodb']
    username = conf['user']
    password = conf['password']
    server = conf['server']
    manager = UniprotNoSQL(MongoDB=server, db=db,
                           username=username, password=password,
                           collection_str=collection_str, verbose=True)
    # manager.load_uniprot()
    # Pass the bound method itself: the original wrote
    # ``Process(target=manager.fill_species_name())``, which runs the work in
    # the parent process and hands Process a useless ``None`` target.
    p = Process(target=manager.fill_species_name)
    p.start()
    p.join()
    # p = Process(target=manager.fill_ko_name)
    # p.start()
    # p.join()
    # p = Process(target=manager.fill_species_info)
    # p.start()
    # p.join()
    # p = Process(target=manager.load_abundance_from_pax)
    # p.start()
    # p.join()
    # p = Process(target=manager.fill_reactions)
    # p.start()
    # p.join()
    # manager.fill_abundance_publication()
    # p = Process(target=manager.fill_species_name_new)
    # p.start()
    # p.join()


if __name__ == '__main__':
    main()