scripts/sparkcluster
#!/usr/bin/env python
#
# CLI for starting and running Spark standalone clusters on HPC resources
#
from __future__ import print_function
import click
import sparkhpc
from sparkhpc import sparkjob
import subprocess
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('sparkhpc')

home = os.path.expanduser('~')

# Detect which scheduler (resource manager) is available on this system
SCHEDULER = sparkjob.get_scheduler()


@click.group()
def cli():
    pass


@cli.command()
@click.argument('ncores', type=int)
@click.option('--walltime', default="00:30", help="Walltime in HH:MM format")
@click.option('--jobname', default='sparkcluster', help='Name to use for the job')
@click.option('--template', default=None, help='Job template path')
@click.option('--memory-per-executor', default=2000, envvar='SPARK_EXECUTOR_MEMORY',
              help='Memory to reserve for each executor (i.e. the JVM) in MB')
@click.option('--memory-per-core', default=2000, help='Memory per core to request from scheduler in MB')
@click.option('--cores-per-executor', default=1, help='Cores per executor')
@click.option('--spark-home', default=os.path.join(home, 'spark'), envvar='SPARK_HOME',
              help='Location of the Spark distribution')
@click.option('--wait', default=False, is_flag=True, help='Wait until the job starts')
def start(ncores, walltime, jobname, template, memory_per_executor, memory_per_core,
          cores_per_executor, spark_home, wait):
    """Start the spark cluster as a batch job"""
    sj = sparkjob.sparkjob(ncores=ncores,
                           walltime=walltime,
                           jobname=jobname,
                           template=template,
                           memory_per_core=memory_per_core,
                           memory_per_executor=memory_per_executor,
                           cores_per_executor=cores_per_executor,
                           spark_home=spark_home)

    if wait:
        logger.info(' Waiting for job to start - ctrl-c to stop')
        sj.wait_to_start()
    else:
        sj.submit()


@cli.command()
def info():
    """Get info about currently running clusters"""
    sparkhpc.show_clusters()


@cli.command()
@click.argument('clusterid')
def stop(clusterid):
    """Kill a currently running cluster ('all' to kill all clusters)"""
    sjs = sparkjob.sparkjob().current_clusters()

    if clusterid == 'all':
        if len(sjs) == 0:
            logger.info(' No clusters running')
        for sj in sjs:
            sj.stop()
    elif int(clusterid) < len(sjs):
        sjs[int(clusterid)].stop()
    else:
        raise RuntimeError('Cluster %s does not exist' % clusterid)


@cli.command()
@click.option('--memory', default='2000M', help='Memory for each executor using a Java memory string')
@click.option('--timeout', default=30, help='Timeout for starting spark master')
@click.option('--cores-per-executor', default=1, help='Number of cores per executor')
def launch(memory, timeout, cores_per_executor):
    """Launch the Spark master and workers within a current job context"""
    sparkjob.start_cluster(memory, timeout=timeout, cores_per_executor=cores_per_executor)


if __name__ == "__main__":
    cli()
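
# Example invocations (a sketch, assuming this script is installed on PATH as
# `sparkcluster`; exact scheduler behaviour depends on the detected resource manager):
#
#   sparkcluster start 10 --walltime 01:00 --memory-per-executor 4000
#   sparkcluster info
#   sparkcluster stop 0          # or: sparkcluster stop all
#   sparkcluster launch          # inside an already-running scheduler job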