scitran/core

View on GitHub
bin/database.py

Summary

Maintainability
F
2 wks
Test Coverage
#!/usr/bin/env python

import argparse
import bson
import copy
import datetime
import dateutil.parser
import json
import logging
import multiprocessing
import os
import re
import sys
import time

from api import config
from api import util
from api.dao import containerutil
from api.dao.containerstorage import ProjectStorage
from api.jobs.jobs import Job
from api.jobs import gears
from api.types import Origin
from api.jobs import batch

CURRENT_DATABASE_VERSION = 44 # An int that is bumped when a new schema change is made

def get_db_version():

    version = config.get_version()
    if version is None:
        # Attempt to find db version at old location
        version = config.db.version.find_one({'_id': 'version'})
    if version is None or version.get('database') is None:
        return 0
    return version.get('database')

def confirm_schema_match():
    """
    Checks version of database schema

    Returns (0)  if DB schema version matches requirements.
    Returns (42) if DB schema version does not match
                 requirements and can be upgraded.
    Returns (43) if DB schema version does not match
                 requirements and cannot be upgraded,
                 perhaps because code is at lower version
                 than the DB schema version.
    """

    db_version = get_db_version()
    if not isinstance(db_version, int) or db_version > CURRENT_DATABASE_VERSION:
        logging.error('The stored db schema version of %s is incompatible with required version %s',
                       str(db_version), CURRENT_DATABASE_VERSION)
        sys.exit(43)
    elif db_version < CURRENT_DATABASE_VERSION:
        sys.exit(42)
    else:
        sys.exit(0)

def getMonotonicTime():
    # http://stackoverflow.com/a/7424304
    return os.times()[4]

def process_cursor(cursor, closure, context = None):
    """
    Given an iterable (say, a mongo cursor) and a closure, call that closure in parallel over the iterable.
    Call order is undefined. Currently launches N python process workers, where N is the number of vcpu cores.

    Useful for upgrades that need to touch each document in a database, and don't need an iteration order.

    Your closure MUST return True on success. Anything else is logged and treated as a failure.
    A closure that throws an exception will fail the upgrade immediately.
    """

    begin = getMonotonicTime()

    # cores = multiprocessing.cpu_count()
    # pool = multiprocessing.Pool(cores)
    # logging.info('Iterating over cursor with ' + str(cores) + ' workers')

    # # Launch all work, iterating over the cursor
    # # Note that this creates an array of n multiprocessing.pool.AsyncResults, where N is table size.
    # # Memory usage concern in the future? Doesn't seem to be an issue with ~120K records.
    # # Could be upgraded later with some yield trickery.
    # results = [pool.apply_async(closure, (document,)) for document in cursor]

    # # Read the results back, presumably in order!
    # failed = False
    # for res in results:
    #     result = res.get()
    #     if result != True:
    #         failed = True
    #         logging.info('Upgrade failed: ' + str(result))

    # logging.info('Waiting for workers to complete')
    # pool.close()
    # pool.join()

    logging.info('Proccessing {} items in cursor ...'.format(cursor.count()))

    failed = False
    cursor_size = cursor.count()
    cursor_index = 0.0
    next_percent = 5.0
    percent_increment = 5
    if(cursor_size < 20):
        next_percent = 25.0
        percent_increment = 25
    if(cursor_size < 4):
        next_percent = 50.0
        percent_increment = 50
    for document in cursor:
        if 100 * (cursor_index / cursor_size) >= next_percent:
            logging.info('{} percent complete ...'.format(next_percent))
            next_percent = next_percent + percent_increment
        if context == None:
            result = closure(document)
        else:
            result = closure(document, context)
        cursor_index = cursor_index + 1
        if result != True:
            failed = True
            logging.info('Upgrade failed: ' + str(result))

    if failed is True:
        msg = 'Worker pool experienced one or more failures. See above logs.'
        logging.info(msg)
        raise Exception(msg)

    end = getMonotonicTime()
    elapsed = end - begin
    logging.info('Parallel cursor iteration took ' + ('%.2f' % elapsed))

def upgrade_to_1():
    """
    scitran/core issue #206

    Initialize db version to 1
    """
    config.db.singletons.insert_one({'_id': 'version', 'database': 1})

def upgrade_to_2():
    """
    scitran/core PR #236

    Set file.origin.name to id if does not exist
    Set file.origin.method to '' if does not exist
    """

    def update_file_origins(cont_list, cont_name):
        for container in cont_list:
            updated_files = []
            for file in container.get('files', []):
                origin = file.get('origin')
                if origin is not None:
                    if origin.get('name', None) is None:
                        file['origin']['name'] = origin['id']
                    if origin.get('method', None) is None:
                        file['origin']['method'] = ''
                updated_files.append(file)

            query = {'_id': container['_id']}
            update = {'$set': {'files': updated_files}}
            result = config.db[cont_name].update_one(query, update)

    query = {'$and':[{'files.origin.name': { '$exists': False}}, {'files.origin.id': { '$exists': True}}]}

    update_file_origins(config.db.collections.find(query), 'collections')
    update_file_origins(config.db.projects.find(query), 'projects')
    update_file_origins(config.db.sessions.find(query), 'sessions')
    update_file_origins(config.db.acquisitions.find(query), 'acquisitions')

def upgrade_to_3():
    """
    scitran/core issue #253

    Set first user with admin permissions found as curator if one does not exist
    """
    query = {'curator': {'$exists': False}, 'permissions.access': 'admin'}
    projection = {'permissions.$':1}
    collections = config.db.collections.find(query, projection)
    for coll in collections:
        admin = coll['permissions'][0]['_id']
        query = {'_id': coll['_id']}
        update = {'$set': {'curator': admin}}
        config.db.collections.update_one(query, update)

def upgrade_to_4():
    """
    scitran/core issue #263

    Add '_id' field to session.subject
    Give subjects with the same code and project the same _id
    """

    pipeline = [
        {'$match': { 'subject._id': {'$exists': False}}},
        {'$group' : { '_id' : {'pid': '$project', 'code': '$subject.code'}, 'sids': {'$push': '$_id' }}}
    ]

    subjects = config.db.command('aggregate', 'sessions', pipeline=pipeline)
    for subject in subjects['result']:

        # Subjects without a code and sessions without a subject
        # will be returned grouped together, but all need unique IDs
        if subject['_id'].get('code') is None:
            for session_id in subject['sids']:
                subject_id = bson.ObjectId()
                config.db.sessions.update_one({'_id': session_id},{'$set': {'subject._id': subject_id}})
        else:
            subject_id = bson.ObjectId()
            query = {'_id': {'$in': subject['sids']}}
            update = {'$set': {'subject._id': subject_id}}
            config.db.sessions.update_many(query, update)

def upgrade_to_5():
    """
    scitran/core issue #279

    Ensure all sessions and acquisitions have the same perms as their project
    Bug(#278) discovered where changing a session's project did not update acquisition perms
    """

    projects = config.db.projects.find({})
    for p in projects:
        perms = p.get('permissions', [])

        session_ids = [s['_id'] for s in config.db.sessions.find({'project': p['_id']}, [])]

        config.db.sessions.update_many({'project': p['_id']}, {'$set': {'permissions': perms}})
        config.db.acquisitions.update_many({'session': {'$in': session_ids}}, {'$set': {'permissions': perms}})

def upgrade_to_6():
    """
    scitran/core issue #277

    Ensure all collection modified dates are ISO format
    Bug fixed in 6967f23
    """

    colls = config.db.collections.find({'modified': {'$type': 2}}) # type string
    for c in colls:
        fixed_mod = dateutil.parser.parse(c['modified'])
        config.db.collections.update_one({'_id': c['_id']}, {'$set': {'modified': fixed_mod}})

def upgrade_to_7():
    """
    scitran/core issue #270

    Add named inputs and specified destinations to jobs.

    Before:
    {
        "input" : {
            "container_type" : "acquisition",
            "container_id" : "572baf4e23dcb77ebbe06b3f",
            "filename" : "1_1_dicom.zip",
            "filehash" : "v0-sha384-422bd115d21585d1811d42cd99f1cf0a8511a4b377dd2deeaa1ab491d70932a051926ed99815a75142ad0815088ed009"
        }
    }

    After:
    {
        "inputs" : {
            "dicom" : {
                "container_type" : "acquisition",
                "container_id" : "572baf4e23dcb77ebbe06b3f",
                "filename" : "1_1_dicom.zip"
            }
        },
        "destination" : {
            "container_type" : "acquisition",
            "container_id" : "572baf4e23dcb77ebbe06b3f"
        }
    }
    """

    # The infrastructure runs this upgrade script before populating manifests.
    # For this reason, this one-time script does NOT pull manifests to do the input-name mapping, instead relying on a hard-coded alg name -> input name map.
    # If you have other gears in your system at the time of upgrade, you must add that mapping here.
    input_name_for_gear = {
        'dcm_convert': 'dicom',
        'qa-report-fmri': 'nifti',
        'dicom_mr_classifier': 'dicom',
    }

    jobs = config.db.jobs.find({'input': {'$exists': True}})

    for job in jobs:
        gear_name = job['algorithm_id']
        input_name = input_name_for_gear[gear_name]

        # Move single input to named input map
        input_ = job['input']
        input_.pop('filehash', None)
        inputs = { input_name: input_ }

        # Destination is required, and (for these jobs) is always the same container as the input
        destination = copy.deepcopy(input_)
        destination.pop('filename', None)

        config.db.jobs.update_one(
            {'_id': job['_id']},
            {
                '$set': {
                    'inputs': inputs,
                    'destination': destination
                },
                '$unset': {
                    'input': ''
                }
            }
        )

def upgrade_to_8():
    """
    scitran/core issue #291

    Migrate config, version, gears and rules to singletons collection
    """

    colls = config.db.collection_names()
    to_be_removed = ['version', 'config', 'static']
    # If we are in a bad state (singletons exists but so do any of the colls in to be removed)
    # remove singletons to try again
    if 'singletons' in colls and set(to_be_removed).intersection(set(colls)):
        config.db.drop_collection('singletons')

    if 'singletons' not in config.db.collection_names():
        static = config.db.static.find({})
        if static.count() > 0:
            config.db.singletons.insert_many(static)
        config.db.singletons.insert(config.db.version.find({}))

        configs = config.db.config.find({'latest': True},{'latest':0})
        if configs.count() == 1:
            c = configs[0]
            c['_id'] = 'config'
            config.db.singletons.insert_one(c)

        for c in to_be_removed:
            if c in config.db.collection_names():
                config.db.drop_collection(c)

def upgrade_to_9():
    """
    scitran/core issue #292

    Remove all session and acquisition timestamps that are empty strings
    """

    config.db.acquisitions.update_many({'timestamp':''}, {'$unset': {'timestamp': ''}})
    config.db.sessions.update_many({'timestamp':''}, {'$unset': {'timestamp': ''}})

def upgrade_to_10():
    """
    scitran/core issue #301

    Makes the following key renames, all in the jobs table.
    FR is a FileReference, CR is a ContainerReference:

    job.algorithm_id  --> job.name

    FR.container_type --> type
    FR.container_id   --> id
    FR.filename       --> name

    CR.container_type --> type
    CR.container_id   --> id
    """

    def switch_keys(doc, x, y):
        doc[y] = doc[x]
        doc.pop(x, None)


    jobs = config.db.jobs.find({'destination.container_type': {'$exists': True}})

    for job in jobs:
        switch_keys(job, 'algorithm_id', 'name')

        for key in job['inputs'].keys():
            inp = job['inputs'][key]

            switch_keys(inp, 'container_type', 'type')
            switch_keys(inp, 'container_id',   'id')
            switch_keys(inp, 'filename',       'name')


        dest = job['destination']
        switch_keys(dest, 'container_type', 'type')
        switch_keys(dest, 'container_id',   'id')

        config.db.jobs.update(
            {'_id': job['_id']},
            job
        )

def upgrade_to_11():
    """
    scitran/core issue #362

    Restructures job objects' `inputs` field from a dict with arbitrary keys
    into a list where the key becomes the field `input`
    """

    jobs = config.db.jobs.find({'inputs.type': {'$exists': False}})

    for job in jobs:

        inputs_arr = []
        for key, inp in job['inputs'].iteritems():
            inp['input'] = key
            inputs_arr.append(inp)

        config.db.jobs.update(
            {'_id': job['_id']},
            {'$set': {'inputs': inputs_arr}}
        )

def upgrade_to_12():
    """
    scitran/core PR #372

    Store job inputs on job-based analyses
    """

    sessions = config.db.sessions.find({'analyses.job': {'$exists': True}})

    for session in sessions:
        for analysis in session.get('analyses'):
            if analysis.get('job'):
                job = Job.get(analysis['job'])
                files = analysis.get('files', [])
                files[:] = [x for x in files if x.get('output')] # remove any existing inputs and insert fresh

                for i in getattr(job, 'inputs', {}):
                    fileref = job.inputs[i]
                    contref = containerutil.create_containerreference_from_filereference(job.inputs[i])
                    file_ = contref.find_file(fileref.name)
                    if file_:
                        file_['input'] = True
                        files.append(file_)

                q = {'analyses._id': analysis['_id']}
                u = {'$set': {'analyses.$.job': job.id_, 'analyses.$.files': files}}
                config.db.sessions.update_one(q, u)

def upgrade_to_13():
    """
    scitran/core PR #403

    Clear schema path from db config in order to set abs path to files
    """
    config.db.singletons.find_one_and_update(
        {'_id': 'config', 'persistent.schema_path': {'$exists': True}},
        {'$unset': {'persistent.schema_path': ''}})

def upgrade_to_14():
    """schema_path is no longer user configurable"""
    config.db.singletons.find_one_and_update(
        {'_id': 'config', 'persistent.schema_path': {'$exists': True}},
        {'$unset': {'persistent.schema_path': ''}})

def upgrade_to_15():
    """
    scitran/pull issue #417

    First remove all timestamps that are empty or not mongo date or string format.
    Then attempt to convert strings to dates, removing those that cannot be converted.
    Mongo $type maps: String = 2, Date = 9
    """
    query = {}
    query['$or'] = [
                    {'timestamp':''},
                    {'$and': [
                        {'timestamp': {'$exists': True}},
                        {'timestamp': {'$not': {'$type':2}}},
                        {'timestamp': {'$not': {'$type':9}}}
                    ]}
                ]
    unset = {'$unset': {'timestamp': ''}}

    config.db.sessions.update_many(query, unset)
    config.db.acquisitions.update_many(query, unset)

    query =  {'$and': [
                {'timestamp': {'$exists': True}},
                {'timestamp': {'$type':2}}
            ]}
    sessions = config.db.sessions.find(query)
    for s in sessions:
        try:
            fixed_timestamp = dateutil.parser.parse(s['timestamp'])
        except:
            config.db.sessions.update_one({'_id': s['_id']}, {'$unset': {'timestamp': ''}})
            continue
        config.db.sessions.update_one({'_id': s['_id']}, {'$set': {'timestamp': fixed_timestamp}})

    acquisitions = config.db.acquisitions.find(query)
    for a in acquisitions:
        try:
            fixed_timestamp = dateutil.parser.parse(a['timestamp'])
        except:
            config.db.sessions.update_one({'_id': a['_id']}, {'$unset': {'timestamp': ''}})
            continue
        config.db.sessions.update_one({'_id': a['_id']}, {'$set': {'timestamp': fixed_timestamp}})

def upgrade_to_16():
    """
    Fixes file.size sometimes being a floating-point rather than integer.
    """

    acquisitions = config.db.acquisitions.find({'files.size': {'$type': 'double'}})
    for x in acquisitions:
        for y in x.get('files', []):
            if y.get('size'):
                y['size'] = int(y['size'])
        config.db.acquisitions.update({"_id": x['_id']}, x)

    sessions = config.db.sessions.find({'files.size': {'$type': 'double'}})
    for x in sessions:
        for y in x.get('files', []):
            if y.get('size'):
                y['size'] = int(y['size'])
        config.db.sessions.update({"_id": x['_id']}, x)

    projects = config.db.projects.find({'files.size': {'$type': 'double'}})
    for x in projects:
        for y in x.get('files', []):
            if y.get('size'):
                y['size'] = int(y['size'])
        config.db.projects.update({"_id": x['_id']}, x)

    sessions = config.db.sessions.find({'analyses.files.size': {'$type': 'double'}})
    for x in sessions:
        for y in x.get('analyses', []):
            for z in y.get('files', []):
                if z.get('size'):
                    z['size'] = int(z['size'])
        config.db.sessions.update({"_id": x['_id']}, x)

def upgrade_to_17():
    """
    scitran/core issue #557

    Reassign subject ids after bug fix in packfile code that did not properly match subjects
    """

    pipeline = [
        {'$group' : { '_id' : {'pid': '$project', 'code': '$subject.code'}, 'sids': {'$push': '$_id' }}}
    ]

    subjects = config.db.command('aggregate', 'sessions', pipeline=pipeline)
    for subject in subjects['result']:

        # Subjects without a code and sessions without a subject
        # will be returned grouped together, but all need unique IDs
        if subject['_id'].get('code') is None:
            for session_id in subject['sids']:
                subject_id = bson.ObjectId()
                config.db.sessions.update_one({'_id': session_id},{'$set': {'subject._id': subject_id}})
        else:
            subject_id = bson.ObjectId()
            query = {'_id': {'$in': subject['sids']}}
            update = {'$set': {'subject._id': subject_id}}
            config.db.sessions.update_many(query, update)

def upgrade_to_18():
    """
    scitran/core issue #334

    Move singleton gear doc to its own table
    """

    gear_doc = config.db.singletons.find_one({"_id": "gears"})

    if gear_doc is not None:
        gear_list = gear_doc.get('gear_list', [])
        for gear in gear_list:
            try:
                gears.upsert_gear(gear)
            except Exception as e:
                logging.error("")
                logging.error("Error upgrading gear:")
                logging.error(type(e))
                logging.error("Gear will not be retained. Document follows:")
                logging.error(gear)
                logging.error("")

        config.db.singletons.remove({"_id": "gears"})

def upgrade_to_19():
    """
    scitran/core issue #552

    Add origin information to job object
    """

    update = {
        '$set': {
            'origin' : {'type': str(Origin.unknown), 'id': None}
        }
    }
    config.db.jobs.update_many({'origin': {'$exists': False}}, update)

def upgrade_to_20():
    """
    scitran/core issue #602

    Change dash to underscore for consistency
    """

    query = {'last-seen': {'$exists': True}}
    update = {'$rename': {'last-seen':'last_seen' }}

    config.db.devices.update_many(query, update)

def upgrade_to_21():
    """
    scitran/core issue #189 - Data Model v2

    Field `metadata` renamed to `info`
    Field `file.instrument` renamed to `file.modality`
    Acquisition fields `instrument` and `measurement` removed
    """

    def update_project_template(template):
        new_template = {'acquisitions': []}
        for a in template.get('acquisitions', []):
            new_a = {'minimum': a['minimum']}
            properties = a['schema']['properties']
            if 'measurement' in properties:
                m_req = properties['measurement']['pattern']
                m_req = re.sub('^\(\?i\)', '', m_req)
                new_a['files']=[{'measurement':  m_req, 'minimum': 1}]
            if 'label' in properties:
                l_req = properties['label']['pattern']
                l_req = re.sub('^\(\?i\)', '', l_req)
                new_a['label'] = l_req
            new_template['acquisitions'].append(new_a)

        return new_template

    def dm_v2_updates(cont_list, cont_name):
        for container in cont_list:

            query = {'_id': container['_id']}
            update = {'$rename': {'metadata': 'info'}}

            if cont_name == 'projects' and container.get('template'):
                new_template = update_project_template(json.loads(container.get('template')))
                update['$set'] = {'template': new_template}


            if cont_name == 'sessions':
                update['$rename'].update({'subject.metadata': 'subject.info'})


            measurement = None
            modality = None
            info = None
            if cont_name == 'acquisitions':
                update['$unset'] = {'instrument': '', 'measurement': ''}
                measurement = container.get('measurement', None)
                modality = container.get('instrument', None)
                info = container.get('metadata', None)
                if info:
                    config.db.acquisitions.update_one(query, {'$set': {'metadata': {}}})


            # From mongo docs: '$rename does not work if these fields are in array elements.'
            files = container.get('files')
            if files is not None:
                updated_files = []
                for file_ in files:
                    file_['info'] = {}
                    if 'metadata' in file_:
                        file_['info'] = file_.pop('metadata', None)
                    if 'instrument' in file_:
                        file_['modality'] = file_.pop('instrument', None)
                    if measurement:
                        # Move the acquisition's measurement to all files
                        if file_.get('measurements'):
                            file_['measurements'].append(measurement)
                        else:
                            file_['measurements'] = [measurement]
                    if info and file_.get('type', '') == 'dicom':
                        # This is going to be the dicom header info
                        updated_info = info
                        updated_info.update(file_['info'])
                        file_['info'] = updated_info
                    if modality and not file_.get('modality'):
                        file_['modality'] = modality

                    updated_files.append(file_)
                if update.get('$set'):
                    update['$set']['files'] =  updated_files
                else:
                    update['$set'] = {'files': updated_files}

            result = config.db[cont_name].update_one(query, update)

    query = {'$or':[{'files.metadata': { '$exists': True}},
                    {'metadata': { '$exists': True}},
                    {'files.instrument': { '$exists': True}}]}

    dm_v2_updates(config.db.collections.find(query), 'collections')

    query['$or'].append({'template': { '$exists': True}})
    dm_v2_updates(config.db.projects.find({}), 'projects')

    query['$or'].append({'subject': { '$exists': True}})
    dm_v2_updates(config.db.sessions.find(query), 'sessions')

    query['$or'].append({'instrument': { '$exists': True}})
    query['$or'].append({'measurement': { '$exists': True}})
    dm_v2_updates(config.db.acquisitions.find(query), 'acquisitions')

def upgrade_to_22():
    """
    Add created and modified timestamps to gear docs

    Of debatable value, since infra will load gears on each boot.
    """

    logging.info('Upgrade v22, phase 1 of 3, upgrading gears...')

    # Add timestamps to gears.
    for gear in config.db.gears.find({}):
        now = datetime.datetime.utcnow()

        gear['created']  = now
        gear['modified'] = now

        config.db.gears.update({'_id': gear['_id']}, gear)

        # Ensure there cannot possibly be two gears of the same name with the same timestamp.
        # Plus or minus monotonic time.
        # A very silly solution, but we only ever need to do this once, on a double-digit number of documents.
        # Not worth the effort to, eg, rewind time and do math.
        time.sleep(1)
        logging.info('  Updated gear ' + str(gear['_id']) + ' ...')
        sys.stdout.flush()


    logging.info('Upgrade v22, phase 2 of 3, upgrading jobs...')

    # Now that they're updated, fetch all gears and hold them in memory.
    # This prevents extra database queries during the job upgrade.

    all_gears = list(config.db.gears.find({}))
    gears_map = { }

    for gear in all_gears:
        gear_name = gear['gear']['name']

        gears_map[gear_name] = gear

    # A dummy gear for missing refs
    dummy_gear = {
        'category' : 'converter',
        'gear' : {
            'inputs' : {
                'do-not-use' : {
                    'base' : 'file'
                }
            },
            'maintainer' : 'Noone <nobody@example.example>',
            'description' : 'This gear or job was referenced before gear versioning. Version information is not available for this gear.',
            'license' : 'BSD-2-Clause',
            'author' : 'Noone',
            'url' : 'https://example.example',
            'label' : 'Deprecated Gear',
            'flywheel' : '0',
            'source' : 'https://example.example',
            'version' : '0.0.0',
            'custom' : {
                'flywheel': {
                    'invalid': True
                }
            },
            'config' : {},
            'name' : 'deprecated-gear'
        },
        'exchange' : {
            'git-commit' : '0000000000000000000000000000000000000000',
            'rootfs-hash' : 'sha384:000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000',
            'rootfs-url' : 'https://example.example/does-not-exist.tgz'
        }
    }

    maximum = config.db.jobs.count()
    upgraded = 0

    # Blanket-assume gears were the latest in the DB pre-gear versioning.
    for job in config.db.jobs.find({}):

        # Look up latest gear by name, lose job name key
        gear_name = job['name']
        gear = gears_map.get(gear_name)

        if gear is None:
            logging.info('Job doc ' + str(job['_id']) + ' could not find gear ' + gear_name + ', creating...')

            new_gear = copy.deepcopy(dummy_gear)
            new_gear['gear']['name'] = gear_name

            # Save new gear, store id in memory
            resp = config.db.gears.insert_one(new_gear)
            new_id = resp.inserted_id
            new_gear['_id'] = str(new_id)

            # Insert gear into memory map
            gears_map[gear_name] = new_gear

            logging.info('Created gear  ' + gear_name + ' with id ' + str(new_id) + '. Future jobs with this gear name with not alert.')

            gear = new_gear

        if gear is None:
            raise Exception("We don't understand python scopes ;( ;(")

        # Store gear ID
        job.pop('name', None)
        job['gear_id'] = str(gear['_id'])

        # Save
        config.db.jobs.update({'_id': job['_id']}, job)

        upgraded += 1
        if upgraded % 1000 == 0:
            logging.info('  Processed ' + str(upgraded) + ' jobs of ' + str(maximum) + '...')


    logging.info('Upgrade v22, phase 3 of 3, upgrading batch...')

    maximum = config.db.batch.count()
    upgraded = 0

    for batch in config.db.batch.find({}):

        # Look up latest gear by name, lose job name key
        gear = gears.get_gear_by_name(batch['gear'])
        batch.pop('gear', None)

        # Store gear ID
        batch['gear_id'] = str(gear['_id'])

        # Save
        config.db.batch.update({'_id': batch['_id']}, batch)

        upgraded += 1
        if upgraded % 1000 == 0:
            logging.info('  Processed ' + str(upgraded) + ' batch of ' + str(maximum) + '...')


    logging.info('Upgrade v22, complete.')

def upgrade_to_23():
    """
    scitran/core issue #650

    Support multiple auth providers
    Config 'auth' key becomes map where keys are auth_type
    """

    db_config = config.db.singletons.find_one({'_id': 'config'})
    if db_config:
        auth_config = db_config.get('auth', {})
        if auth_config.get('auth_type'):
            auth_type = auth_config.pop('auth_type')
            config.db.singletons.update_one({'_id': 'config'}, {'$set': {'auth': {auth_type: auth_config}}})

def upgrade_to_24():
    """
    scitran/core issue #720

    Migrate gear rules to the project level
    """

    global_rules = config.db.singletons.find_one({"_id" : "rules"})
    project_ids  = list(config.db.projects.find({},{"_id": "true"}))

    if global_rules is None:
        global_rules = {
            'rule_list': []
        }

    logging.info('Upgrade v23, migrating ' + str(len(global_rules['rule_list'])) + ' gear rules...')

    count = 0
    for old_rule in global_rules['rule_list']:

        logging.info(json.dumps(old_rule))

        gear_name = old_rule['alg']
        rule_name = 'Migrated rule ' + str(count)

        any_stanzas = []
        all_stanzas = []

        for old_any_stanza in old_rule.get('any', []):
            if len(old_any_stanza) != 2:
                raise Exception('Confusing any-rule stanza ' + str(count) + ': ' + json.dumps(old_any_stanza))

            any_stanzas.append({ 'type': old_any_stanza[0], 'value': old_any_stanza[1] })

        for old_all_stanza in old_rule.get('all', []):
            if len(old_all_stanza) != 2:
                raise Exception('Confusing all-rule stanza ' + str(count) + ': ' + json.dumps(old_all_stanza))

            all_stanzas.append({ 'type': old_all_stanza[0], 'value': old_all_stanza[1] })

        # New rule object
        new_rule = {
            'alg': gear_name,
            'name': rule_name,
            'any': any_stanzas,
            'all': all_stanzas
        }

        # Insert rule on every project
        for project in project_ids:
            project_id = project['_id']

            new_rule_obj = copy.deepcopy(new_rule)
            new_rule_obj['project_id'] = str(project_id)

            config.db.project_rules.insert_one(new_rule_obj)

        logging.info('Upgrade v23, migrated rule ' + str(count) + ' of ' + str(len(global_rules)) + '...')
        count += 1

    # Remove obsolete singleton
    config.db.singletons.remove({"_id" : "rules"})
    logging.info('Upgrade v23, complete.')

def upgrade_to_25():
    """
    scitran/core PR #733

    Migrate refresh token from authtokens to seperate collection
    """

    auth_tokens = config.db.authtokens.find({'refresh_token': {'$exists': True}})

    for a in auth_tokens:
        refresh_doc = {
            'uid': a['uid'],
            'token': a['refresh_token'],
            'auth_type': a['auth_type']
        }
        config.db.refreshtokens.insert(refresh_doc)

    config.db.authtokens.update_many({'refresh_token': {'$exists': True}}, {'$unset': {'refresh_token': ''}})

def upgrade_to_26_closure(job):

    gear = config.db.gears.find_one({'_id': bson.ObjectId(job['gear_id'])}, {'gear.name': 1})

    # This logic WILL NOT WORK in parallel mode
    if gear is None:
        logging.info('No gear found for job ' + str(job['_id']))
        return True
    if gear.get('gear', {}).get('name', None) is None:
        logging.info('No gear found for job ' + str(job['_id']))
        return True

    # This logic WILL NOT WORK in parallel mode

    gear_name = gear['gear']['name']

    # Checks if the specific gear tag already exists for the job
    if gear_name in job['tags']:
        return True

    result = config.db.jobs.update_one({'_id': job['_id']}, {'$addToSet': {'tags': gear_name }})

    if result.modified_count == 1:
        return True
    else:
        return 'Parallel failed: update doc ' + str(job['_id']) + ' resulted modified ' + str(result.modified_count)


def upgrade_to_26():
    """
    scitran/core #734

    Add job tags back to the job document, and use a faster cursor-walking update method
    """
    cursor = config.db.jobs.find({})
    process_cursor(cursor, upgrade_to_26_closure)


def upgrade_to_27():
    """
    scitran/core PR #768

    Fix project templates that reference `measurement` instead of `measurements`
    Update all session compliance for affected projects
    """

    projects = config.db.projects.find({'template.acquisitions.files.measurement': {'$exists': True}})

    storage = ProjectStorage()

    for p in projects:
        template = p.get('template', {})
        for a in template.get('acquisitions', []):
            for f in a.get('files', []):
                if f.get('measurement'):
                    f['measurements'] = f.pop('measurement')
        config.log.debug('the template is now {}'.format(template))
        config.db.projects.update_one({'_id': p['_id']}, {'$set': {'template': template}})
        storage.recalc_sessions_compliance(project_id=str(p['_id']))

def upgrade_to_28():
    """
    Fixes session.subject.age sometimes being a floating-point rather than integer.
    """

    sessions = config.db.sessions.find({'subject.age': {'$type': 'double'}})
    logging.info('Fixing {} subjects with age stored as double ...'.format(sessions.count()))
    for session in sessions:
        try:
            session['subject']['age'] = int(session['subject']['age'])
        except:
            session['subject']['age'] = None

        config.db.sessions.update({'_id': session['_id']}, session)




def upgrade_to_29_closure(user):

    avatars = user['avatars']
    if avatars.get('custom') and not 'https:' in avatars['custom']:
        if user['avatar'] == user['avatars']['custom']:
            if(user['avatars'].get('provider') == None):
                config.db.users.update_one({'_id': user['_id']},
                    {'$unset': {'avatar': ""}})
            else:
                config.db.users.update_one({'_id': user['_id']},
                    {'$set': {'avatar': user['avatars'].get('provider')}}
                )
        logging.info('Deleting custom ...')
        config.db.users.update_one({'_id': user['_id']},
            {'$unset': {"avatars.custom": ""}}
        )
    return True


def upgrade_to_29():
    """
    Enforces HTTPS urls for user avatars
    """

    users = config.db.users.find({})
    process_cursor(users, upgrade_to_29_closure)

def upgrade_to_30_closure_analysis(coll_item, coll):
    analyses = coll_item.get('analyses', [])

    for analysis_ in analyses:
        files = analysis_.get('files', [])
        for file_ in files:
            if 'created' not in file_:
                file_['created'] = analysis_.get('created', datetime.datetime(1970, 1, 1))
    result = config.db[coll].update_one({'_id': coll_item['_id']}, {'$set': {'analyses': analyses}})
    if result.modified_count == 1:
        return True
    else:
        return "File timestamp creation failed for:" + str(coll_item)

def upgrade_to_30_closure_coll(coll_item, coll):
    files = coll_item.get('files', [])
    for file_ in files:
        if 'created' not in file_:
            file_['created'] = coll_item.get('created', datetime.datetime(1970, 1, 1))
    result = config.db[coll].update_one({'_id': coll_item['_id']}, {'$set': {'files': files}})
    if result.modified_count == 1:
        return True
    else:
        return "File timestamp creation failed for:" + str(coll_item)


def upgrade_to_30():
    """
    scitran/core issue #759

    give created timestamps that are missing are given based on the parent object's timestamp
    """

    cursor = config.db.collections.find({'analyses.files.name': {'$exists': True},
                                         'analyses.files.created': {'$exists': False}})
    process_cursor(cursor, upgrade_to_30_closure_analysis, context = 'collections')

    cursor = config.db.sessions.find({'analyses.files.name': {'$exists': True},
                                      'analyses.files.created': {'$exists': False}})
    process_cursor(cursor, upgrade_to_30_closure_analysis, context = 'sessions')

    cursor = config.db.sessions.find({'files.name': {'$exists': True}, 'files.created': {'$exists': False}})
    process_cursor(cursor, upgrade_to_30_closure_coll, context = 'sessions')

    cursor = config.db.collections.find({'files.name': {'$exists': True}, 'files.created': {'$exists': False}})
    process_cursor(cursor, upgrade_to_30_closure_coll, context = 'collections')

    cursor = config.db.acquisitions.find({'files.name': {'$exists': True}, 'files.created': {'$exists': False}})
    process_cursor(cursor, upgrade_to_30_closure_coll, context = 'acquisitions')

    cursor = config.db.projects.find({'files.name': {'$exists': True}, 'files.created': {'$exists': False}})
    process_cursor(cursor, upgrade_to_30_closure_coll, context = 'projects')

def upgrade_to_31():
    config.db.sessions.update_many({'subject.firstname_hash': {'$exists': True}}, {'$unset': {'subject.firstname_hash':""}})
    config.db.sessions.update_many({'subject.lastname_hash': {'$exists': True}}, {'$unset': {'subject.lastname_hash':""}})

def upgrade_to_32_closure(coll_item, coll):
    permissions = coll_item.get('permissions', [])
    for permission_ in permissions:
        if permission_.get('site', False):
            del permission_['site']
    result = config.db[coll].update_one({'_id': coll_item['_id']}, {'$set': {'permissions' : permissions}})
    if result.modified_count == 0:
        return "Failed to remove site field"
    return True

def upgrade_to_32():
    for coll in ['acquisitions', 'groups', 'projects', 'sessions']:
        cursor = config.db[coll].find({'permissions.site': {'$exists': True}})
        process_cursor(cursor, upgrade_to_32_closure, context = coll)
    config.db.sites.drop()

def upgrade_to_33_closure(cont, cont_name):
    cont_type = cont_name[:-1]
    if cont.get('analyses'):
        for analysis in cont['analyses']:
            analysis['_id'] = bson.ObjectId(analysis['_id'])
            analysis['parent'] = {'type': cont_type, 'id': cont['_id']}
            analysis['permissions'] = cont['permissions']
            for key in ('public', 'archived'):
                if key in cont:
                    analysis[key] = cont[key]
        config.db['analyses'].insert_many(cont['analyses'])
    config.db[cont_name].update_one(
        {'_id': cont['_id']},
        {'$unset': {'analyses': ''}})
    return True

def upgrade_to_33():
    """
    scitran/core issue #808 - make analyses use their own collection
    """
    for cont_name in ['projects', 'sessions', 'acquisitions', 'collections']:
        cursor = config.db[cont_name].find({'analyses': {'$exists': True}})
        process_cursor(cursor, upgrade_to_33_closure, context=cont_name)

def upgrade_to_34():
    """
    Changes group.roles -> groups.permissions

    scitran/core #662
    """
    config.db.groups.update_many({'roles': {'$exists': True}}, {'$rename': {'roles': 'permissions'}})
    config.db.groups.update_many({'name': {'$exists': True}}, {'$rename': {'name': 'label'}})

def upgrade_to_35_closure(batch_job):
    if batch_job.get('state') in ['cancelled', 'running', 'complete', 'failed']:
        return True
    batch_id = batch_job.get('_id')
    config.db.jobs.update_many({'_id': {'$in': batch_job.get('jobs',[])}}, {'$set': {'batch':batch_id}})
    new_state = batch.check_state(batch_id)
    if new_state:
        result = config.db.batch.update_one({'_id': batch_id}, {'$set': {"state": new_state}})
        if result.modified_count != 1:
            raise Exception('Batch job not updated')
    else:
        result = config.db.batch.update_one({'_id': batch_id}, {'$set': {"state": "running"}})
        if result.modified_count != 1:
            raise Exception('Batch job not updated')
    return True

def upgrade_to_35():
    """
    scitran/core issue #710 - give batch stable states
    """
    cursor = config.db.batch.find({})
    process_cursor(cursor, upgrade_to_35_closure)


def upgrade_to_36_closure(acquisition):

    for f in acquisition['files']:
        if not f.get('mimetype'):
            logging.debug('file with name {} did not have mimetype'.format(f['name']))
            f['mimetype'] = util.guess_mimetype(f['name'])

    result = config.db.acquisitions.update_one({'_id': acquisition['_id']}, {'$set': {'files': acquisition['files']}})
    if result.modified_count != 1:
        raise Exception('Acquisition file not updated')

    return True


def upgrade_to_36():
    """
    scitran/core issue #931 - mimetype not set on packfile uploads
    """
    cursor = config.db.acquisitions.find({'files': { '$gt': [] }, 'files.mimetype': None})
    process_cursor(cursor, upgrade_to_36_closure)

def upgrade_to_37():
    """
    scitran/core issue #916 - group-permission level site info needs to be removed from all levels
    """
    for coll in ['acquisitions', 'groups', 'projects', 'sessions', 'analyses']:
        cursor = config.db[coll].find({'permissions.site': {'$exists': True}})
        process_cursor(cursor, upgrade_to_32_closure, context = coll)


def upgrade_to_38_closure(user):

    # if user has existing API key in correct db location, remove API key stored on user and move on
    # otherwise, migrate api key to new location

    api_key = user['api_key']
    doc = config.db.apikeys.find_one({'uid': user['_id'], 'type': 'user'})

    if not doc:

        # migrate existing API key

        new_api_key_doc = {
            '_id': api_key['key'],
            'created': api_key['created'],
            'last_used': api_key['last_used'],
            'uid': user['_id'],
            'type': 'user'
        }

        config.db.apikeys.insert(new_api_key_doc)

    config.db.users.update_one({'_id': user['_id']}, {'$unset': {'api_key': 0}})

    return True


def upgrade_to_38():
    """
    Move existing user api keys to new 'apikeys' collection
    """
    cursor = config.db.users.find({'api_key': {'$exists': True }})
    process_cursor(cursor, upgrade_to_38_closure)


def upgrade_to_39_closure(job):
    """
    Done in python because:
    " the source and target field for $rename must not be on the same path "
    """

    config_ = job.pop('config', {})
    config.db.jobs.update({'_id': job['_id']}, {'$set': {'config': {'config': config_}}})

    return True

def upgrade_to_39():
    """
    Move old jobs without extra config down one level to match new jobs
    with additional keys.

    Before:
    {
        'config': {
            'a': 'b'
        }
    }

    After:
    {
        'config': {
            'config': {
                'a': 'b'
            }
        }
    }
    """
    cursor = config.db.jobs.find({'config': {'$exists': True }, 'config.config': {'$exists': False }})
    process_cursor(cursor, upgrade_to_39_closure)


def upgrade_to_40_closure(acquisition):
    config.db.acquisitions.update_one({'_id':acquisition['_id']},{'$set':{'timestamp':dateutil.parser.parse(acquisition['timestamp'])}})
    return True

def upgrade_to_40():
    """
    Convert all string acquisition timestamps to type date
    """
    cursor = config.db.acquisitions.find({'timestamp':{'$type':'string'}})
    process_cursor(cursor, upgrade_to_40_closure)


def upgrade_to_41_closure(cont, cont_name):

    files = cont.get('files', [])
    for f in files:
        if 'tags' not in f:
            f['tags'] = []
        if 'measurements' not in f:
            f['measurements'] = []
        if 'origin' not in f:
            f['origin'] = {
                'type': str(Origin.unknown),
                'id': None
            }
        if 'mimetype' not in f:
            f['mimetype'] = util.guess_mimetype(f.get('name'))
        if 'modality' not in f:
            f['modality'] = None
    config.db[cont_name].update_one({'_id': cont['_id']}, {'$set': {'files': files}})
    return True


def upgrade_to_41():
    """
    scitran/core issue #1042 - some "expected" file default keys are not present

    These are the fields that are created on every file object in upload.py
    """

    for cont_name in ['groups', 'projects', 'sessions', 'acquisitions', 'collections', 'analyses']:
        cursor = config.db[cont_name].find({'files': { '$elemMatch': { '$or': [
            {'tags':          {'$exists': False }},
            {'measurements':  {'$exists': False }},
            {'origin':        {'$exists': False }},
            {'mimetype':      {'$exists': False }},
            {'modality':      {'$exists': False }}
        ]}}})
        process_cursor(cursor, upgrade_to_41_closure, context=cont_name)


def upgrade_to_42_closure(cont, cont_name):
    archived = cont.pop('archived')
    update = {'$unset': {'archived': True}}
    if archived:
        cont['tags'] = cont.get('tags', []) + ['hidden']
        update['$set'] = {'tags': cont['tags']}
    config.db[cont_name].update_one({'_id': cont['_id']}, update)
    return True

def upgrade_to_42():
    """
    Change container flag "archived" to container tag "hidden"
    """
    for cont_name in ['groups', 'projects', 'sessions', 'acquisitions']:
        cursor = config.db[cont_name].find({'archived': {'$exists': True}})
        process_cursor(cursor, upgrade_to_42_closure, context=cont_name)


def upgrade_to_43_closure(analysis):
    inputs = [f for f in analysis['files'] if f.get('input')]
    outputs = [f for f in analysis['files'] if f.get('output')]
    for f in inputs + outputs:
        f.pop('input', None)
        f.pop('output', None)
    config.db.analyses.update_one({'_id': analysis['_id']}, {'$set': {'inputs': inputs, 'files': outputs}})
    return True

def upgrade_to_43():
    """
    Remove analysis files' input/output tags and store them separately instead:
       - inputs under `analysis.inputs`
       - outputs under `analysis.files`
    """
    cursor = config.db.analyses.find({'files': {'$exists': True, '$ne': []}})
    process_cursor(cursor, upgrade_to_43_closure)


def upgrade_to_44():
    """
    A rerun of scitran/core issue #263

    A rerun was necessary because a bug was found when moving a session to a new project:
    the subject id should change but it was not, causing subject linking where there should be none

    Add '_id' field to session.subject
    Give subjects with the same code and project the same _id
    """

    pipeline = [
        {'$group' : { '_id' : {'pid': '$project', 'code': '$subject.code'}, 'sids': {'$push': '$_id' }}}
    ]

    subjects = config.db.command('aggregate', 'sessions', pipeline=pipeline)
    for subject in subjects['result']:

        # Subjects without a code and sessions without a subject
        # will be returned grouped together, but all need unique IDs
        if subject['_id'].get('code') is None:
            for session_id in subject['sids']:
                subject_id = bson.ObjectId()
                config.db.sessions.update_one({'_id': session_id},{'$set': {'subject._id': subject_id}})
        else:
            subject_id = bson.ObjectId()
            query = {'_id': {'$in': subject['sids']}}
            update = {'$set': {'subject._id': subject_id}}
            config.db.sessions.update_many(query, update)


###
### BEGIN RESERVED UPGRADE SECTION
###

# Due to performance concerns with database upgrades, some upgrade implementations might be postposed.
# The team contract is that if you write an upgrade touch one of the tables mentioned below, you MUST also implement any reserved upgrades.
# This way, we can bundle changes together that need large cursor iterations and save multi-hour upgrade times.



###
### END RESERVED UPGRADE SECTION
###


def upgrade_schema(force_from = None):
    """
    Upgrades db to the current schema version

    Returns (0) if upgrade is successful
    """

    db_version = get_db_version()

    if force_from:
        if isinstance(db_version,int) and db_version >= force_from:
            db_version = force_from
        else:
            logging.error('Cannot force from future version %s. Database only at version %s', str(force_from), str(db_version))
            sys.exit(43)


    if not isinstance(db_version, int) or db_version > CURRENT_DATABASE_VERSION:
        logging.error('The stored db schema version of %s is incompatible with required version %s',
                       str(db_version), CURRENT_DATABASE_VERSION)
        sys.exit(43)
    elif db_version == CURRENT_DATABASE_VERSION:
        logging.error('Database already up to date.')
        sys.exit(43)

    try:
        while db_version < CURRENT_DATABASE_VERSION:
            db_version += 1
            upgrade_script = 'upgrade_to_'+str(db_version)
            logging.info('Upgrading to version {} ...'.format(db_version))
            globals()[upgrade_script]()
            logging.info('Upgrade to version {} complete.'.format(db_version))
    except KeyError as e:
        logging.exception('Attempted to upgrade using script that does not exist: {}'.format(e))
        sys.exit(1)
    except Exception as e:
        logging.exception('Incremental upgrade of db failed')
        sys.exit(1)
    else:
        config.db.singletons.update_one({'_id': 'version'}, {'$set': {'database': CURRENT_DATABASE_VERSION}})
        sys.exit(0)

if __name__ == '__main__':
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument("function", help="function to be called from database.py")
        parser.add_argument("-f", "--force_from", help="force database to upgrade from previous version", type=int)
        args = parser.parse_args()

        if args.function == 'confirm_schema_match':
            confirm_schema_match()
        elif args.function == 'upgrade_schema':
            if args.force_from:
                upgrade_schema(args.force_from)
            else:
                upgrade_schema()
        else:
            logging.error('Unknown method name given as argv to database.py')
            sys.exit(1)
    except Exception as e:
        logging.exception('Unexpected error in database.py')
        sys.exit(1)