dallinger/deployment.py
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import codecs
import json
import os
import re
import threading
import time
from shlex import quote
import redis
import requests
import six
from six.moves.urllib.parse import urlparse, urlunparse
from dallinger import data, db, heroku, recruiters, registration
from dallinger.config import get_config
from dallinger.heroku.tools import HerokuApp, HerokuLocalWrapper
from dallinger.redis_utils import connect_to_redis
from dallinger.utils import (
GitClient,
bootstrap_development_session,
get_base_url,
open_browser,
setup_experiment,
)
DEFAULT_DELAY = 1
BACKOFF_FACTOR = 2
MAX_ATTEMPTS = 6
def handle_launch_data(url, error, delay=DEFAULT_DELAY, attempts=MAX_ATTEMPTS):
"""Sends a POST request to te given `url`, retrying it with exponential backoff.
The passed `error` function is invoked to give feedback as each error occurs,
possibly multiple times.
"""
launch_data = None
launch_request = None
for remaining_attempt in sorted(range(attempts), reverse=True): # [3, 2, 1, 0]
try:
launch_request = requests.post(url)
request_happened = True
except requests.exceptions.RequestException as err:
request_happened = False
error(f"Error accessing {url}:\n{err}")
if request_happened:
try:
launch_data = launch_request.json()
except json.decoder.JSONDecodeError:
# The backend did not return JSON. It means our dallinger instance
# was not (yet) running at the time of the request.
# We treat this similarly to a RequestException: we'll try again after waiting.
request_happened = False
error(
f"Error parsing response from {url}, "
f"check server logs for details.\n{launch_request.text}"
)
except ValueError as err:
error(
f"Error parsing response from {url}, "
f"check server logs for details.\n{err}\n{launch_request.text}"
)
raise
# Early return if successful
if request_happened and launch_request.ok:
return launch_data
if request_happened:
error(
"Error accessing {} ({}):\n{}".format(
url, launch_request.status_code, launch_request.text
)
)
if remaining_attempt:
delay = delay * BACKOFF_FACTOR
next_attempt_count = attempts - (remaining_attempt - 1)
error(
"Experiment launch failed. Trying again "
"(attempt {} of {}) in {} seconds ...".format(
next_attempt_count, attempts, delay
)
)
time.sleep(delay)
error("Experiment launch failed, check server logs for details.")
if launch_data and launch_data.get("message"):
error(launch_data["message"])
if launch_request is not None:
launch_request.raise_for_status()
def deploy_sandbox_shared_setup(
log, verbose=True, app=None, exp_config=None, prelaunch_actions=None
):
"""Set up Git, push to Heroku, and launch the app."""
if verbose:
out = None
else:
out = open(os.devnull, "w")
config = get_config()
if not config.ready:
config.load()
heroku.sanity_check(config)
(heroku_app_id, tmp) = setup_experiment(
log, debug=False, app=app, exp_config=exp_config, local_checks=False
)
# Register the experiment using all configured registration services.
if config.get("mode") == "live":
log("Registering the experiment on configured services...")
registration.register(heroku_app_id, snapshot=None)
# Log in to Heroku if we aren't already.
log("Making sure that you are logged in to Heroku.")
heroku.log_in()
config.set("heroku_auth_token", heroku.auth_token())
log("", chevrons=False)
# Change to temporary directory.
cwd = os.getcwd()
os.chdir(tmp)
# Commit Heroku-specific files to tmp folder's git repo.
git = GitClient(output=out)
git.init()
git.add("--all")
git.commit('"Experiment {}"'.format(heroku_app_id))
# Initialize the app on Heroku.
log("Initializing app on Heroku...")
team = config.get("heroku_team", None)
heroku_app = HerokuApp(dallinger_uid=heroku_app_id, output=out, team=team)
heroku_app.bootstrap()
heroku_app.buildpack("https://github.com/stomita/heroku-buildpack-phantomjs")
# Set up add-ons and AWS environment variables.
database_size = config.get("database_size")
redis_size = config.get("redis_size")
addons = [
"heroku-postgresql:{}".format(quote(database_size)),
"heroku-redis:{}".format(quote(redis_size)),
"papertrail",
]
if config.get("sentry"):
addons.append("sentry")
for name in addons:
heroku_app.addon(name)
heroku_config = {
"AWS_ACCESS_KEY_ID": config["aws_access_key_id"],
"AWS_SECRET_ACCESS_KEY": config["aws_secret_access_key"],
"AWS_DEFAULT_REGION": config["aws_region"],
"activate_recruiter_on_start": config["activate_recruiter_on_start"],
"auto_recruit": config["auto_recruit"],
"smtp_username": config["smtp_username"],
"smtp_password": config["smtp_password"],
"whimsical": config["whimsical"],
"FLASK_SECRET_KEY": codecs.encode(os.urandom(16), "hex").decode("ascii"),
}
# Set up the preferred class as an environment variable, if one is set
# This is needed before the config is parsed, but we also store it in the
# config to make things easier for recording into bundles.
preferred_class = config.get("EXPERIMENT_CLASS_NAME", None)
if preferred_class:
heroku_config["EXPERIMENT_CLASS_NAME"] = preferred_class
heroku_app.set_multiple(**heroku_config)
# Wait for Redis database to be ready.
log("Waiting for Redis (this can take a couple minutes)...", nl=False)
ready = False
while not ready:
try:
r = connect_to_redis(url=heroku_app.redis_url)
r.set("foo", "bar")
ready = True
log("\n✓ connected at {}".format(heroku_app.redis_url), chevrons=False)
except (ValueError, redis.exceptions.ConnectionError):
time.sleep(2)
log(".", chevrons=False, nl=False)
log("Saving the URL of the postgres database...")
config.extend({"database_url": heroku_app.db_url})
config.write()
git.add("config.txt")
git.commit("Save URL for database")
log("Generating dashboard links...")
heroku_addons = heroku_app.addon_parameters()
heroku_addons = json.dumps(heroku_addons)
if six.PY2:
heroku_addons = heroku_addons.decode("utf-8")
config.extend({"infrastructure_debug_details": heroku_addons})
config.write()
git.add("config.txt")
git.commit("Save URLs for heroku addon management")
# Launch the Heroku app.
log("Pushing code to Heroku...")
git.push(remote="heroku", branch="HEAD:master")
log("Scaling up the dynos...")
default_size = config.get("dyno_type")
for process in ["web", "worker"]:
size = config.get("dyno_type_" + process, default_size)
qty = config.get("num_dynos_" + process)
heroku_app.scale_up_dyno(process, qty, size)
if config.get("clock_on"):
heroku_app.scale_up_dyno("clock", 1, size)
if prelaunch_actions is not None:
for task in prelaunch_actions:
task(heroku_app, config)
# Launch the experiment.
log("Launching the experiment on the remote server and starting recruitment...")
launch_url = "{}/launch".format(heroku_app.url)
log("Calling {}".format(launch_url), chevrons=False)
launch_data = handle_launch_data(launch_url, error=log)
result = {
"app_name": heroku_app.name,
"app_home": heroku_app.url,
"dashboard_url": "{}/dashboard/".format(heroku_app.url),
"dashboard_user": config.get("dashboard_user"),
"dashboard_password": config.get("dashboard_password"),
"recruitment_msg": launch_data.get("recruitment_msg", None),
}
log("Experiment details:")
log("App home: {}".format(result["app_home"]), chevrons=False)
log("Dashboard URL: {}".format(result["dashboard_url"]), chevrons=False)
log("Dashboard user: {}".format(config.get("dashboard_user")), chevrons=False)
log(
"Dashboard password: {}".format(config.get("dashboard_password")),
chevrons=False,
)
log("Recruiter info:")
log(result["recruitment_msg"], chevrons=False)
# Return to the branch whence we came.
os.chdir(cwd)
log(
"Completed Heroku deployment of experiment ID {} using app ID {}.".format(
config.get("id"), heroku_app_id
)
)
return result
class DevelopmentDeployment(object):
"""Collates files from Dallinger and the custom experment, then symlinks
them into a target sub-directory, so Flask development server can be run
manually in that directory.
"""
def __init__(self, output, exp_config):
self.out = output
self.exp_config = exp_config or {}
self.exp_config.update({"mode": "debug", "loglevel": 0})
def run(self):
"""Bootstrap the environment and reset the database."""
self.out.log("Preparing your pristine development environment...")
experiment_uid, dst = bootstrap_development_session(
self.exp_config, os.getcwd(), self.out.log
)
self.out.log("Re-initializing database...")
db.init_db(drop_all=True)
self.out.log(
f"Files symlinked in {dst}.\n"
"Run './run.sh' in that directory to start Flask, "
"plus the worker and clock processes."
)
class HerokuLocalDeployment(object):
exp_id = None
tmp_dir = None
dispatch = {} # Subclass may provide handlers for Heroku process output
environ = None
bot = False
DEPLOY_NAME = "Heroku"
WRAPPER_CLASS = HerokuLocalWrapper
DO_INIT_DB = True
def configure(self):
self.exp_config.update({"mode": "debug", "loglevel": 0})
def setup(self):
self.exp_id, self.tmp_dir = setup_experiment(
self.out.log, exp_config=self.exp_config
)
def update_dir(self):
# FIXME: this call is used for implicit communication between classes in this file
# and service wrappers (HerokuLocalWrapper, DockerComposeWrapper).
# This communication should be made explicit, passing this path around instead of
# changing a global state.
os.chdir(self.tmp_dir)
# Update the logfile to the new directory
config = get_config()
logfile = config.get("logfile")
if logfile and logfile != "-":
logfile = os.path.join(self.original_dir, logfile)
config.extend({"logfile": logfile})
config.write()
def run(self):
"""Set up the environment, get a wrapper instance, and pass
it to the concrete class's execute() method.
"""
self.configure()
self.setup()
self.update_dir()
if self.DO_INIT_DB:
db.init_db(drop_all=True)
config = get_config()
environ = None
if self.environ:
environ = os.environ.copy()
environ.update(self.environ)
self.out.log(f"Starting up the {self.DEPLOY_NAME} Local server...")
with self.WRAPPER_CLASS(
config,
self.out,
self.original_dir,
self.tmp_dir,
verbose=self.verbose,
env=environ,
needs_chrome=self.bot,
) as wrapper:
try:
self.execute(wrapper)
except KeyboardInterrupt:
pass
finally:
os.chdir(self.original_dir)
self.cleanup()
def notify(self, message):
"""Callback function which checks lines of output, tries to match
against regex defined in subclass's "dispatch" dict, and passes through
to a handler on match.
"""
for regex, handler in self.dispatch.items():
match = re.search(regex, message)
if match:
handler = getattr(self, handler)
return handler(match)
def execute(self, heroku):
raise NotImplementedError()
class DebugDeployment(HerokuLocalDeployment):
dispatch = {
r"[^\"]{} (.*)$".format(recruiters.NEW_RECRUIT_LOG_PREFIX): "new_recruit",
r"{}".format(recruiters.CLOSE_RECRUITMENT_LOG_PREFIX): "recruitment_closed",
}
def __init__(self, output, verbose, bot, proxy_port, exp_config, no_browsers=False):
self.out = output
self.verbose = verbose
self.bot = bot
self.exp_config = exp_config or {}
self.proxy_port = proxy_port
self.original_dir = os.getcwd()
self.complete = False
self.status_thread = None
self.no_browsers = no_browsers
self.environ = {
"FLASK_SECRET_KEY": codecs.encode(os.urandom(16), "hex").decode("ascii"),
}
def with_proxy_port(self, url):
if self.proxy_port is not None:
self.out.log("Using proxy port {}".format(self.proxy_port))
url = url.replace(str(get_config().get("base_port")), self.proxy_port)
return url
def configure(self):
super(DebugDeployment, self).configure()
if self.bot:
self.exp_config["recruiter"] = "bots"
def execute(self, heroku):
base_url = get_base_url()
self.out.log("Server is running on {}. Press Ctrl+C to exit.".format(base_url))
self.out.log("Launching the experiment...")
try:
result = handle_launch_data(
"{}/launch".format(base_url), error=self.out.error, attempts=1
)
except Exception:
# Show output from server
self.dispatch[r"POST /launch"] = "launch_request_complete"
heroku.monitor(listener=self.notify)
else:
if result["status"] == "success":
self.out.log(result["recruitment_msg"])
dashboard_url = self.with_proxy_port("{}/dashboard/".format(base_url))
self.display_dashboard_access_details(dashboard_url)
if not self.no_browsers:
self.async_open_dashboard(dashboard_url)
# A little delay here ensures that the experiment window always opens
# after the dashboard window.
time.sleep(0.1)
self.heroku = heroku
self.out.log(
"Monitoring the Heroku Local server for recruitment or completion..."
)
heroku.monitor(listener=self.notify)
def launch_request_complete(self, match):
return HerokuLocalWrapper.MONITOR_STOP
def cleanup(self):
self.out.log("Completed debugging of experiment with id " + self.exp_id)
self.complete = True
def new_recruit(self, match):
"""Dispatched to by notify(). If a recruitment request has been issued,
open a browser window for the a new participant (in this case the
person doing local debugging).
"""
self.out.log("new recruitment request!")
if self.no_browsers:
self.out.log(recruiters.NEW_RECRUIT_LOG_PREFIX + ": " + match.group(1))
return
url = self.with_proxy_port(match.group(1))
open_browser(url)
def display_dashboard_access_details(self, url):
config = get_config()
self.out.log("Experiment dashboard: {}".format(url))
self.out.log(
"Dashboard user: {} password: {}".format(
config.get("dashboard_user"),
config.get("dashboard_password"),
)
)
def async_open_dashboard(self, url):
threading.Thread(
target=self.open_dashboard, name="Open dashboard", kwargs={"url": url}
).start()
def open_dashboard(self, url):
config = get_config()
self.out.log("Opening dashboard")
parsed = list(urlparse(url))
parsed[1] = "{}:{}@{}".format(
config.get("dashboard_user"),
config.get("dashboard_password"),
parsed[1],
)
open_browser(urlunparse(parsed))
def recruitment_closed(self, match):
"""Recruitment is closed.
Start a thread to check the experiment summary.
"""
if self.no_browsers:
self.out.log(recruiters.CLOSE_RECRUITMENT_LOG_PREFIX)
if self.status_thread is None:
self.status_thread = threading.Thread(target=self.check_status)
self.status_thread.start()
def check_status(self):
"""Check the output of the summary route until
the experiment is complete, then we can stop monitoring Heroku
subprocess output.
"""
self.out.log("Recruitment is complete. Waiting for experiment completion...")
base_url = get_base_url()
status_url = base_url + "/summary"
while not self.complete:
time.sleep(10)
try:
resp = requests.get(status_url)
exp_data = resp.json()
except (ValueError, requests.exceptions.RequestException):
self.out.error("Error fetching experiment status.")
else:
self.out.log("Experiment summary: {}".format(exp_data))
if exp_data.get("completed", False):
self.out.log("Experiment completed, all nodes filled.")
self.complete = True
self.heroku.stop()
def notify(self, message):
"""Monitor output from heroku process.
This overrides the base class's `notify`
to make sure that we stop if the status-monitoring thread
has determined that the experiment is complete.
"""
if self.complete:
return HerokuLocalWrapper.MONITOR_STOP
return super(DebugDeployment, self).notify(message)
class LoaderDeployment(HerokuLocalDeployment):
dispatch = {"Replay ready: (.*)$": "start_replay"}
def __init__(self, app_id, output, verbose, exp_config):
self.app_id = app_id
self.out = output
self.verbose = verbose
self.exp_config = exp_config or {}
self.original_dir = os.getcwd()
self.zip_path = None
def configure(self):
self.exp_config.update({"mode": "debug", "loglevel": 0})
self.zip_path = data.find_experiment_export(self.app_id)
if self.zip_path is None:
msg = 'Dataset export for app id "{}" could not be found.'
raise IOError(msg.format(self.app_id))
def setup(self):
self.exp_id, self.tmp_dir = setup_experiment(
self.out.log, app=self.app_id, exp_config=self.exp_config
)
def execute(self, heroku):
"""Start the server, load the zip file into the database, then loop
until terminated with <control>-c.
"""
db.init_db(drop_all=True)
self.out.log(
"Ingesting dataset from {}...".format(os.path.basename(self.zip_path))
)
data.ingest_zip(self.zip_path)
base_url = get_base_url()
self.out.log("Server is running on {}. Press Ctrl+C to exit.".format(base_url))
if self.exp_config.get("replay"):
self.out.log("Launching the experiment...")
time.sleep(4)
handle_launch_data("{}/launch".format(base_url), error=self.out.error)
heroku.monitor(listener=self.notify)
# Just run until interrupted:
while self.keep_running():
time.sleep(1)
def start_replay(self, match):
"""Dispatched to by notify(). If a recruitment request has been issued,
open a browser window for the a new participant (in this case the
person doing local debugging).
"""
self.out.log("replay ready!")
url = match.group(1)
open_browser(url)
def cleanup(self):
self.out.log("Terminating dataset load for experiment {}".format(self.exp_id))
def keep_running(self):
# This is a separate method so that it can be replaced in tests
return True