KarrLab/datanator

datanator/schema_2/migrate_corum.py

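"""Migration script for the CORUM protein-complex collection.

Copies documents from the ``datanator`` database to ``datanator-test``,
renames ``SWISSPROT_organism_NCBI_ID`` to ``ncbi_taxonomy_id``, drops the
legacy ``complex_id`` field, and stamps each document with
``schema_version`` 2. (Description derived from the code below.)
"""
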
from datanator_query_python.config import motor_client_manager
import simplejson as json
import asyncio
from pymongo import UpdateOne
from pymongo.errors import BulkWriteError
from pprint import pprint


class MigrateCorum:

    def __init__(self, collection="corum", to_database="datanator-test",
                 from_database="datanator", max_entries=float("inf")):
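        """
        Args:
            collection(:obj:`str`): Name of the collection to be migrated
            to_database(:obj:`str`): Name of the destination database
            from_database(:obj:`str`): Name of the source database
            max_entries(:obj:`int`): Maximum number of documents to process
        """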
        self.collection = collection
        self.from_database = from_database
        self.to_database = to_database
        self.from_collection = motor_client_manager.client.get_database(from_database)[collection]
        self.to_collection = motor_client_manager.client.get_database(to_database)[collection]
        self.max_entries = max_entries

    async def index_primary(self, _key, background=True):
        """Index key (single key ascending)

        Args:
            _key(:obj:`str`): Name of key to be indexed
            background(:obj:`bool`): Whether to build the index in the background
        """
        return await self.to_collection.create_index(_key, background=background)

    async def process_cursor(self, skip=0):
        """Transform documents and move them to the new database

        Args:
            skip(:obj:`int`): Number of documents to skip in the source collection
        """
        bulk_write = []
        query = {}
        # a limit of 0 tells MongoDB to return all matching documents
        if self.max_entries == float('inf'):
            limit = 0
        else:
            limit = self.max_entries
        docs = self.from_collection.find(filter=query, projection={'_id': 0},
                                         no_cursor_timeout=True, batch_size=10,
                                         skip=skip, limit=limit)
        i = 0
        async for doc in docs:
            i += 1
            if i > self.max_entries:
                break
            # flush accumulated operations every 50 documents
            if i % 50 == 0:
                print("Processing document {}".format(i + skip))
                try:
                    await self.to_collection.bulk_write(bulk_write)
                    bulk_write = []
                except BulkWriteError as bwe:
                    pprint(bwe.details)
                    bulk_write = []
            # rename the taxonomy field, drop the legacy id, and stamp the schema version
            doc.pop("complex_id", None)
            doc["ncbi_taxonomy_id"] = doc.pop("SWISSPROT_organism_NCBI_ID")
            doc["schema_version"] = "2"
            # round-trip through JSON to strip NaN values, then upsert keyed on ComplexID
            bulk_write.append(UpdateOne({'ComplexID': doc.get("ComplexID")},
                                        {'$set': json.loads(json.dumps(doc, ignore_nan=True))},
                                        upsert=True))
        if len(bulk_write) != 0:
            try:
                # flush any remaining operations
                await self.to_collection.bulk_write(bulk_write)
            except BulkWriteError as bwe:
                pprint(bwe.details)
            finally:
                print("Done.")


def main():
    loop = asyncio.get_event_loop()
    src = MigrateCorum()
    # index the upsert key before writing, then run the migration
    loop.run_until_complete(src.index_primary('ComplexID'))
    loop.run_until_complete(src.process_cursor(skip=0))

if __name__ == '__main__':
    main()