dallinger/experiment_server/experiment_server.py
""" This module provides the backend Flask server that serves an experiment. """
import os
import re
from datetime import datetime
from json import dumps, loads
import gevent
from flask import (
Flask,
Response,
abort,
redirect,
render_template,
request,
send_from_directory,
url_for,
)
from flask_login import LoginManager, current_user, login_required
from jinja2 import TemplateNotFound
from psycopg2.extensions import TransactionRollbackError
from rq import Queue
from sqlalchemy import exc, func
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from sqlalchemy.sql.expression import true
from dallinger import db, experiment, models, recruiters
from dallinger.config import get_config
from dallinger.notifications import MessengerError, admin_notifier
from dallinger.utils import generate_random_id
from . import dashboard
from .replay import ReplayBackend
from .utils import (
ExperimentError,
ValidatesBrowser,
crossdomain,
error_page,
error_response,
nocache,
success_response,
)
from .worker_events import worker_function
# Initialize the Dallinger database.
# `session` and `redis_conn` are taken from dallinger.db and shared by every
# route defined below.
session = db.session
redis_conn = db.redis_conn
# Connect to the Redis queue for notifications.
q = Queue("default", connection=redis_conn)
# Channel name used when queueing waiting-room (quorum) status messages.
WAITING_ROOM_CHANNEL = "quorum"
# The Flask application on which all routes in this module are registered.
app = Flask("Experiment_Server")
@app.before_request
def _load_config():
    """Call ``_config()`` before each request; its return value is discarded."""
    _config()
@app.before_request
def check_for_protected_routes():
    """Block unauthenticated access to routes the experiment marks protected.

    Authenticated users, and requests for which no URL rule is available,
    pass through untouched.

    Raises:
        PermissionError: the matched rule is listed in the experiment's
            ``protected_routes`` and the current user is not authenticated.
    """
    if current_user.is_authenticated:
        return
    try:
        active_rule = request.url_rule.rule
    except AttributeError:
        # No ``rule`` attribute to inspect, so there is nothing to protect.
        return
    if active_rule not in Experiment(None).protected_routes:
        return
    raise PermissionError(
        f'Unauthorized call to protected route "{active_rule}": {request}'
    )
def _config():
    """Synchronise Flask settings with the environment and Dallinger config.

    Copies ``FLASK_SECRET_KEY`` from the environment onto the Flask app,
    loads the Dallinger configuration if it is not ready yet, and, when a
    ``dashboard_password`` is configured, stores a dashboard ``User`` under
    ``app.config["ADMIN_USER"]``.

    Returns:
        The Dallinger configuration object obtained from ``get_config()``.
    """
    secret = os.environ.get("FLASK_SECRET_KEY")
    app.secret_key = secret
    app.config["SECRET_KEY"] = secret

    configuration = get_config()
    if not configuration.ready:
        configuration.load()

    if configuration.get("dashboard_password", None):
        admin = dashboard.User(
            userid=configuration.get("dashboard_user", "admin"),
            password=configuration.get("dashboard_password"),
        )
        app.config["ADMIN_USER"] = admin
    return configuration
def Experiment(args):
    """Build an instance of the active experiment class.

    ``_config()`` is run first, then the class returned by
    ``experiment.load()`` is instantiated with ``args``.
    """
    _config()
    experiment_class = experiment.load()
    instance = experiment_class(args)
    return instance
# Load the experiment's extra routes, if any.
# When the import raises ImportError nothing is registered; otherwise the
# imported blueprint is attached to the app.
try:
    from dallinger_experiment.experiment import extra_routes
except ImportError:
    pass
else:
    app.register_blueprint(extra_routes)
# Enable the websocket route. This needs to be imported after app is defined to
# avoid an import loop.
# XXX: It would be nice to not do this at import time and
# avoid this circularity, but doing so seems to cause tests in `test_deployment`
# to deadlock.
from dallinger.experiment_server import sockets  # noqa: E402
# Skipping coverage testing on this for now because it only runs at import time
# and cannot be exercised within tests
# `exp_klass` is the experiment class itself (not an instance), or None when
# loading it raised ImportError; the hooks and route registration below check
# for None before using it.
try:
    exp_klass = experiment.load()
except ImportError:
    exp_klass = None  # pragma: no cover
@app.before_request
def before_request():
    """Delegate to the experiment class's ``before_request`` hook, if loaded.

    Returns whatever the hook returns, or ``None`` when no experiment class
    is available.
    """
    if exp_klass is None:
        return None
    return exp_klass.before_request()
@app.after_request
def after_request(response):
    """Delegate to the experiment class's ``after_request`` hook, if loaded.

    The hook receives the current request and ``response``; without an
    experiment class the response is returned unchanged.
    """
    if exp_klass is None:
        return response
    return exp_klass.after_request(request, response)
# Register the routes and dashboard tabs contributed by the experiment class.
# Everything in this block runs once, at import time.
if exp_klass is not None:  # pragma: no cover
    bp = exp_klass.experiment_routes
    routes = experiment.EXPERIMENT_ROUTE_REGISTRATIONS
    # Add each registered route whose handler actually exists on the
    # experiment class to the experiment's blueprint.
    for route in routes:
        route_func = getattr(exp_klass, route["func_name"], None)
        if route_func is not None:
            bp.add_url_rule(
                route["rule"],
                endpoint=route["func_name"],
                view_func=route_func,
                **dict(route["kwargs"]),
            )
    # The blueprint is attached to the app only if there were registrations.
    if routes:
        app.register_blueprint(bp)
    dash_routes = dashboard.DASHBOARD_ROUTE_REGISTRATIONS
    for route in dash_routes:
        route_func = getattr(exp_klass, route["func_name"], None)
        if route_func is not None:
            # All dashboard routes require login
            route_func = login_required(route_func)
            route_name = route["func_name"]
            dashboard.dashboard.add_url_rule(
                "/" + route_name,
                endpoint=route_name,
                view_func=route_func,
                **dict(route["kwargs"]),
            )
            tabs = dashboard.dashboard_tabs
            # NOTE(review): "tab" presumably holds a ready-made tab object;
            # when present it is inserted instead of a title/route pair.
            full_tab = route.get("tab")
            # Positioning precedence: "before_route" first, then
            # "after_route", otherwise a plain ``tabs.insert``.
            if route.get("before_route") and full_tab:
                tabs.insert_tab_before_route(full_tab, route["before_route"])
            elif route.get("before_route"):
                tabs.insert_before_route(
                    route["title"], route_name, route["before_route"]
                )
            elif route.get("after_route") and full_tab:
                tabs.insert_tab_after_route(full_tab, route["after_route"])
            elif route.get("after_route"):
                tabs.insert_after_route(
                    route["title"], route_name, route["after_route"]
                )
            elif full_tab:
                tabs.insert(full_tab)
            else:
                tabs.insert(route["title"], route_name)
    # This hides dashboard tabs from view, but doesn't prevent the routes from
    # being registered
    hidden_dashboards = getattr(exp_klass, "hidden_dashboards", ())
    for route_name in hidden_dashboards:
        dashboard.dashboard_tabs.remove(route_name)
# Ideally, we'd only load recruiter routes if the recruiter is active, but
# it turns out this is complicated, so for now we always register our
# primary recruiters' routes:
app.register_blueprint(recruiters.mturk_routes)
app.register_blueprint(recruiters.prolific_routes)
# Load dashboard routes and login setup
app.register_blueprint(dashboard.dashboard)
# flask_login wiring: unauthenticated users are sent to the dashboard login
# view, and user lookup / unauthorized handling are delegated to the
# dashboard module's callbacks.
login = LoginManager(app)
login.login_view = "dashboard.login"
login.request_loader(dashboard.load_user_from_request)
login.user_loader(dashboard.load_user)
login.unauthorized_handler(dashboard.unauthorized)
# Expose the tab collection through the app config.
app.config["dashboard_tabs"] = dashboard.dashboard_tabs
"""Basic routes."""
@app.route("/")
def index():
    """Serve a minimal landing page that links to the dashboard index."""
    dashboard_url = url_for("dashboard.index")
    return (
        "<html><head></head><body><h1>Dallinger Experiment in progress</h1>"
        f"<p><a href={dashboard_url}>Dashboard</a></p></body></html>"
    )
@app.route("/robots.txt")
def static_robots_txt():
    """Serve ``robots.txt`` from the ``static`` directory."""
    return send_from_directory("static", "robots.txt")
@app.route("/favicon.ico")
def static_favicon():
    """Serve ``favicon.ico`` from the ``static`` directory as ``image/x-icon``."""
    return send_from_directory("static", "favicon.ico", mimetype="image/x-icon")
@app.errorhandler(ExperimentError)
def handle_exp_error(exception):
    """Log an ``ExperimentError`` and answer with the matching error page.

    The log entry records the error's value, its number and the request's
    query arguments.
    """
    query_args = str(dict(request.args))
    app.logger.error(
        "%s (%s) %s", exception.value, exception.errornum, query_args
    )
    page = error_page(error_type=exception.value)
    return page
"""Define functions for handling requests."""
@app.teardown_request
def shutdown_session(_=None):
    """Dispose of the Dallinger DB session at the end of a request.

    Calls ``session.remove()`` and writes a debug log line. The positional
    argument handed over by the teardown hook is ignored.
    """
    session.remove()
    db.logger.debug("Closing Dallinger DB session at flask request end")
@app.context_processor
def inject_experiment():
    """Inject experiment and environment variables into the template context.

    Templates receive ``experiment`` (a fresh experiment instance bound to
    the session) and ``env`` (``os.environ``).
    """
    return {"experiment": Experiment(session), "env": os.environ}
@app.route("/error-page", methods=["POST", "GET"])
def render_error():
    """Render the error page using values submitted in the request form.

    ``participant_id`` (when supplied) is resolved to a ``Participant`` and
    passed, together with ``request_data``, to ``error_page``.
    """
    form = request.form
    request_data = form.get("request_data")
    participant_id = form.get("participant_id")
    if participant_id:
        participant = models.Participant.query.get(participant_id)
    else:
        participant = None
    return error_page(participant=participant, request_data=request_data)
# Body of the notification sent to the experimenter by the /handle-error
# route. It is filled in with ``str.format`` using the fields ``app_id`` and
# ``assignment_id``.
hit_error_template = """Dear experimenter,
This is an automated email from Dallinger. You are receiving this email because
a recruited participant has been unable to complete the experiment due to
a bug.
The application id is: {app_id}
The information about the failed HIT is recorded in the database in the
Notification table, with assignment_id {assignment_id}.
To see the logs, use the command "dallinger logs --app {app_id}"
To pause the app, use the command "dallinger hibernate --app {app_id}"
To destroy the app, use the command "dallinger destroy --app {app_id}"
The Dallinger dev. team.
"""
@app.route("/handle-error", methods=["POST"])
def handle_error():
    """Record an error reported by a participant and notify the experimenter.

    Identifiers (participant, worker, assignment, HIT) are read from the
    form and, where missing, from the JSON ``request_data`` payload or from
    matching ``Participant`` rows. If a participant can be identified they
    are marked complete via ``_worker_complete``. A ``Notification`` row is
    always stored and a message is sent through the admin notifier;
    messenger failures are logged rather than raised.

    Returns:
        The rendered ``error-complete.html`` template.
    """
    request_data = request.form.get("request_data")
    error_feedback = request.form.get("error_feedback")
    error_type = request.form.get("error_type")
    error_text = request.form.get("error_text")
    worker_id = request.form.get("worker_id")
    assignment_id = request.form.get("assignment_id")
    participant_id = request.form.get("participant_id")
    hit_id = request.form.get("hit_id")
    participant = None
    completed = False
    details = {"request_data": {}}

    if request_data:
        try:
            request_data = loads(request_data)
        except ValueError:
            request_data = {}
        if not isinstance(request_data, dict):
            # Valid JSON that is not an object (list, string, number) offers
            # no fields to read and would break ``.get``/``.update`` below.
            request_data = {}
        details["request_data"] = request_data

        try:
            data = loads(request_data.get("data", "null")) or request_data
        except (ValueError, TypeError):
            # TypeError: "data" was already decoded (not a JSON string).
            data = request_data
        if not isinstance(data, dict):
            data = request_data

        if not participant_id and "participant_id" in data:
            participant_id = data["participant_id"]
        if not worker_id and "worker_id" in data:
            worker_id = data["worker_id"]
        if not assignment_id and "assignment_id" in data:
            assignment_id = data["assignment_id"]
        if not hit_id and "hit_id" in data:
            hit_id = data["hit_id"]

    # Normalise to an int or None so that blank / malformed ids cannot slip
    # through the ``is None`` checks below.
    try:
        participant_id = int(participant_id) if participant_id else None
    except (ValueError, TypeError):
        participant_id = None

    details["feedback"] = error_feedback
    details["error_type"] = error_type
    details["error_text"] = error_text

    if participant_id is None and worker_id:
        participants = (
            session.query(models.Participant).filter_by(worker_id=worker_id).all()
        )
        if participants:
            participant = participants[0]
            if not assignment_id:
                assignment_id = participant.assignment_id

    if participant_id is None and assignment_id:
        # Look the participant up by assignment id (the previous code
        # compared the assignment id against the worker_id column).
        participants = (
            session.query(models.Participant)
            .filter_by(assignment_id=assignment_id)
            .all()
        )
        if participants:
            participant = participants[0]
            participant_id = participant.id
            if not worker_id:
                worker_id = participant.worker_id

    if participant_id is not None:
        _worker_complete(participant_id)
        completed = True

    details["request_data"].update(
        {"worker_id": worker_id, "hit_id": hit_id, "participant_id": participant_id}
    )
    notif = models.Notification(
        assignment_id=assignment_id or "unknown",
        event_type="ExperimentError",
        details=details,
    )
    session.add(notif)
    session.commit()

    config = _config()
    message = {
        "subject": "Error during HIT.",
        "body": hit_error_template.format(
            app_id=config.get("id", "unknown"), assignment_id=assignment_id or "unknown"
        ),
    }
    db.logger.debug("Reporting HIT error...")
    messenger = admin_notifier(config)
    try:
        messenger.send(**message)
    except MessengerError as ex:
        # Notification is best effort; the participant still gets a page.
        db.logger.exception(ex)

    return render_template(
        "error-complete.html",
        completed=completed,
        contact_address=config.get("contact_email_on_error"),
        hit_id=hit_id,
    )
"""Define routes for managing an experiment and the participants."""
@app.route("/launch", methods=["POST"])
def launch():
    """Launch the experiment.

    Steps, in order: initialise the database and experiment, write the
    launch log entry, call ``on_launch()``, optionally open recruitment
    (``activate_recruiter_on_start``), spawn background tasks, optionally
    start the replay backend (``replay``), and subscribe the experiment to
    its websocket channel plus the control channel.

    Any failure aborts the launch with a 500 error response. Failures whose
    message points the user at the server log are logged with a traceback.

    Returns:
        A success response carrying ``recruitment_msg``, or an error
        response describing the step that failed.
    """
    try:
        exp = Experiment(db.init_db(drop_all=False))
    except Exception as ex:
        return error_response(
            error_text="Failed to load experiment in /launch: {}".format(str(ex)),
            status=500,
            simple=True,
        )
    try:
        exp.log("Launching experiment...", "-----")
    except IOError as ex:
        return error_response(
            error_text="IOError writing to experiment log: {}".format(str(ex)),
            status=500,
            simple=True,
        )
    try:
        exp.on_launch()
    except Exception as e:
        app.logger.exception("/launch: on_launch() raised an exception")
        return error_response(
            error_text="An error occurred when calling on_launch(), check experiment server log "
            "for details: {}".format(str(e)),
            status=500,
            simple=True,
        )

    recruitment_details = None
    if _config().get("activate_recruiter_on_start"):
        try:
            recruitment_details = exp.recruiter.open_recruitment(
                n=exp.initial_recruitment_size
            )
            session.commit()
        except Exception as e:
            app.logger.exception("/launch: failed to open recruitment")
            return error_response(
                error_text="Failed to open recruitment, check experiment server log "
                "for details: {}".format(str(e)),
                status=500,
                simple=True,
            )

    for task in exp.background_tasks:
        try:
            gevent.spawn(task)
        except Exception:
            app.logger.exception("/launch: failed to spawn background task")
            return error_response(
                error_text="Failed to spawn task on launch: {}, ".format(task)
                + "check experiment server log for details",
                status=500,
                simple=True,
            )

    if _config().get("replay", False):
        try:
            task = ReplayBackend(exp)
            gevent.spawn(task)
        except Exception:
            app.logger.exception("/launch: failed to launch replay task")
            return error_response(
                # The trailing space keeps the two sentences from running
                # together in the rendered message.
                error_text="Failed to launch replay task for experiment. "
                "check experiment server log for details",
                status=500,
                simple=True,
            )

    # If the experiment defines a channel, subscribe the experiment to the
    # redis communication channel:
    if exp.channel is not None:
        try:
            sockets.chat_backend.subscribe(exp, exp.channel)
            # Additionally subscribe the experiment to the Dallinger Control
            # channel for messages about websocket
            # connect/disconnect/subscribe/unsubscribe events
            sockets.chat_backend.subscribe(exp, sockets.CONTROL_CHANNEL)
        except Exception:
            app.logger.exception("/launch: failed to subscribe to chat channel")
            return error_response(
                error_text="Failed to subscribe to chat for channel on launch "
                + "{}".format(exp.channel)
                + ", check experiment server log for details",
                status=500,
                simple=True,
            )

    if recruitment_details is not None:
        message = "\n".join(
            (
                "Initial recruitment list:\n{}".format(
                    "\n".join(recruitment_details["items"])
                ),
                "Additional details:\n{}".format(recruitment_details["message"]),
            )
        )
    else:
        message = "Recruitment hasn't been started yet. Please, initialize recruitment manually!"
    return success_response(recruitment_msg=message)
def prepare_advertisement():
    """Validate an ad request and work out how it should be answered.

    Returns:
        A ``(is_redirect, payload)`` tuple. When ``generate_tokens`` is
        requested, ``is_redirect`` is True and ``payload["redirect"]`` is a
        redirect to the ad route with random values filled in for any
        missing recruiter parameters. Otherwise ``is_redirect`` is False and
        ``payload`` holds the template arguments for the ad page.

    Raises:
        ExperimentError: the browser is not supported, the HIT/assignment
            ids are missing, or the worker has already participated.
    """
    session = db.session
    config = _config()
    mode = config.get("mode")

    # Browser rule validation, if configured:
    validator = ValidatesBrowser(config)
    if not validator.is_supported(request.user_agent.string):
        raise ExperimentError("browser_type_not_allowed")

    entry_information = request.args.to_dict()
    if entry_information.get("generate_tokens", None) in ("1", "true", "yes"):
        redirect_params = {
            key: value
            for key, value in entry_information.items()
            if key != "generate_tokens"
        }
        for token_name in ("hitId", "assignmentId", "workerId"):
            if not redirect_params.get(token_name):
                redirect_params[token_name] = generate_random_id()
        target = url_for("advertisement", **redirect_params)
        return True, {"redirect": redirect(target)}

    app_id = config.get("id", "unknown")
    exp = Experiment(session)
    entry_data = exp.normalize_entry_information(entry_information)
    hit_id = entry_data.get("hit_id")
    assignment_id = entry_data.get("assignment_id")
    worker_id = entry_data.get("worker_id")

    if not hit_id or not assignment_id:
        raise ExperimentError("hit_assign_worker_id_not_set_by_recruiter")

    if worker_id is not None:
        # Check if this workerId has completed the task before
        earlier = models.Participant.query.filter(
            models.Participant.worker_id == worker_id
        ).first()
        if earlier is not None:
            raise ExperimentError("already_did_exp_hit")

    # Fall back to the configured recruiter when the URL names none.
    recruiter_name = request.args.get("recruiter")
    if not recruiter_name:
        recruiter_name = recruiters.from_config(config).nickname

    template_args = {
        "recruiter": recruiter_name,
        "hitid": hit_id,
        "assignmentid": assignment_id,
        "workerid": worker_id,
        "mode": mode,
        "app_id": app_id,
        "query_string": request.query_string.decode(),
    }
    return False, template_args
@app.route("/ad", methods=["GET"])
@nocache
def advertisement():
    """
    Serve the ad page used as the 'external question' URL.

    The page is called from within mechanical turk with the url arguments
    hitId, assignmentId, and workerId, and has to cover two situations:

    If the worker has not yet accepted the hit:
        These arguments will have null values, we should just show an ad
        for the experiment.
    If the worker has accepted the hit:
        These arguments will have appropriate values and we should enter
        the person in the database and provide a link to the experiment
        popup.

    If the url includes an argument ``generate_tokens``:
        The user will be redirected to this view with random recruiter
        arguments set.
    """
    is_redirect, payload = prepare_advertisement()
    if is_redirect:
        return payload["redirect"]
    # Participant has not yet agreed to the consent. They might not
    # even have accepted the HIT.
    return render_template("ad.html", **payload)
@app.route("/recruiter-exit", methods=["GET"])
@nocache
def recruiter_exit():
    """Show the exit page supplied by the participant's recruiter.

    The recruiter receives the experiment instance as well, so it may
    delegate to the experiment for additional values to display.

    Responds with 400 when ``participant_id`` is missing from the query
    string and 404 when no such participant exists.
    """
    participant_id = request.args.get("participant_id")
    if participant_id is None:
        return error_response(
            error_type="/recruiter-exit GET: param participant_id is required",
            status=400,
        )

    participant = models.Participant.query.get(participant_id)
    if participant is None:
        not_found = f"/recruiter-exit GET: no participant found for ID {participant_id}"
        return error_response(error_type=not_found, status=404)

    # Get the recruiter from the participant rather than config, to support
    # MultiRecruiter experiments
    recruiter = recruiters.by_name(participant.recruiter_id)
    return recruiter.exit_response(
        experiment=Experiment(session), participant=participant
    )
@app.route("/summary", methods=["GET"])
def summary():
    """Report experiment progress as JSON.

    The payload contains the experiment's log summary, a ``completed``
    flag, the number of unfilled networks and the node counts still needed
    to fill them. As a side effect a waiting-room message is queued when
    the experiment defines a quorum.
    """
    exp = Experiment(session)
    state = {
        "status": "success",
        "summary": exp.log_summary(),
        "completed": exp.is_complete(),
    }

    open_networks = (
        models.Network.query.filter(models.Network.full != true())
        .with_entities(models.Network.id, models.Network.max_size)
        .all()
    )
    working_count = (
        models.Participant.query.filter_by(status="working")
        .with_entities(func.count(models.Participant.id))
        .scalar()
    )
    state["unfilled_networks"] = len(open_networks)

    required_nodes = 0
    nodes_remaining = 0
    if not open_networks:
        # Every network is full: with nobody working and no verdict from
        # the experiment, the run counts as complete.
        if working_count == 0 and state["completed"] is None:
            state["completed"] = True
    else:
        for network in open_networks:
            live_nodes = (
                models.Node.query.filter_by(network_id=network.id, failed=False)
                .with_entities(func.count(models.Node.id))
                .scalar()
            )
            capacity = network.max_size
            required_nodes += capacity
            nodes_remaining += capacity - live_nodes
    state["nodes_remaining"] = nodes_remaining
    state["required_nodes"] = required_nodes

    if state["completed"] is None:
        state["completed"] = False

    # Regenerate a waiting room message when checking status
    # to counter missed messages at the end of the waiting room
    nonfailed_count = models.Participant.query.filter(
        (models.Participant.status == "working")
        | (models.Participant.status == "overrecruited")
        | (models.Participant.status == "submitted")
        | (models.Participant.status == "approved")
    ).count()
    exp = Experiment(session)
    overrecruited = exp.is_overrecruited(nonfailed_count)
    if exp.quorum:
        quorum = {"q": exp.quorum, "n": nonfailed_count, "overrecruited": overrecruited}
        db.queue_message(WAITING_ROOM_CHANNEL, dumps(quorum))

    body = dumps(state)
    return Response(body, status=200, mimetype="application/json")
@app.route("/experiment_property/<prop>", methods=["GET"])
@app.route("/experiment/<prop>", methods=["GET"])
def experiment_property(prop):
    """Return one public property of the experiment, looked up by name.

    Aborts with 404 when ``prop`` is not among the experiment's
    ``public_properties``.
    """
    exp = Experiment(session)
    try:
        value = exp.public_properties[prop]
    except KeyError:
        abort(404)
    else:
        payload = {prop: value}
        return success_response(**payload)
@app.route("/<page>", methods=["GET"])
def get_page(page):
    """Render ``<page>.html``; abort with 404 if the template is missing."""
    template_name = f"{page}.html"
    try:
        return render_template(template_name)
    except TemplateNotFound:
        abort(404)
@app.route("/<directory>/<page>", methods=["GET"])
def get_page_from_directory(directory, page):
    """Render ``<directory>/<page>.html``.

    Aborts with 404 when the template does not exist, matching the
    behaviour of ``get_page`` (an unknown two-segment URL previously
    surfaced as an unhandled ``TemplateNotFound``).
    """
    try:
        return render_template(directory + "/" + page + ".html")
    except TemplateNotFound:
        abort(404)
@app.route("/consent")
def consent():
    """Return the consent form. Here for backwards-compatibility with 2.x.

    The HIT, assignment and worker ids shown on the form are taken from the
    query string after the experiment has normalized it.
    """
    config = _config()
    entry_information = request.args.to_dict()
    exp = Experiment(session)
    entry_data = exp.normalize_entry_information(entry_information)
    context = {
        "hit_id": entry_data.get("hit_id"),
        "assignment_id": entry_data.get("assignment_id"),
        "worker_id": entry_data.get("worker_id"),
        "mode": config.get("mode"),
        "query_string": request.query_string.decode(),
    }
    return render_template("consent.html", **context)
"""Routes for reading and writing to the database."""
def request_parameter(parameter, parameter_type=None, default=None, optional=False):
    """Read a parameter from the current request and coerce its type.

    Args:
        parameter: name of the request value to look up.
        parameter_type: ``None`` (return the raw value), ``"int"``,
            ``"known_class"`` (resolved through the experiment's
            ``known_classes``) or ``"bool"`` (the literal strings
            ``"True"``/``"False"``).
        default: returned when the parameter is absent and ``default`` is
            not ``None``.
        optional: when true, an absent parameter yields ``None``.

    Returns:
        The converted value, or an error ``Response`` when the parameter is
        required but missing, has the wrong type, or ``parameter_type`` is
        not recognised. Callers are expected to check for a ``Response``.
    """
    exp = Experiment(session)

    try:
        raw = request.values[parameter]
    except KeyError:
        # Absent: prefer the default, then optional, then report an error.
        if default is not None:
            return default
        if optional:
            return None
        return error_response(
            error_type=f"{request.url} {request.method} request, {parameter} not specified"
        )

    if parameter_type is None:
        # No conversion requested.
        return raw

    if parameter_type == "int":
        try:
            return int(raw)
        except ValueError:
            return error_response(
                error_type=(
                    f"{request.url} {request.method} request, "
                    f"non-numeric {parameter}: {raw}"
                )
            )

    if parameter_type == "known_class":
        try:
            return exp.known_classes[raw]
        except KeyError:
            return error_response(
                error_type=(
                    f"{request.url} {request.method} request, "
                    f"unknown_class: {raw} for parameter {parameter}"
                )
            )

    if parameter_type == "bool":
        if raw in ("True", "False"):
            return raw == "True"
        return error_response(
            error_type=(
                f"{request.url} {request.method} request, "
                f"non-boolean {parameter}: {raw}"
            )
        )

    return error_response(
        error_type=(
            f"/{request.url} {request.method} request, "
            f"unknown parameter type: {parameter_type} for parameter {parameter}"
        )
    )
def assign_properties(thing):
    """Copy ``details`` and ``property1``..``property5`` from the request.

    When something (e.g. a node) is created through a POST request, its
    properties may be passed along in that request. Each value that is
    present and truthy is set on ``thing`` (``details`` is JSON-decoded
    first), after which the session is committed.
    """
    details = request_parameter(parameter="details", optional=True)
    if details:
        thing.details = loads(details)

    for index in range(1, 6):
        property_name = f"property{index}"
        value = request_parameter(parameter=property_name, optional=True)
        if value:
            setattr(thing, property_name, value)

    session.commit()
@app.route("/participant/<worker_id>/<hit_id>/<assignment_id>/<mode>", methods=["POST"])
@db.serialized
def create_participant(worker_id, hit_id, assignment_id, mode, entry_information=None):
    """Create a participant.

    This route is hit early on. Any nodes the participant creates will be
    defined in reference to the participant object. You must specify the
    worker_id, hit_id, assignment_id, and mode in the url.

    ``entry_information`` is not part of the URL; it is supplied when this
    function is called directly (see ``post_participant``) and is forwarded
    to the experiment's ``create_participant``.

    Returns a success response with the participant's JSON (plus quorum
    information when the experiment defines a quorum), or an error response
    with status 403/400 when the request is rejected.
    """
    config = get_config()
    if not config.ready:
        config.load()
    if config.get("lock_table_when_creating_participant"):
        # Historically we have locked the participant table when creating participants
        # to avoid database inconsistency problems. However some experimenters have experienced
        # some deadlocking problems associated with this locking, so we have made
        # it an opt-out behavior.
        try:
            session.connection().execute(
                "LOCK TABLE participant IN EXCLUSIVE MODE NOWAIT"
            )
        except exc.OperationalError as e:
            # NOTE(review): the original error is swapped for a
            # TransactionRollbackError before re-raising, presumably so the
            # ``db.serialized`` wrapper treats it as retryable -- confirm.
            e.orig = TransactionRollbackError()
            raise e
    # The literal string "undefined" indicates the client had no value.
    missing = [p for p in (worker_id, hit_id, assignment_id) if p == "undefined"]
    if missing:
        msg = "/participant POST: required values were 'undefined'"
        return error_response(error_type=msg, status=403)
    fingerprint_hash = request.args.get("fingerprint_hash") or request.form.get(
        "fingerprint_hash"
    )
    fingerprint_found = False
    if fingerprint_hash:
        try:
            fingerprint_found = models.Participant.query.filter_by(
                fingerprint_hash=fingerprint_hash
            ).one_or_none()
        except MultipleResultsFound:
            # More than one match still means the fingerprint is known.
            fingerprint_found = True
    if fingerprint_hash and fingerprint_found:
        db.logger.warning("Same browser fingerprint detected.")
        # A repeated fingerprint is only rejected in live mode.
        if mode == "live":
            return error_response(
                error_type="/participant POST: Same participant dectected.", status=403
            )
    already_participated = models.Participant.query.filter_by(
        worker_id=worker_id
    ).one_or_none()
    if already_participated:
        db.logger.warning("Worker has already participated.")
        return error_response(
            error_type="/participant POST: worker has already participated.", status=403
        )
    duplicate = models.Participant.query.filter_by(
        assignment_id=assignment_id, status="working"
    ).one_or_none()
    if duplicate:
        msg = """
AWS has reused assignment_id while existing participant is
working. Replacing older participant {}.
"""
        app.logger.warning(msg.format(duplicate.id))
        # The older participant is handled asynchronously via the queue.
        q.enqueue(worker_function, "AssignmentReassigned", None, duplicate.id)
    # Count working or beyond participants.
    # The "+ 1" accounts for the participant about to be created.
    nonfailed_count = (
        models.Participant.query.filter(
            (models.Participant.status == "working")
            | (models.Participant.status == "overrecruited")
            | (models.Participant.status == "submitted")
            | (models.Participant.status == "approved")
        ).count()
        + 1
    )
    recruiter_name = request.args.get("recruiter")
    # Create the new participant.
    exp = Experiment(session)
    participant_vals = {
        "worker_id": worker_id,
        "hit_id": hit_id,
        "assignment_id": assignment_id,
        "mode": mode,
        "recruiter_name": recruiter_name,
        "fingerprint_hash": fingerprint_hash,
        "entry_information": entry_information,
    }
    try:
        participant = exp.create_participant(**participant_vals)
    except Exception:
        db.logger.exception(
            "Error creating particant using these values: {}".format(participant_vals)
        )
        msg = "/participant POST: an error occurred while registering the participant."
        return error_response(error_type=msg, status=400)
    session.flush()
    overrecruited = exp.is_overrecruited(nonfailed_count)
    if overrecruited:
        participant.status = "overrecruited"
    result = {
        "participant": {
            **participant.__json__(),
            # Add some extra information that is useful for initializing dallinger.identity
            "unique_id": participant.unique_id,
            "worker_id": participant.worker_id,
        }
    }
    # Queue notification to others in waiting room
    if exp.quorum:
        quorum = {
            "q": exp.quorum,
            "n": nonfailed_count,
            "overrecruited": participant.status == "overrecruited",
        }
        db.queue_message(WAITING_ROOM_CHANNEL, dumps(quorum))
        result["quorum"] = quorum
    # return the data
    return success_response(**result)
@app.route("/participant", methods=["POST"])
def post_participant():
    """Create a participant from form data instead of URL segments.

    ``fingerprint_hash`` is dropped from the form values and ``mode`` is
    taken from the form (falling back to the configured mode); what remains
    is normalized by the experiment and handed to ``create_participant``.
    """
    config = _config()
    entry_information = request.form.to_dict()
    entry_information.pop("fingerprint_hash", None)
    # Remove the mode from entry_information if provided
    mode = entry_information.pop("mode", config.get("mode"))
    normalized = Experiment(session).normalize_entry_information(entry_information)
    return create_participant(mode=mode, **normalized)
@app.route("/participant/<participant_id>", methods=["GET"])
def get_participant(participant_id):
    """Return the JSON description of the participant with the given id.

    Responds with a 403 error when no such participant exists.
    """
    matching = models.Participant.query.filter_by(id=participant_id)
    try:
        ppt = matching.one()
    except NoResultFound:
        return error_response(
            error_type="/participant GET: no participant found", status=403
        )
    return success_response(participant=ppt.__json__())
@app.route("/load-participant", methods=["POST"])
def load_participant():
    """Get the participant with an assignment id provided in the request.

    Delegates to :func:`~dallinger.experiments.Experiment.load_participant`.
    A missing assignment id and an unknown participant both yield the same
    403 error response.
    """
    entry_information = request.form.to_dict()
    exp = Experiment(session)
    participant_info = exp.normalize_entry_information(entry_information)
    assignment_id = participant_info.get("assignment_id")

    ppt = None if assignment_id is None else exp.load_participant(assignment_id)
    if ppt is None:
        return error_response(
            error_type="/load-participant POST: no participant found", status=403
        )
    return success_response(participant=ppt.__json__())
@app.route("/network/<network_id>", methods=["GET"])
def get_network(network_id):
    """Return the JSON description of the network with the given id.

    Responds with a 403 error when no such network exists.
    """
    matching = models.Network.query.filter_by(id=network_id)
    try:
        net = matching.one()
    except NoResultFound:
        return error_response(error_type="/network GET: no network found", status=403)
    return success_response(network=net.__json__())
@app.route("/question/<participant_id>", methods=["POST"])
def create_question(participant_id):
    """Store a questionnaire answer for a participant.

    Questions store information at the participant level, not the node
    level. The request must carry ``question`` (string), ``number`` (int)
    and ``response`` (string).

    The participant's recruiter may veto the questionnaire, and both texts
    are limited to the configured ``question_max_length`` (1000 by default).
    """
    # Get the participant.
    try:
        ppt = models.Participant.query.filter_by(id=participant_id).one()
    except NoResultFound:
        return error_response(
            error_type="/question POST no participant found", status=403
        )

    question = request_parameter(parameter="question")
    response = request_parameter(parameter="response")
    number = request_parameter(parameter="number", parameter_type="int")
    # A Response here is the error produced for a bad/missing parameter.
    for candidate in (question, response, number):
        if isinstance(candidate, Response):
            return candidate

    # Consult the recruiter regarding whether to accept a questionnaire
    # from the participant:
    rejection = ppt.recruiter.rejects_questionnaire_from(ppt)
    if rejection:
        reason = "/question POST, status = {}, reason: {}".format(
            ppt.status, rejection
        )
        return error_response(error_type=reason, participant=ppt)

    limit = get_config().get("question_max_length", 1000)
    if len(question) > limit or len(response) > limit:
        return error_response(error_type="/question POST length too long", status=400)

    try:
        # Constructing the Question attaches it to the participant.
        models.Question(
            participant=ppt, question=question, response=response, number=number
        )
        session.commit()
    except Exception:
        return error_response(error_type="/question POST server error", status=403)

    return success_response()
@app.route("/node/<int:node_id>/neighbors", methods=["GET"])
def node_neighbors(node_id):
    """Return the neighbors of a node.

    This calls the neighbors method of the node making the request and
    returns a list of descriptions of the nodes (even if there is only
    one), then notifies the experiment through ``exp.node_get_request()``.

    Required arguments: node_id (in the URL)
    Optional arguments: node_type, connection, failed

    Any parameter that fails validation (including a non-boolean
    ``failed``) is reported with the error response produced by
    ``request_parameter``.
    """
    exp = Experiment(session)

    # get the parameters
    node_type = request_parameter(
        parameter="node_type", parameter_type="known_class", default=models.Node
    )
    connection = request_parameter(parameter="connection", default="to")
    failed = request_parameter(parameter="failed", parameter_type="bool", optional=True)
    # ``failed`` is checked too, so that a malformed value yields its own
    # validation error instead of being passed on to ``node.neighbors``.
    for x in [node_type, connection, failed]:
        if isinstance(x, Response):
            return x

    # make sure the node exists
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(
            error_type="/node/neighbors, node does not exist",
            error_text="/node/{0}/neighbors, node {0} does not exist".format(node_id),
        )

    # get its neighbors
    if failed is not None:
        # This is expected to raise because "failed" is not a supported
        # parameter. We just want to pass the exception message back in the
        # response. The result is still bound so that the code below keeps
        # working should ``neighbors`` ever accept the argument.
        try:
            nodes = node.neighbors(type=node_type, direction=connection, failed=failed)
        except Exception as e:
            return error_response(error_type="node.neighbors", error_text=str(e))
    else:
        nodes = node.neighbors(type=node_type, direction=connection)

    try:
        # ping the experiment
        exp.node_get_request(node=node, nodes=nodes)
        session.commit()
    except Exception:
        return error_response(error_type="exp.node_get_request")

    return success_response(nodes=[n.__json__() for n in nodes])
@app.route("/node/<participant_id>", methods=["POST"])
@db.serialized
def create_node(participant_id):
    """Create a node for a participant.

    The experiment drives the process through, in order:
        1. exp.get_network_for_participant
        2. exp.create_node
        3. exp.add_node_to_network
        4. exp.node_post_request

    The participant must exist and have status "working"; when the
    experiment offers no network a bare 403 error payload is returned.
    """
    exp = Experiment(session)

    # Get the participant.
    try:
        ppt = models.Participant.query.filter_by(id=participant_id).one()
    except NoResultFound:
        return error_response(error_type="/node POST no participant found", status=403)

    # Make sure the participant status is working
    if ppt.status != "working":
        return error_response(
            error_type="/node POST, status = {}".format(ppt.status),
            participant=ppt,
        )

    # execute the request
    net = exp.get_network_for_participant(participant=ppt)
    if net is None:
        return Response(dumps({"status": "error"}), status=403)

    new_node = exp.create_node(participant=ppt, network=net)
    assign_properties(new_node)
    exp.add_node_to_network(node=new_node, network=net)

    # ping the experiment
    exp.node_post_request(participant=ppt, node=new_node)

    return success_response(node=new_node.__json__())
@app.route("/node/<int:node_id>/vectors", methods=["GET"])
def node_vectors(node_id):
    """Return the vectors of a node.

    The node id is taken from the url. The request may also carry
    ``direction`` (incoming/outgoing/all, default "all") and ``failed``
    (parsed as a bool, default False).

    Any failure while fetching the vectors, pinging the experiment or
    committing the session is reported as a 403 error response.
    """
    exp = Experiment(session)

    # Parse the request arguments; a Response here is a ready-made error.
    direction = request_parameter(parameter="direction", default="all")
    failed = request_parameter(parameter="failed", parameter_type="bool", default=False)
    if isinstance(direction, Response):
        return direction
    if isinstance(failed, Response):
        return failed

    # The node must exist.
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(error_type="/node/vectors, node does not exist")

    try:
        vectors = node.vectors(direction=direction, failed=failed)
        # Ping the experiment, then persist whatever it changed.
        exp.vector_get_request(node=node, vectors=vectors)
        session.commit()
    except Exception:
        return error_response(
            error_type="/node/vectors GET server error",
            status=403,
            participant=node.participant,
        )
    else:
        return success_response(vectors=[vector.__json__() for vector in vectors])
@app.route("/node/<int:node_id>/connect/<int:other_node_id>", methods=["POST"])
def connect(node_id, other_node_id):
    """Connect to another node.

    The ids of both nodes must be specified in the url.
    You can also pass direction (to/from/both) as an argument; it defaults
    to "to".

    Returns the JSON form of the created vectors on success. If either node
    does not exist an error response is returned, and any failure while
    connecting, pinging the experiment or committing is reported as a 403
    error response.
    """
    exp = Experiment(session)

    # get the parameters
    direction = request_parameter(parameter="direction", default="to")
    # Only bail out when parameter parsing produced an error Response.
    # (The previous check, ``type(direction == Response)``, was always truthy
    # and made this view return before connecting anything.)
    if isinstance(direction, Response):
        return direction

    # check the nodes exist
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(error_type="/node/connect, node does not exist")

    other_node = models.Node.query.get(other_node_id)
    if other_node is None:
        return error_response(
            error_type="/node/connect, other node does not exist",
            participant=node.participant,
        )

    # execute the request
    try:
        vectors = node.connect(whom=other_node, direction=direction)
        for v in vectors:
            assign_properties(v)

        # ping the experiment
        exp.vector_post_request(node=node, vectors=vectors)

        session.commit()
    except Exception:
        return error_response(
            error_type="/vector POST server error",
            status=403,
            participant=node.participant,
        )

    return success_response(vectors=[v.__json__() for v in vectors])
@app.route("/info/<int:node_id>/<int:info_id>", methods=["GET"])
def get_info(node_id, info_id):
    """Return a single info on behalf of a node.

    Both the node id and the info id come from the url. The info is only
    served when the node is its origin or when the info id appears among the
    node's incoming transmissions with status "received"; otherwise a 403
    "forbidden info" error is returned.
    """
    exp = Experiment(session)

    # The requesting node must exist.
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(error_type="/info, node does not exist")

    # The requested info must exist.
    info = models.Info.query.get(info_id)
    if info is None:
        return error_response(
            error_type="/info GET, info does not exist", participant=node.participant
        )

    # Access check: infos the node did not create must have been received.
    if info.origin_id != node.id:
        received_info_ids = [
            transmission.info_id
            for transmission in node.transmissions(
                direction="incoming", status="received"
            )
        ]
        if info.id not in received_info_ids:
            return error_response(
                error_type="/info GET, forbidden info",
                status=403,
                participant=node.participant,
            )

    try:
        # Ping the experiment and persist any changes it made.
        exp.info_get_request(node=node, infos=info)
        session.commit()
    except Exception:
        return error_response(
            error_type="/info GET server error",
            status=403,
            participant=node.participant,
        )
    else:
        return success_response(info=info.__json__())
@app.route("/node/<int:node_id>/infos", methods=["GET"])
def node_infos(node_id):
    """Return all the infos of a node.

    The node id comes from the url. An optional ``info_type`` argument
    (resolved as a known class, default ``models.Info``) is passed on to
    ``node.infos``.

    Any failure while fetching the infos, pinging the experiment or
    committing the session is reported as a 403 error response.
    """
    exp = Experiment(session)

    # Resolve the requested info class; a Response here is a ready-made error.
    info_type = request_parameter(
        parameter="info_type", parameter_type="known_class", default=models.Info
    )
    if isinstance(info_type, Response):
        return info_type

    # The node must exist.
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(error_type="/node/infos, node does not exist")

    try:
        infos = node.infos(type=info_type)
        # Ping the experiment, then persist whatever it changed.
        exp.info_get_request(node=node, infos=infos)
        session.commit()
    except Exception:
        return error_response(
            error_type="/node/infos GET server error",
            status=403,
            participant=node.participant,
        )
    else:
        return success_response(infos=[info.__json__() for info in infos])
@app.route("/node/<int:node_id>/received_infos", methods=["GET"])
def node_received_infos(node_id):
    """Return the infos a node has been sent and has received.

    The node id comes from the url. An optional ``info_type`` argument
    (resolved as a known class, default ``models.Info``) is passed on to
    ``node.received_infos``.

    A failure while pinging the experiment or committing the session is
    reported as a 403 error response.
    """
    exp = Experiment(session)

    # Resolve the requested info class; a Response here is a ready-made error.
    info_type = request_parameter(
        parameter="info_type", parameter_type="known_class", default=models.Info
    )
    if isinstance(info_type, Response):
        return info_type

    # The node must exist.
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(error_type=f"/node/infos, node {node_id} does not exist")

    # Fetch the received infos (deliberately outside the guarded section).
    infos = node.received_infos(type=info_type)

    try:
        # Ping the experiment, then persist whatever it changed.
        exp.info_get_request(node=node, infos=infos)
        session.commit()
    except Exception:
        return error_response(
            error_type="info_get_request error",
            status=403,
            participant=node.participant,
        )
    else:
        return success_response(infos=[info.__json__() for info in infos])
@app.route("/tracking_event/<int:node_id>", methods=["POST"])
@crossdomain(origin="*")
def tracking_event_post(node_id):
    """Queue a TrackingEvent job for the given node.

    The optional ``details`` argument, when present and non-empty, is decoded
    from JSON and handed to the queued worker function. The decoded details
    are echoed back in the success response.
    """
    payload = request_parameter(parameter="details", optional=True)
    if payload:
        # Details arrive JSON-encoded.
        payload = loads(payload)

    # The node must exist before anything is queued.
    if models.Node.query.get(node_id) is None:
        return error_response(error_type="/info POST, node does not exist")

    db.logger.debug(
        "rq: Queueing %s with for node: %s for worker_function",
        "TrackingEvent",
        node_id,
    )
    # No assignment id or participant id is supplied for tracking events.
    q.enqueue(
        worker_function, "TrackingEvent", None, None, node_id=node_id, details=payload
    )

    return success_response(details=payload)
@app.route("/info/<int:node_id>", methods=["POST"])
@crossdomain(origin="*")
def info_post(node_id):
    """Create an info.

    The node id must be specified in the url.

    You must pass contents as an argument.
    info_type is an additional optional argument.
    If info_type is a custom subclass of Info it must be
    added to the known_classes of the experiment class.
    failed is an optional boolean argument (default False); when true it is
    forwarded to the info constructor.

    Returns the JSON form of the new info on success. Invalid parameters
    return the error Response produced while parsing them, and any failure
    while creating the info, pinging the experiment or committing is
    reported as a 403 error response.
    """
    # get the parameters and validate them
    contents = request_parameter(parameter="contents")
    info_type = request_parameter(
        parameter="info_type", parameter_type="known_class", default=models.Info
    )
    failed = request_parameter(parameter="failed", parameter_type="bool", default=False)
    # ``failed`` is validated too: an error Response is truthy and would
    # otherwise be passed to the info constructor as ``failed=<Response>``.
    for x in [contents, info_type, failed]:
        if isinstance(x, Response):
            return x

    # check the node exists
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(error_type="/info POST, node does not exist")

    exp = Experiment(session)
    try:
        # execute the request; only pass ``failed`` along when it is set
        additional_params = {}
        if failed:
            additional_params["failed"] = failed
        info = info_type(origin=node, contents=contents, **additional_params)
        assign_properties(info)

        # ping the experiment
        exp.info_post_request(node=node, info=info)

        session.commit()
    except Exception:
        return error_response(
            error_type="/info POST server error",
            status=403,
            participant=node.participant,
        )

    # return the data
    return success_response(info=info.__json__())
@app.route("/node/<int:node_id>/transmissions", methods=["GET"])
def node_transmissions(node_id):
    """Return the transmissions of a node.

    The node id comes from the url. The request may also carry ``direction``
    (default "incoming") and ``status`` (default "all"); both are passed to
    ``node.transmissions``.

    When the direction is "incoming" or "all" and the status is "pending" or
    "all", ``node.receive()`` is called and committed before the experiment
    is pinged. Failures in that section are reported as a 403 error response.
    """
    exp = Experiment(session)

    # Parse the request arguments; a Response here is a ready-made error.
    direction = request_parameter(parameter="direction", default="incoming")
    status = request_parameter(parameter="status", default="all")
    if isinstance(direction, Response):
        return direction
    if isinstance(status, Response):
        return status

    # The node must exist.
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(error_type="/node/transmissions, node does not exist")

    # Collect the transmissions before anything is marked as received.
    transmissions = node.transmissions(direction=direction, status=status)

    try:
        includes_incoming = direction in ("incoming", "all")
        includes_pending = status in ("pending", "all")
        if includes_incoming and includes_pending:
            node.receive()
            session.commit()

        # Ping the experiment, then persist whatever it changed.
        exp.transmission_get_request(node=node, transmissions=transmissions)
        session.commit()
    except Exception:
        return error_response(
            error_type="/node/transmissions GET server error",
            status=403,
            participant=node.participant,
        )
    else:
        return success_response(
            transmissions=[transmission.__json__() for transmission in transmissions]
        )
@app.route("/node/<int:node_id>/transmit", methods=["POST"])
def node_transmit(node_id):
    """Transmit from a node to another node.

    The id of the sending node comes from the url.

    As with node.transmit(), the key parameters are ``what`` and ``to_whom``.
    Because the values must be serialized in the request, they are more
    limited here than in the back end. Each of them may be:

        * omitted, in which case None is used;
        * an int (e.g. '5'), which is looked up as an Info id (``what``) or
          a Node id (``to_whom``);
        * a class name (e.g. 'Info' or 'Agent'), which is looked up in the
          experiment's known_classes. Custom classes must be added to
          known_classes in your experiment code.

    You may also pass the values property1, property2, property3, property4,
    property5 and details. If passed, these fill in the relevant values of
    the transmissions created.

    For example, to transmit all infos of type Meme to the node with id 10:
    dallinger.post(
        "/node/" + my_node_id + "/transmit",
        {what: "Meme",
         to_whom: 10}
    );
    """
    exp = Experiment(session)
    what = request_parameter(parameter="what", optional=True)
    to_whom = request_parameter(parameter="to_whom", optional=True)

    # The sending node must exist.
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(error_type="/node/transmit, node does not exist")

    # Resolve ``what``: first as an Info id, otherwise as a known class name.
    if what is not None:
        try:
            what = int(what)
            what = models.Info.query.get(what)
            if what is None:
                return error_response(
                    error_type="/node/transmit POST, info does not exist",
                    participant=node.participant,
                )
        except Exception:
            try:
                what = exp.known_classes[what]
            except KeyError:
                return error_response(
                    error_type=(
                        f"/node/transmit POST, {what} not in experiment.known_classes"
                    ),
                    participant=node.participant,
                )

    # Resolve ``to_whom``: first as a Node id, otherwise as a known class name.
    if to_whom is not None:
        try:
            to_whom = int(to_whom)
            to_whom = models.Node.query.get(to_whom)
            if to_whom is None:
                return error_response(
                    error_type="/node/transmit POST, recipient Node does not exist",
                    participant=node.participant,
                )
        except Exception:
            try:
                to_whom = exp.known_classes[to_whom]
            except KeyError:
                return error_response(
                    error_type=(
                        f"/node/transmit POST, {to_whom} not in experiment.known_classes"
                    ),
                    participant=node.participant,
                )

    try:
        # Create the transmissions and store them before pinging the experiment.
        transmissions = node.transmit(what=what, to_whom=to_whom)
        for transmission in transmissions:
            assign_properties(transmission)
        session.commit()

        # Ping the experiment, then persist whatever it changed.
        exp.transmission_post_request(node=node, transmissions=transmissions)
        session.commit()
    except Exception:
        return error_response(
            error_type="/node/transmit POST, server error", participant=node.participant
        )
    else:
        return success_response(
            transmissions=[transmission.__json__() for transmission in transmissions]
        )
@app.route("/node/<int:node_id>/transformations", methods=["GET"])
def transformation_get(node_id):
    """Return the transformations of a node.

    The node id comes from the url. An optional ``transformation_type``
    argument (resolved as a known class, default ``models.Transformation``)
    is passed on to ``node.transformations``.
    """
    exp = Experiment(session)

    # Resolve the requested class; a Response here is a ready-made error.
    transformation_type = request_parameter(
        parameter="transformation_type",
        parameter_type="known_class",
        default=models.Transformation,
    )
    if isinstance(transformation_type, Response):
        return transformation_type

    # The node must exist.
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(
            error_type=f"/node/transformations, node {node_id} does not exist"
        )

    # Fetch the transformations (deliberately outside the guarded section).
    transformations = node.transformations(type=transformation_type)

    try:
        # Ping the experiment, then persist whatever it changed.
        exp.transformation_get_request(node=node, transformations=transformations)
        session.commit()
    except Exception:
        return error_response(
            error_type="/node/transformations GET failed", participant=node.participant
        )
    else:
        return success_response(
            transformations=[item.__json__() for item in transformations]
        )
@app.route(
    "/transformation/<int:node_id>/<int:info_in_id>/<int:info_out_id>", methods=["POST"]
)
def transformation_post(node_id, info_in_id, info_out_id):
    """Create a transformation between two infos.

    The ids of the node, the input info and the output info all come from
    the url. An optional ``transformation_type`` argument (resolved as a
    known class, default ``models.Transformation``) selects the class that
    is instantiated.
    """
    exp = Experiment(session)

    # Resolve the requested class; a Response here is a ready-made error.
    transformation_type = request_parameter(
        parameter="transformation_type",
        parameter_type="known_class",
        default=models.Transformation,
    )
    if isinstance(transformation_type, Response):
        return transformation_type

    # The node and both infos must exist.
    node = models.Node.query.get(node_id)
    if node is None:
        return error_response(
            error_type=f"/transformation POST, node {node_id} does not exist"
        )

    info_in = models.Info.query.get(info_in_id)
    if info_in is None:
        return error_response(
            error_type=f"/transformation POST, info_in {info_in_id} does not exist",
            participant=node.participant,
        )

    info_out = models.Info.query.get(info_out_id)
    if info_out is None:
        return error_response(
            error_type=f"/transformation POST, info_out {info_out_id} does not exist",
            participant=node.participant,
        )

    try:
        # Create and store the transformation before pinging the experiment.
        transformation = transformation_type(info_in=info_in, info_out=info_out)
        assign_properties(transformation)
        session.commit()

        # Ping the experiment, then persist whatever it changed.
        exp.transformation_post_request(node=node, transformation=transformation)
        session.commit()
    except Exception:
        return error_response(
            error_type="/transformation POST failed", participant=node.participant
        )
    else:
        return success_response(transformation=transformation.__json__())
@app.route("/notifications", methods=["POST", "GET"])
@crossdomain(origin="*")
def api_notifications():
    """Receive MTurk REST notifications and queue them for the worker.

    ``Event.1.EventType`` is required in the request values;
    ``Event.1.AssignmentId`` and ``participant_id`` are optional.
    """
    values = request.values
    event_type = values["Event.1.EventType"]
    assignment_id = values.get("Event.1.AssignmentId")
    participant_id = values.get("participant_id")

    # Hand the notification over to the queue.
    db.logger.debug(
        "rq: Queueing %s with id: %s for worker_function", event_type, assignment_id
    )
    q.enqueue(worker_function, event_type, assignment_id, participant_id)

    queued_job_ids = ", ".join(q.job_ids)
    db.logger.debug("rq: Submitted Queue Length: %d (%s)", len(q), queued_job_ids)

    return success_response()
def check_for_duplicate_assignments(participant):
    """Check that the assignment_id of the participant is unique.

    Every other participant that shares the assignment_id and still has the
    status "working" gets an "AssignmentAbandoned" job queued for it.
    """
    sharing_assignment = models.Participant.query.filter_by(
        assignment_id=participant.assignment_id
    ).all()

    # First collect the duplicates, then queue a job for each of them.
    duplicates = []
    for other in sharing_assignment:
        if other.id == participant.id or other.status != "working":
            continue
        duplicates.append(other)

    for duplicate in duplicates:
        q.enqueue(worker_function, "AssignmentAbandoned", None, duplicate.id)
@app.route("/worker_complete", methods=["POST"])
@db.scoped_session_decorator
def worker_complete():
    """Mark a participant as complete.

    Requires a ``participant_id`` request value. An error response is
    returned when it is missing or when no such participant exists.
    """
    participant_id = request.values.get("participant_id")
    if not participant_id:
        return error_response(
            error_type="bad request", error_text="participantId parameter is required"
        )

    # _worker_complete signals an unknown participant with a KeyError.
    try:
        _worker_complete(participant_id)
    except KeyError:
        return error_response(error_type=f"ParticipantId not found: {participant_id}")
    else:
        return success_response(status="success")
def _worker_complete(participant_id):
    """Record a participant's completion and run the recruiter's follow-up.

    Raises KeyError when no participant has the given id. Does nothing when
    the participant already has an end_time, so repeated calls are harmless.
    """
    participant = models.Participant.query.get(participant_id)
    if participant is None:
        raise KeyError()

    # An end_time that is already set means completion was handled before.
    if participant.end_time is not None:
        return

    participant.end_time = datetime.now()
    session.commit()

    # Tell the experiment right away that the participant has been marked
    # complete. Doing it here instead of in the worker function lets the
    # experiment request qualification assignment before the worker completes
    # the HIT with a recruiter like MTurk, where execution of the
    # `worker_events.AssignmentSubmitted` command is deferred until the HIT
    # has been submitted on the MTurk platform.
    Experiment(session).participant_task_completed(participant)

    # The recruiter may name a command to execute on worker completion.
    event_type = participant.recruiter.on_completion_event()
    if event_type is not None:
        # For now the command runs synchronously, whatever the event type.
        worker_function(
            event_type=event_type,
            assignment_id=participant.assignment_id,
            participant_id=participant_id,
        )
@app.route("/worker_failed", methods=["GET"])
@db.scoped_session_decorator
def worker_failed():
    """Fail a participant. Used by bots only for now.

    Requires a ``participant_id`` query argument. An error response is
    returned when it is missing or when no such participant exists.
    """
    participant_id = request.args.get("participant_id")
    if not participant_id:
        return error_response(
            error_type="bad request", error_text="participantId parameter is required"
        )

    # _worker_failed signals an unknown participant with a KeyError.
    try:
        _worker_failed(participant_id)
    except KeyError:
        return error_response(error_type=f"ParticipantId not found: {participant_id}")
    else:
        return success_response(
            field="status", data="success", request_type="worker failed"
        )
def _worker_failed(participant_id):
    """Set a participant's end_time and reject the assignment for bots.

    Raises KeyError when no participant has the given id. A
    "BotAssignmentRejected" event is run only when the participant's
    recruiter_id is "bots" or starts with "bots:".
    """
    matches = models.Participant.query.filter_by(id=participant_id).all()
    if not matches:
        raise KeyError()

    participant = matches[0]
    participant.end_time = datetime.now()
    session.add(participant)
    session.commit()

    # TODO: Recruiter.rejected_event/failed_event (replace conditional w/ polymorphism)
    recruited_by_bots = (
        participant.recruiter_id == "bots"
        or participant.recruiter_id.startswith("bots:")
    )
    if recruited_by_bots:
        worker_function(
            assignment_id=participant.assignment_id,
            participant_id=participant.id,
            event_type="BotAssignmentRejected",
        )
# Insert "mode" into pages so that it is carried from page to page. This is done
# server-side to avoid breaking backwards compatibility with old templates.
def insert_mode(page_html, mode):
    """Append "&mode=<mode>" after the last workerId placeholder in a page.

    Returns the modified html. Raises ExperimentError("insert_mode_failed")
    when the page contains no "workerId={{ workerid }}" placeholder.
    """
    # Walk through every match so that ``last_match`` ends up as the final one.
    last_match = None
    for last_match in re.finditer("workerId={{ workerid }}", page_html):
        pass

    if last_match is None:
        raise ExperimentError("insert_mode_failed")

    cut = last_match.end()
    head, tail = page_html[:cut], page_html[cut:]
    return head + "&mode=" + mode + tail