themis/api.py
from flask import Flask, render_template, jsonify, send_from_directory, request
from flask_swagger import swagger
import os
import re
import json
import time
import threading
import traceback
import themis
from themis import config, server
from themis.config import *
from themis.constants import *
from themis.util import common, aws_common, aws_pricing
from themis.util.aws_common import INSTANCE_GROUP_TYPE_TASK
from themis.scaling import emr_scaling
from themis.monitoring import resources, emr_monitoring, kinesis_monitoring, database
root_path = os.path.dirname(os.path.realpath(__file__))
web_dir = root_path + '/web/'
app = Flask('app', template_folder=web_dir)
app.root_path = root_path
# -----------------------
# Generic top-level APIs
# -----------------------
@app.route('/swagger.json')
def spec():
swag = swagger(app)
swag['info']['version'] = "1.1"
swag['info']['title'] = "Themis Autoscaling API"
return jsonify(swag)
@app.route('/healthcheck')
def healthcheck():
result = {
'status': 'OK',
'remote_addr': request.remote_addr
}
cmds = {
'disk': 'df -h',
'uptime': 'uptime',
'proc_python': 'ps aux | grep python'
}
for key, cmd in cmds.iteritems():
try:
result[key] = common.run(cmd)
except Exception, e:
result[key] = str(e)
return jsonify(result)
@app.route('/config/<section>', methods=['GET'])
def get_global_config(section):
""" Get global configuration
---
operationId: 'getGlobalConfig'
parameters:
- name: 'section'
in: path
"""
return do_get_config(section)
@app.route('/config/<section>/<resource>', methods=['GET'])
def get_config(section, resource):
""" Get configuration
---
operationId: 'getConfig'
parameters:
- name: 'section'
in: path
- name: 'resource'
in: path
"""
return do_get_config(section, resource)
@app.route('/config/<section>/', methods=['POST'])
def set_global_config(section):
""" Set global configuration
---
operationId: 'setGlobalConfig'
parameters:
- name: 'config'
in: body
- name: 'section'
in: path
"""
new_config = json.loads(request.data)
return do_set_config(section, new_config)
@app.route('/config/<section>/<resource>', methods=['POST'])
def set_config(section, resource=None):
""" Set configuration
---
operationId: 'setConfig'
parameters:
- name: 'config'
in: body
- name: 'section'
in: path
- name: 'resource'
in: path
"""
new_config = json.loads(request.data)
return do_set_config(section, new_config, resource=resource)
def do_get_config(section, resource=None):
cfg = config.get_config()
cfg = cfg.get(section)
if resource:
cfg = cfg.get(resource)
cfg = cfg.to_dict() if cfg else {}
cfg = config.convert_to_list(cfg)
return jsonify({'config': cfg})
def do_set_config(section, new_config, resource=None):
if isinstance(new_config, list):
new_config = config.convert_from_list(new_config)
config.write(new_config, section=section, resource=resource)
cfg = config.get_config(force_load=True)
cfg = cfg.get(section)
if resource:
cfg = cfg.get(resource)
cfg = cfg.to_dict() if cfg else {}
cfg = config.convert_to_list(cfg)
return jsonify({'config': cfg})
# ----------------------------------------
# EMR specific APIs, prefixed with /emr/
# ----------------------------------------
@app.route('/emr/state/<cluster_id>')
def get_emr_state(cluster_id):
""" Get EMR cluster state
---
operationId: 'getEmrState'
parameters:
- name: cluster_id
in: path
"""
app_config = config.get_config()
cluster = resources.get_resource(SECTION_EMR, cluster_id)
monitoring_interval_secs = int(app_config.general.monitoring_time_window)
info = emr_monitoring.collect_info(cluster, monitoring_interval_secs=monitoring_interval_secs)
return jsonify(info)
@app.route('/emr/history/<cluster_id>')
def get_emr_history(cluster_id):
""" Get EMR cluster state history
---
operationId: 'getEmrHistory'
parameters:
- name: 'cluster_id'
in: path
"""
info = database.history_get(section=SECTION_EMR, resource=cluster_id, limit=100)
common.remove_NaN(info)
return jsonify(results=info)
@app.route('/emr/clusters')
def get_emr_clusters():
""" Get list of EMR clusters
---
operationId: 'getEmrClusters'
"""
resource_list = resources.get_resources('emr')
result = [r.to_dict() for r in resource_list]
return jsonify(results=result)
@app.route('/emr/restart', methods=['POST'])
def restart_emr_node():
""" Restart a cluster node
---
operationId: 'restartEmrNode'
parameters:
- name: 'request'
in: body
"""
data = json.loads(request.data)
cluster_id = data['cluster_id']
node_host = data['node_host']
app_config = config.get_config()
clusters = app_config.emr.to_dict()
for c_id, details in clusters.iteritems():
if c_id == cluster_id:
cluster_ip = details.ip
tasknodes_group = aws_common.get_instance_group_for_node(cluster_id, node_host)
if tasknodes_group:
server.terminate_node(cluster_ip, node_host, tasknodes_group)
return jsonify({'result': 'SUCCESS'})
return jsonify({'result': 'Invalid cluster ID provided'})
@app.route('/emr/costs', methods=['POST'])
def get_emr_costs():
""" Get summary of cluster costs and cost savings
---
operationId: 'getEmrCosts'
parameters:
- name: 'request'
in: body
"""
data = json.loads(request.data)
cluster_id = data['cluster_id']
num_datapoints = data['num_datapoints'] if 'num_datapoints' in data else 300
baseline_nodes = (data['baseline_nodes'] if 'baseline_nodes' in data else
config.get_value(KEY_BASELINE_COMPARISON_NODES, section=SECTION_EMR, resource=cluster_id, default=20))
baseline_nodes = int(baseline_nodes)
info = database.history_get(section=SECTION_EMR, resource=cluster_id, limit=num_datapoints)
common.remove_NaN(info)
result = aws_pricing.get_cluster_savings(info, baseline_nodes)
common.remove_NaN(result, delete_values=False, replacement=0)
return jsonify(results=result, baseline_nodes=baseline_nodes)
# -----------------------------------------------
# Kinesis specific APIs, prefixed with /kinesis/
# -----------------------------------------------
@app.route('/kinesis/streams')
def get_kinesis_streams():
""" Get list of Kinesis streams
---
operationId: 'getKinesisStreams'
"""
resource_list = resources.get_resources('kinesis')
result = [r.to_dict() for r in resource_list]
return jsonify(results=result)
@app.route('/kinesis/state/<stream_id>')
def get_kinesis_state(stream_id):
""" Get Kinesis stream state
---
operationId: 'getKinesisState'
parameters:
- name: stream_id
in: path
"""
app_config = config.get_config()
stream = resources.get_resource(SECTION_KINESIS, stream_id, reload=True)
monitoring_interval_secs = int(app_config.general.monitoring_time_window)
info = kinesis_monitoring.collect_info(stream, monitoring_interval_secs=monitoring_interval_secs)
return jsonify(info)
@app.route('/kinesis/history/<stream_id>')
def get_kinesis_history(stream_id):
""" Get Kinesis stream state history
---
operationId: 'getKinesisHistory'
parameters:
- name: 'stream_id'
in: path
"""
info = database.history_get(section=SECTION_KINESIS, resource=stream_id, limit=100)
common.remove_NaN(info)
return jsonify(results=info)
# ------------------------
# Addition default routes
# ------------------------
@app.route('/')
def hello():
return render_template('index.html')
@app.route('/<path:path>')
def send_static(path):
return send_from_directory(web_dir + '/', path)
def serve(port):
app.run(port=int(port), debug=False, threaded=True, host='0.0.0.0')