rokroskar/sparkhpc
scripts/sparkcluster
#!/usr/bin/env python
#
# CLI for starting and running Spark standalone clusters on HPC resources
#
 
from __future__ import print_function

import logging
import os
import subprocess

import click

import sparkhpc
from sparkhpc import sparkjob
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('sparkhpc')
 
home = os.path.expanduser('~')
 
# Detect which batch scheduler is available on this system
SCHEDULER = sparkjob.get_scheduler()
 
@click.group()
def cli():
    pass
 
@cli.command()
@click.argument('ncores', type=int)
@click.option('--walltime', default='00:30', help='Walltime in HH:MM format')
@click.option('--jobname', default='sparkcluster', help='Name to use for the job')
@click.option('--template', default=None, help='Job template path')
@click.option('--memory-per-executor', default=2000, envvar='SPARK_EXECUTOR_MEMORY',
              help='Memory to reserve for each executor (i.e. the JVM) in MB')
@click.option('--memory-per-core', default=2000,
              help='Memory per core to request from the scheduler in MB')
@click.option('--cores-per-executor', default=1,
              help='Cores per executor')
@click.option('--spark-home', default=os.path.join(home, 'spark'), envvar='SPARK_HOME',
              help='Location of the Spark distribution')
@click.option('--wait', default=False, is_flag=True, help='Wait until the job starts')
def start(ncores,
          walltime,
          jobname,
          template,
          memory_per_executor,
          memory_per_core,
          cores_per_executor,
          spark_home,
          wait):
    """Start the Spark cluster as a batch job"""
    sj = sparkjob.sparkjob(ncores=ncores,
                           walltime=walltime,
                           jobname=jobname,
                           template=template,
                           memory_per_core=memory_per_core,
                           memory_per_executor=memory_per_executor,
                           cores_per_executor=cores_per_executor,
                           spark_home=spark_home)
    if wait:
        logger.info(' Waiting for job to start - ctrl-c to stop')
        sj.wait_to_start()
    else:
        sj.submit()
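# For orientation only: the submission above can also be driven straight from
# Python with the same `sparkjob` API this command wraps. A minimal sketch,
# assuming sparkhpc is installed and a supported batch scheduler is detected:
#
#     from sparkhpc import sparkjob
#
#     sj = sparkjob.sparkjob(ncores=4, walltime='00:30', memory_per_core=2000)
#     sj.submit()           # queue the cluster as a batch job
#     # or, as with `--wait` above:
#     # sj.wait_to_start()  # block until the job is running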
 
@cli.command()
def info():
    """Get info about currently running clusters"""
    sparkhpc.show_clusters()
 
 
@cli.command()
@click.argument('clusterid')
def stop(clusterid):
    """Kill a currently running cluster ('all' to kill all clusters)"""
    sjs = sparkjob.sparkjob().current_clusters()

    if clusterid == 'all':
        if len(sjs) == 0:
            logger.info(' No clusters running')
        for sj in sjs:
            sj.stop()
    elif int(clusterid) < len(sjs):
        sjs[int(clusterid)].stop()
    else:
        raise RuntimeError('Cluster %s does not exist' % clusterid)
 
 
@cli.command()
@click.option('--memory', default='2000M', help='Memory for each executor as a Java memory string (e.g. 2000M)')
@click.option('--timeout', default=30, help='Timeout for starting the Spark master')
@click.option('--cores-per-executor', default=1, help='Number of cores per executor')
def launch(memory, timeout, cores_per_executor):
    """Launch the Spark master and workers within a current job context"""
    sparkjob.start_cluster(memory, timeout=timeout, cores_per_executor=cores_per_executor)
 
if __name__ == "__main__":
    cli()
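# A quick way to exercise the CLI wiring above without submitting anything is
# click's test runner. A sketch, assuming this script is importable as a module
# named `sparkcluster` (hypothetical; the installed entry point may differ):
#
#     from click.testing import CliRunner
#     from sparkcluster import cli
#
#     runner = CliRunner()
#     result = runner.invoke(cli, ['--help'])   # exercises the group's --help
#     print(result.exit_code, result.output)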