rokroskar/sparkhpc

View on GitHub
scripts/hpcnotebook

Summary

Maintainability
Test Coverage
#!/usr/bin/env python
from __future__ import print_function
import os
from os.path import expanduser, exists
import subprocess
import sys
import re
import socket
from IPython.terminal.ipapp import launch_new_instance
import shutil
import click
import pkg_resources
 
import logging
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('hpcnotebook')
 
# try to figure out which scheduler we have
def which(program):
    """Locate an executable, similar to the shell's ``which`` command.

    If `program` contains a directory component it is checked as given;
    otherwise every directory listed in the PATH environment variable is
    searched in order.

    Returns the path of the first match that is a regular file with the
    executable bit set, or None if nothing was found.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, _ = os.path.split(program)
    if fpath:
        # an explicit path was given: never fall back to searching PATH
        return program if is_exe(program) else None

    # PATH may be unset (e.g. in a stripped-down batch environment); use the
    # platform default search path instead of raising KeyError
    search_path = os.environ.get("PATH", os.defpath)
    for path in search_path.split(os.pathsep):
        # entries are sometimes wrapped in double quotes
        exe_file = os.path.join(path.strip('"'), program)
        if is_exe(exe_file):
            return exe_file

    return None
 
# Detect the batch scheduler by probing for its job-query command.
# HOST_ENVIRON names the environment variable whose presence is later used to
# decide whether we are running on a compute node.
if which('bjobs') is not None:
    SCHEDULER = 'LSF'
    HOST_ENVIRON = 'LSB_HOSTS'
elif which('squeue') is not None:
    SCHEDULER = 'SLURM'
    HOST_ENVIRON = 'SLURM_NODELIST'
else:
    # no scheduler: SCHEDULER must still be defined, otherwise any later
    # comparison against it raises NameError instead of a useful error
    SCHEDULER = None
    # placeholder variable name -- presumably never present in the environment
    HOST_ENVIRON = 'NO_SCHEDULER_HERE'
 
#
# Initial inspiration for this script from https://github.com/felixcheung/vagrant-projects
#
 
# Contents of jupyter_notebook_config.py; the {password}, {certfile} and
# {port} placeholders are filled in with str.format when the file is written
notebook_config_template = """c = get_config()
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.password = "{password}"
c.NotebookApp.certfile = "{certfile}"
c.NotebookApp.port = {port}
"""

# Per-scheduler settings, keyed by scheduler name:
#   templates      -- job script template shipped with the package
#   submit_command -- shell command; %s is replaced by the job script path
#   job_regex      -- pattern whose first group captures the job id in the
#                     submit command's output (raw string so that \d is a
#                     regex class rather than an invalid string escape)
templates = {'LSF': 'hpcnotebook.lsf.template'}
submit_command = {'LSF': 'bsub < %s'}
job_regex = {'LSF': r'Job <(\d+)>'}
 
class bc:
    """ANSI terminal escape sequences used to decorate log messages."""

    # reset all attributes back to the terminal default
    ENDC = '\033[0m'

    # text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # bright foreground colours
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
 
# the current user's home directory
home = expanduser("~")

# directory holding the jupyter notebook configuration written by this script
jupyter_config_path = home + "/.jupyter_notebook"
 
# Top-level command group: stores the global --port option on the click
# context object so that the sub-commands can read it from ctx.obj['port'].
@click.group()
@click.option('--port', default=8889, help='Port for the notebook server')
@click.pass_context
def cli(ctx, port):
    # ctx.obj is None unless the caller invoked cli(obj=...); make sure a dict
    # is available so the assignment below cannot fail with a TypeError
    ctx.ensure_object(dict)
    ctx.obj['port'] = port
 
# `setup` sub-command: asks for a notebook password, creates a self-signed SSL
# certificate in ~/.ssh and writes jupyter_notebook_config.py into
# jupyter_config_path. Does nothing (apart from logging an error) when the
# configuration directory already exists, unless --force is given.
# Raises subprocess.CalledProcessError if the openssl command fails.
@cli.command()
@click.option('--force', is_flag=True, help='Force a reset of the notebook, even if it has already been done')
@click.pass_context
def setup(ctx, force):
    """Setup the notebook"""

    port = ctx.obj['port']

    # --force wipes the existing configuration so everything below runs again
    if force and exists(jupyter_config_path):
        shutil.rmtree(jupyter_config_path)

    # if the configuration directory is still there we've probably already
    # done this step
    if exists(jupyter_config_path):
        logger.error(bc.FAIL + "The jupyter notebook already looks set up; if you want to force setup, use --force" + bc.ENDC)
        return

    # get a password
    from notebook.auth import passwd
    logger.info("""
This script will create a Jupyter notebook profile for working remotely.
When it is finished, you can find the configuration in {conf_path}.
First, please enter a *new* password for your Jupyter notebook:
""".format(conf_path=bc.UNDERLINE + jupyter_config_path + bc.ENDC))
    new_pass = passwd()

    logger.info("Creating an SSL certificate to enable a secure connection; the certificate will be in your ~/.ssh directory")
    # make sure the .ssh directory is there before continuing
    sshdir = '{home}/.ssh'.format(home=home)
    if not os.path.exists(sshdir):
        os.makedirs(sshdir)
        # os.chmod works on a path (os.fchmod needs an open file descriptor)
        # and the mode has to be octal: rwx for the owner only
        os.chmod(sshdir, 0o700)

    # make an ssl certificate; key and certificate share one file
    certfile = "{home}/.ssh/notebook_cert.pem".format(home=home)

    # 2048-bit key: 1024-bit RSA keys are no longer considered secure
    out = subprocess.check_output(
        ['openssl', 'req', '-x509', '-nodes', '-days', '365',
         '-newkey', 'rsa:2048', '-keyout', certfile, '-out', certfile,
         '-batch'],
        stderr=subprocess.STDOUT).decode()

    # the file contains the unencrypted private key (-nodes), so keep it
    # readable by the owner only
    os.chmod(certfile, 0o600)

    for line in out.split('\n'):
        logger.info(bc.OKGREEN + '[openssl] ' + bc.ENDC + line)

    # create the directory and write the notebook config
    os.makedirs(jupyter_config_path)
    with open("{profile_path}/jupyter_notebook_config.py".format(profile_path=jupyter_config_path), 'w') as f:
        f.write(notebook_config_template.format(
            password=new_pass, certfile=certfile, port=port))
    logger.info(bc.BOLD + 'Notebook setup complete' + bc.ENDC)
 
# `launch` sub-command: starts the notebook server using the configuration
# file in jupyter_config_path. The --port given on the command line overrides
# the port stored in that configuration.
# Raises click.ClickException when the configuration file does not exist.
@cli.command()
@click.pass_context
def launch(ctx):
    """Launch the notebook"""
    port = ctx.obj['port']
    config_file = "{profile_path}/jupyter_notebook_config.py".format(profile_path=jupyter_config_path)

    # fail with a readable message instead of an IOError traceback
    if not exists(config_file):
        raise click.ClickException(
            "No notebook configuration found at {path}; run the 'setup' command first".format(path=config_file))

    # launch_new_instance() is called without arguments below, so the command
    # line for the notebook is handed over by replacing sys.argv
    argv = sys.argv[:1]
    argv.append('notebook')
    argv.append('--config=' + config_file)

    # check if we passed in a port that is different from the one in the
    # configuration
    with open(config_file, 'r') as conf:
        ports_found = re.findall(r'port = (\d+)', conf.read())
    # a configuration without a port line is treated as "no port configured"
    conf_port = int(ports_found[0]) if ports_found else None

    if conf_port != port:
        if conf_port is not None:
            logger.warning(bc.WARNING + "Overriding the port found in the existing configuration" + bc.ENDC)
        argv.append('--port={port}'.format(port=port))

    sys.argv = argv

    # the scheduler's host variable in the environment means we are running
    # on a compute node, so advertise this machine's address
    if HOST_ENVIRON in os.environ:
        ip = socket.gethostbyname(socket.gethostname())
    else:
        ip = 'localhost'

    logger.info(bc.BOLD + "To access the notebook, inspect the output below for the port number, then point your browser to https://{ip}:<port_number>".format(ip=ip) + bc.ENDC)
    sys.stdout.flush()
    launch_new_instance()
 
# `submit` sub-command: renders a job script from a template, writes it to
# ./notebook_job and submits it with the scheduler's submit command.
# The template is formatted with the walltime, ncores, memory and jobname
# fields; when --template is not given, the template shipped in the sparkhpc
# package is used.
# Returns the job id parsed from the submit command's output.
# Raises RuntimeError for unsupported schedulers and IndexError when no job
# id could be found in the output.
@cli.command()
@click.option('--ncores', type=int, default=1, help='Requested number of cores')
@click.option('--walltime', default='01:00', help='Requested runtime in HH:MM')
@click.option('--memory', type=int, default=2000, help='Requested memory in MB')
@click.option('--template', default=None, help='Path to template to use for job submission')
@click.option('--jobname', default='hpcnotebook', help='Name for the batch job')
@click.pass_context
def submit(ctx, ncores, walltime, memory, template, jobname):
    """Submit a notebook job to the scheduler"""
    # SCHEDULER may be missing or None when no scheduler was detected, so look
    # it up defensively to get the RuntimeError below rather than a NameError
    scheduler = globals().get('SCHEDULER')
    if scheduler != 'LSF':
        raise RuntimeError('only the LSF scheduler is supported at the moment')

    if template is None:
        template_file = templates[scheduler]
        template_str = pkg_resources.resource_string('sparkhpc', 'templates/%s' % template_file)
        # resource_string returns bytes on Python 3, which cannot be formatted
        if not isinstance(template_str, str):
            template_str = template_str.decode('utf-8')
    else:
        with open(template, 'r') as template_file:
            template_str = template_file.read()

    job = template_str.format(walltime=walltime,
                              ncores=ncores,
                              memory=memory,
                              jobname=jobname)

    with open('notebook_job', 'w') as jobfile:
        jobfile.write(job)

    job_submit = subprocess.Popen(submit_command[scheduler] % 'notebook_job', shell=True,
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes and waits for the process to finish
    stdout, stderr = job_submit.communicate()

    try:
        jobid = re.findall(job_regex[scheduler], stdout.decode())[0]
    except IndexError:
        logger.error('Job submission failed or jobid invalid')
        if stderr:
            # show what the scheduler complained about
            logger.error(stderr.decode('utf-8', 'replace'))
        raise

    # click discards the return value of a command, so report the id as well
    logger.info('Submitted job %s', jobid)
    return jobid
 
# Script entry point: run the click command group, handing it an empty dict
# that becomes the shared context object (ctx.obj) for the sub-commands.
if __name__ == '__main__':
    cli(obj=dict())