import bson
import copy
import datetime
from .. import config
from ..dao.containerstorage import AcquisitionStorage, AnalysisStorage
from .jobs import Job
from .queue import Queue
from ..web.errors import APINotFoundException, APIStorageException
from . import gears
# Module-level alias for the log object exposed by the project's config module.
log = config.log
# Allowed batch state transitions, keyed by the state being moved TO; the
# value is the state the batch must currently be in (the state moved FROM).
BATCH_JOB_TRANSITIONS = {
    # To <------- #From
    'failed': 'running',
    'complete': 'running',
    'running': 'pending',
    'cancelled': 'running',
}
def get_all(query, projection=None):
    """
    Fetch batch objects from the database.

    ``query`` and ``projection`` are handed unchanged to ``find`` on the
    ``batch`` collection; whatever ``find`` returns is returned as-is.
    """
    batch_collection = config.db.batch
    return batch_collection.find(query, projection)
def get(batch_id, projection=None, get_jobs=False):
    """
    Fetch batch object by id, include job objects as requested.

    batch_id:   id of the batch; a ``str`` is converted to ``bson.ObjectId``.
    projection: optional projection passed to ``find_one``.
    get_jobs:   when true, the ids stored under ``'jobs'`` are replaced by
                the objects returned by ``Job.get`` for each id.

    Raises APINotFoundException when no batch with that id exists.
    """
    if isinstance(batch_id, str):
        batch_id = bson.ObjectId(batch_id)
    batch_job = config.db.batch.find_one({'_id': batch_id}, projection)

    if batch_job is None:
        raise APINotFoundException('Batch job {} not found.'.format(batch_id))

    if get_jobs:
        # Swap the stored job ids for the loaded job objects.
        batch_job['jobs'] = [Job.get(jid) for jid in batch_job.get('jobs', [])]

    return batch_job
def find_matching_conts(gear, containers, container_type):
    """
    Given a gear and a list of containers, sort the containers into those that:
      - have no solution to the gear's input schema (not matched)
      - have multiple solutions to the gear's input schema (ambiguous)
      - match the gear's input schema 1 to 1 (matched)

    Containers are placed in one of the three categories in order.
    A container with 2 possible files for one input and none for the other
    will be marked as 'not matched', not ambiguous. A container without any
    files cannot satisfy any input and is reported as 'not matched' as well.

    Matched containers are modified in place: an ``'inputs'`` key is added
    that maps each gear input name to a file reference of the form
    ``{'type': container_type, 'id': <container id as str>, 'name': <file>}``.

    Returns a dict with the keys 'matched', 'not_matched' and 'ambiguous',
    each holding a list of containers.
    """
    matched_conts = []
    not_matched_conts = []
    ambiguous_conts = []

    for c in containers:
        files = c.get('files')
        if not files:
            # Nothing to offer the gear, so no input can be satisfied.
            not_matched_conts.append(c)
            continue

        # Expected to map each gear input name to its list of candidate files.
        suggestions = gears.suggest_for_files(gear, files)

        # Determine if any of the inputs are ambiguous or not satisfied
        ambiguous = False
        not_matched = False
        for candidates in suggestions.values():
            if len(candidates) > 1:
                ambiguous = True
            elif len(candidates) == 0:
                # One unsatisfied input outranks any ambiguity; stop looking.
                not_matched = True
                break

        # Based on results, add to proper list
        if not_matched:
            not_matched_conts.append(c)
        elif ambiguous:
            ambiguous_conts.append(c)
        else:
            # Exactly one candidate per input: create input map of file refs
            c['inputs'] = {
                input_name: {
                    'type': container_type,
                    'id': str(c['_id']),
                    'name': candidates[0],
                }
                for input_name, candidates in suggestions.items()
            }
            matched_conts.append(c)

    return {
        'matched': matched_conts,
        'not_matched': not_matched_conts,
        'ambiguous': ambiguous_conts,
    }
def insert(batch_proposal):
    """
    Simple database insert given a batch proposal.

    The proposal is stamped in place with identical ``created`` and
    ``modified`` UTC timestamps before being inserted into the ``batch``
    collection. Returns the result of the collection's ``insert`` call.
    """
    stamp = datetime.datetime.utcnow()
    for field in ('created', 'modified'):
        batch_proposal[field] = stamp
    return config.db.batch.insert(batch_proposal)
def update(batch_id, payload):
    """
    Updates a batch job, being mindful of state flow.

    batch_id: id of the batch (anything ``bson.ObjectId`` accepts).
    payload:  fields to ``$set``; it is modified in place to carry a fresh
              ``modified`` timestamp.

    When the payload carries a ``state``, the update only applies if the
    batch is currently in the state that BATCH_JOB_TRANSITIONS lists as the
    predecessor of the requested one.

    Raises KeyError when the requested state is not in
    BATCH_JOB_TRANSITIONS, and Exception when exactly one document was not
    modified (unknown id, or the batch is not in the required previous
    state).
    """
    time_now = datetime.datetime.utcnow()
    bid = bson.ObjectId(batch_id)
    query = {'_id': bid}
    payload['modified'] = time_now

    new_state = payload.get('state')
    if new_state:
        # Require that the batch job has the previous state
        query['state'] = BATCH_JOB_TRANSITIONS[new_state]

    # Filter on the full query (not just the id) so the state requirement
    # built above is actually enforced by the database.
    result = config.db.batch.update_one(query, {'$set': payload})
    if result.modified_count != 1:
        raise Exception('Batch job not updated')
def _create_analysis_job(an_storage, analysis_base, job_map, session_id, origin):
    """
    Create an analysis carrying ``job_map`` on the given session and return
    ``(job, job_id)`` read back from the stored analysis.
    """
    analysis = copy.deepcopy(analysis_base)
    analysis['job'] = job_map
    result = an_storage.create_el(analysis, 'sessions', session_id, origin, None)
    analysis = an_storage.get_el(result.inserted_id)
    job = analysis.get('job')
    return job, bson.ObjectId(job.id_)


def run(batch_job):
    """
    Creates jobs from proposed inputs, returns jobs enqueued.

    One job is created per entry in the proposal's ``inputs`` and one per
    entry in its ``destinations``. For gears whose category is 'analysis'
    the job is created through an analysis on the owning session; for all
    other gears it is enqueued directly on the Queue.

    Afterwards the batch is moved to state 'running' and the ids of the
    created jobs are stored on it.

    Raises APIStorageException when the batch job has no proposal.
    """
    proposal = batch_job.get('proposal')
    if not proposal:
        raise APIStorageException('The batch job is not formatted correctly.')
    proposed_inputs = proposal.get('inputs', [])
    proposed_destinations = proposal.get('destinations', [])

    gear_id = batch_job['gear_id']
    gear = gears.get_gear(gear_id)
    gear_name = gear['gear']['name']

    config_ = batch_job.get('config')
    origin = batch_job.get('origin')
    tags = proposal.get('tags', [])

    is_analysis = gear.get('category') == 'analysis'
    if is_analysis:
        analysis_base = proposal.get('analysis', {})
        if not analysis_base.get('label'):
            time_now = datetime.datetime.utcnow()
            # NOTE(review): the default label is a nested dict rather than a
            # plain string; kept as-is, confirm this is what storage expects.
            analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)}
        an_storage = AnalysisStorage()
        acq_storage = AcquisitionStorage()

    jobs = []
    job_ids = []

    job_defaults = {
        'config': config_,
        'gear_id': gear_id,
        'tags': tags,
        'batch': str(batch_job.get('_id')),
        'inputs': {},
    }

    for inputs in proposed_inputs:
        job_map = copy.deepcopy(job_defaults)
        job_map['inputs'] = inputs

        if is_analysis:
            # The analysis is attached to the session that owns the
            # acquisition referenced by the first input.
            acquisition_id = list(inputs.values())[0].get('id')
            session_id = acq_storage.get_container(
                acquisition_id, projection={'session': 1}).get('session')
            job, job_id = _create_analysis_job(
                an_storage, analysis_base, job_map, session_id, origin)
        else:
            job = Queue.enqueue_job(job_map, origin)
            job_id = job.id_

        jobs.append(job)
        job_ids.append(job_id)

    for dest in proposed_destinations:
        job_map = copy.deepcopy(job_defaults)
        job_map['destination'] = dest

        if is_analysis:
            job, job_id = _create_analysis_job(
                an_storage, analysis_base, job_map,
                bson.ObjectId(dest['id']), origin)
        else:
            job = Queue.enqueue_job(job_map, origin)
            job_id = job.id_

        jobs.append(job)
        job_ids.append(job_id)

    update(batch_job['_id'], {'state': 'running', 'jobs': job_ids})
    return jobs
def cancel(batch_job):
    """
    Cancels all pending jobs, returns number of jobs cancelled.

    Only jobs of this batch that are still in state 'pending' are touched.
    Cancellation is best-effort: a job that cannot be cancelled is logged
    and skipped. The batch itself is then moved to state 'cancelled'.
    """
    pending_jobs = config.db.jobs.find({
        'state': 'pending',
        # Default to an empty list so a batch without jobs yields no matches
        # instead of handing None to $in.
        '_id': {'$in': batch_job.get('jobs', [])},
    })

    cancelled_jobs = 0
    for j in pending_jobs:
        job = Job.load(j)
        try:
            Queue.mutate(job, {'state': 'cancelled'})
            cancelled_jobs += 1
        except Exception: # pylint: disable=broad-except
            # if the cancellation fails, move on to next job
            log.warning('Could not cancel job %s of batch %s',
                        j.get('_id'), batch_job.get('_id'), exc_info=True)

    update(batch_job['_id'], {'state': 'cancelled'})
    return cancelled_jobs
def check_state(batch_id):
    """
    Returns state of batch based on state of each of its jobs.

    Returns None when the batch is cancelled, or while any of its jobs is
    in a state other than 'complete', 'failed' or 'cancelled'. Once no such
    job remains, returns 'complete' if at least one job is not 'failed',
    otherwise 'failed'.
    """
    batch = get(str(batch_id))
    if batch.get('state') == 'cancelled':
        return None

    jobs_coll = config.db.jobs
    unfinished = jobs_coll.find({
        '_id': {'$in': batch.get('jobs', [])},
        'state': {'$nin': ['complete', 'failed', 'cancelled']},
    })
    not_failed = jobs_coll.find({
        '_id': {'$in': batch.get('jobs', [])},
        'state': {'$ne': 'failed'},
    })

    if unfinished.count() != 0:
        # Some job has not reached a final state yet.
        return None
    return 'complete' if not_failed.count() > 0 else 'failed'
def get_stats():
    """
    Return the number of jobs by state.

    Placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def resume():
    """
    Move cancelled jobs back to pending.

    Placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def delete():
    """
    Remove:
      - the batch job
      - its spawned jobs
      - all the files its jobs produced.

    Placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError