
View on GitHub


1 day
Test Coverage
#!/usr/bin/env python
'''Utility functions to read the tasks.db from ipyparallel when used
with BluePyOpt.

The script is executable, as an example on how to extract the top 10
longest running engines for the l5pc model.

import sqlite3

import pandas
import numpy as np

from ipyparallel.controller import sqlitedb

def _open_db(dbfile):
    '''open the sqlite file, and set up the converters to serialize data'''
    sqlite3.register_adapter(dict, sqlitedb._adapt_dict)
    sqlite3.register_converter('dict', sqlitedb._convert_dict)
    sqlite3.register_adapter(list, sqlitedb._adapt_bufs)
    sqlite3.register_converter('bufs', sqlitedb._convert_bufs)
    db = sqlite3.connect(dbfile,
    return db

def _add_buffers(df, arg_names=None, result_names=None, delete_buffers=False):
    '''add argument and result buffers to the dataframe'''
    if arg_names is None:
        row = df['buffers'][0]
        count = len(sqlitedb._convert_bufs(row[3])[0])
        arg_names = ['arg_%02d' % i for i in range(count)]

    args = np.empty(shape=(df.shape[0], len(arg_names)))
    for i, row in enumerate(df['buffers']):
        # row[0] is pickled ref to,
        # row[1] is pickled description of args:
        #  {'kw_keys': [], 'nargs': 2, 'narg_bufs': 2}
        # row[2] is pickled function to be mapped
        # row[3] are the pickled arguments, packed into a list
        args[i, :] = sqlitedb._convert_bufs(row[3])[0]
    args_df = pandas.DataFrame(args, columns=arg_names)

    if result_names is None:
        row = df['result_buffers'][0]
        count = len(sqlitedb._convert_bufs(row[0])[0])
        result_names = ['res_%02d' % i for i in range(count)]

    results = np.empty(shape=(df.shape[0], len(result_names)))
    for i, row in enumerate(df['result_buffers']):
        if row:
            # row[0] are the pickled results, in a list
            results[i, :] = sqlitedb._convert_bufs(row[0])[0]
            # either the task didn't return, or returned an error
            results[i, :] = float('NaN')
    results_df = pandas.DataFrame(results, columns=result_names)

    if delete_buffers:
        df.drop(['buffers', 'result_buffers'], axis=1, inplace=True)

    return pandas.concat(
        [df, args_df, results_df],
        axis=1, join_axes=[df.index])

def create_df(db, arg_names=None, result_names=None):
    '''convert a tasks_db to a pandas dataframe'''
    df = pandas.read_sql('select * from `ipython-tasks`;', db,
                         parse_dates=('submitted', 'started', 'completed',))
    df = _add_buffers(df, arg_names, result_names)
    return df

if __name__ == '__main__':
    import sys

    from l5pc_model import define_parameters
    arg_names = [ for p in define_parameters() if not p.frozen]

    from l5pc_evaluator import (define_protocols, define_fitness_calculator)
    fitness_protocols = define_protocols()
    fitness_calculator = define_fitness_calculator(fitness_protocols)
    result_names = [ for o in fitness_calculator.objectives]

    db_filename = sys.argv[1]
    db = _open_db(db_filename)
    df = create_df(db, arg_names, result_names)
    df['run_time'] = df['completed'] - df['started']

    pandas.set_option('display.expand_frame_repr', False)
    title = '10 Longest times'
    print('%s\n%s' % (title, '*' * len(title)))
        df.nlargest(10, 'run_time')
        [['run_time', ] + arg_names + result_names])