rokroskar/sparkhpc
scripts/sparkcluster
#!/usr/bin/env python
#
# CLI for starting and running Spark standalone clusters on HPC resources
#
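# Example usage (assuming the script is available on the PATH as `sparkcluster`;
# the values shown are illustrative):
#
#   sparkcluster start 10 --walltime 01:00 --memory-per-executor 4000
#   sparkcluster info
#   sparkcluster stop 0
#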

from __future__ import print_function
import click
import sparkhpc
from sparkhpc import sparkjob
import subprocess
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('sparkhpc')

home = os.path.expanduser('~')

SCHEDULER = sparkjob.get_scheduler()  # detect the batch scheduler available on this system

@click.group()
def cli():
    pass

@cli.command()
@click.argument('ncores', type=int)
@click.option('--walltime', default="00:30", help="Walltime in HH:MM format")
@click.option('--jobname', default='sparkcluster', help='Name to use for the job')
@click.option('--template', default=None, help='Job template path')
@click.option('--memory-per-executor', default=2000, envvar='SPARK_EXECUTOR_MEMORY',
              help='Memory to reserve for each executor (i.e. the JVM) in MB')
@click.option('--memory-per-core', default=2000,
              help='Memory per core to request from scheduler in MB')
@click.option('--cores-per-executor', default=1,
              help='Cores per executor')
@click.option('--spark-home', default=os.path.join(home,'spark'), envvar='SPARK_HOME', 
              help='Location of the Spark distribution')
@click.option('--wait', default=False, is_flag=True, help='Wait until the job starts')
def start(ncores, 
          walltime, 
          jobname, 
          template, 
          memory_per_executor, 
          memory_per_core, 
          cores_per_executor,
          spark_home, 
          wait):
    """Start the spark cluster as a batch job"""
    
    sj = sparkjob.sparkjob(ncores=ncores, 
                           walltime=walltime, 
                           jobname=jobname, 
                           template=template, 
                           memory_per_core=memory_per_core,
                           memory_per_executor=memory_per_executor, 
                           cores_per_executor=cores_per_executor,
                           spark_home=spark_home)
    
    if wait: 
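        # Block until the scheduler actually starts the Spark cluster job (interrupt with ctrl-c)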
        logger.info(' Waiting for job to start - ctrl-c to stop')
        sj.wait_to_start()
    else:
        sj.submit()
    

@cli.command()
def info():
    """Get info about currently running clusters"""
    sparkhpc.show_clusters()


@cli.command()
@click.argument('clusterid')
def stop(clusterid):
    """Kill a currently running cluster ('all' to kill all clusters)"""
    sjs = sparkjob.sparkjob().current_clusters()

    if clusterid == 'all': 
        if len(sjs) == 0: 
            logger.info(' No clusters running')
        for sj in sjs: 
            sj.stop()
    elif clusterid.isdigit() and int(clusterid) < len(sjs):
        sjs[int(clusterid)].stop()
    else:
        raise RuntimeError('Cluster %s does not exist' % clusterid)


@cli.command()
@click.option('--memory', default='2000M', help='Memory for each executor using a Java memory string')
@click.option('--timeout', default=30, help='Timeout for starting spark master')
@click.option('--cores-per-executor', default=1, help='Number of cores per executor')
def launch(memory, timeout, cores_per_executor):
    """Launch the Spark master and workers within a current job context"""
    sparkjob.start_cluster(memory, timeout=timeout, cores_per_executor=cores_per_executor)

if __name__ == "__main__":
    cli()
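
# Roughly equivalent programmatic usage (a sketch based on the calls above; keyword
# arguments and defaults may differ between sparkhpc versions):
#
#   from sparkhpc import sparkjob
#   sj = sparkjob.sparkjob(ncores=10, walltime='01:00')
#   sj.submit()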