dallinger/recruiters.py
"""Recruiters manage the flow of participants to the experiment."""
from __future__ import unicode_literals
import json
import logging
import os
import random
import re
import string
import time
import flask
import requests
import tabulate
from rq import Queue
from sqlalchemy import func
from dallinger.command_line.utils import Output
from dallinger.config import get_config
from dallinger.db import redis_conn, session
from dallinger.experiment_server.utils import crossdomain, success_response
from dallinger.experiment_server.worker_events import worker_function
from dallinger.heroku import tools as heroku_tools
from dallinger.models import Recruitment
from dallinger.mturk import (
DuplicateQualificationNameError,
MTurkQualificationRequirements,
MTurkQuestions,
MTurkService,
MTurkServiceException,
QualificationNotFoundException,
)
from dallinger.notifications import MessengerError, admin_notifier, get_mailer
from dallinger.prolific import ProlificService, ProlificServiceException
from dallinger.utils import ParticipationTime, generate_random_id, get_base_url
from dallinger.version import __version__
# Module-level logger shared by every recruiter in this file.
# NOTE(review): the logger is keyed on this file's path (``__file__``), not on
# the dotted module name (``__name__``) -- confirm that this is intentional.
logger = logging.getLogger(__file__)
def _get_queue(name="default"):
    """Return the RQ ``Queue`` called ``name``, bound to the shared Redis
    connection.
    """
    queue = Queue(name, connection=redis_conn)
    return queue
# Prefixes of the log messages written when a new participant is requested
# and when recruitment is closed. They are kept as module-level constants
# because other components may listen for these exact messages in the logs:
NEW_RECRUIT_LOG_PREFIX = "New participant requested:"
CLOSE_RECRUITMENT_LOG_PREFIX = "Close recruitment."
class Recruiter(object):
    """The base recruiter.

    Defines the interface shared by all recruiters. Most methods either
    raise ``NotImplementedError`` or provide a harmless default, and are
    overridden by the concrete recruiters as needed.
    """

    nickname = None
    external_submission_url = None  # MTurkRecruiter, for one, overrides this

    def __init__(self):
        """For now, the contract of a Recruiter is that it takes no
        arguments.
        """
        logger.info("Initializing {}...".format(self.__class__.__name__))

    def __call__(self):
        """For backward compatibility with experiments invoking recruiter()
        as a method rather than a property.
        """
        return self

    def open_recruitment(self, n=1):
        """Return a list of one or more initial recruitment URLs and an initial
        recruitment message:
        {
            items: [
                'https://experiment-url-1',
                'https://experiment-url-2'
            ],
            message: 'More info about this particular recruiter's process'
        }
        """
        raise NotImplementedError

    def normalize_entry_information(self, entry_information):
        """Accepts data from recruited user and returns data needed to validate,
        create or load a Dallinger Participant.

        See :func:`~dallinger.experiment.Experiment.create_participant` for
        details.

        The default implementation extracts ``hit_id``, ``assignment_id``, and
        ``worker_id`` values directly from the ``entry_information``. Both the
        camelCase (``hitId``) and snake_case (``hit_id``) spellings are
        accepted; the camelCase value wins when both are present.

        Note that the recognised keys are *removed* from the
        ``entry_information`` dictionary passed in. Whatever remains is
        returned under the ``entry_information`` key (omitted when nothing
        is left).

        Returning a dictionary without valid ``hit_id``, ``assignment_id``, or
        ``worker_id`` will generally result in an exception.
        """
        # The inner pop() is evaluated first, so both spellings are always
        # removed from entry_information, whichever one supplies the value.
        participant_data = {
            "hit_id": entry_information.pop(
                "hitId", entry_information.pop("hit_id", None)
            ),
            "assignment_id": entry_information.pop(
                "assignmentId", entry_information.pop("assignment_id", None)
            ),
            "worker_id": entry_information.pop(
                "workerId", entry_information.pop("worker_id", None)
            ),
        }
        if entry_information:
            participant_data["entry_information"] = entry_information
        return participant_data

    def recruit(self, n=1):
        """Recruit ``n`` additional participants. Must be overridden."""
        raise NotImplementedError

    def close_recruitment(self):
        """Throw an error."""
        raise NotImplementedError

    def assign_experiment_qualifications(self, worker_id, qualifications):
        """Assigns recruiter-specific qualifications to a worker, if supported."""
        pass

    def compensate_worker(self, *args, **kwargs):
        """A recruiter may provide a means to directly compensate a worker."""
        raise NotImplementedError

    def exit_response(self, experiment, participant):
        """The recruiter returns an appropriate page on experiment/questionnaire
        submission.
        """
        raise NotImplementedError

    def reward_bonus(self, participant, amount, reason):
        """Throw an error."""
        raise NotImplementedError

    def notify_duration_exceeded(self, participants, reference_time):
        """Some participants have been working beyond the defined duration of
        the experiment.

        The default implementation only logs a warning.
        """
        logger.warning(
            "Received notification that some participants "
            "have been active for too long. No action taken."
        )

    def rejects_questionnaire_from(self, participant):
        """Recruiters have different circumstances under which experiment
        questionnaires should be accepted or rejected.

        To reject a questionnaire, this method returns an error string.

        By default, they are accepted, so we return None.
        """
        return None

    def on_completion_event(self):
        """Return the name of the appropriate WorkerEvent command to run
        when a participant completes an experiment.

        If no event should be processed, return None.
        """
        return "AssignmentSubmitted"

    def load_service(self, sandbox):
        """Load the appropriate service for this recruiter."""
        raise NotImplementedError

    def _get_hits_from_app(self, service, app):
        """Return a list of hits for the given app."""
        raise NotImplementedError

    def _current_hits(self, service, app):
        """Return the hits for ``app``, or every hit known to ``service``
        when ``app`` is None.
        """
        if app is not None:
            return self._get_hits_from_app(service, app)
        else:
            return service.get_hits()

    @staticmethod
    def _format_date_if_present(date):
        """Format ``date`` as ``YYYY/M/D HH:MM:SS AM|PM`` (month and day are
        not zero-padded), or return an empty string when ``date`` is not a
        date-like object (e.g. None).
        """
        try:
            # Month and day are interpolated directly rather than with the
            # "%-m"/"%-d" strftime directives, which are a platform-specific
            # extension and are rejected on some platforms (e.g. Windows).
            clock = date.strftime("%I:%M:%S %p")
            return "{}/{}/{} {}".format(date.year, date.month, date.day, clock)
        except AttributeError:
            return ""

    @staticmethod
    def _truncate_text(text, limit):
        """Return ``text`` cut to ``limit`` characters, with "..." appended
        when something was cut off.
        """
        return text[:limit] + "..." if len(text) > limit else text

    def hits(self, app=None, sandbox=False):
        """Lists all hits on a recruiter.

        The hits (optionally restricted to ``app``) are fetched from the
        recruiter's service and written to the command line output as a
        table. Nothing is returned.
        """
        service = self.load_service(sandbox)
        hits = self._current_hits(service, app)
        formatted_hit_list = [
            [
                h["id"],
                self._truncate_text(h["title"], 40),
                h["annotation"],
                h["status"],
                self._format_date_if_present(h["created"]),
                self._format_date_if_present(h["expiration"]),
                self._truncate_text(h["description"], 60),
            ]
            for h in hits
        ]
        out = Output()
        out.log("Found {} hit[s]:".format(len(formatted_hit_list)))
        out.log(
            tabulate.tabulate(
                formatted_hit_list,
                headers=[
                    "Hit ID",
                    "Title",
                    "Annotation (experiment ID)",
                    "Status",
                    "Created",
                    "Expiration",
                    "Description",
                ],
            ),
            chevrons=False,
        )

    def clean_qualification_attributes(self, experiment_details):
        """Remove any attributes that are not required for the qualification.

        The default implementation returns ``experiment_details`` unchanged.
        """
        return experiment_details

    def hit_details(self, hit_id, sandbox=False):
        """Returns details of a hit/hits with the same app name."""
        service = self.load_service(sandbox)
        details = service.get_study(hit_id)
        return self.clean_qualification_attributes(details)

    @property
    def default_qualification_name(self):
        """Name of the qualification file containing rules to filter participants."""
        raise NotImplementedError

    def get_qualifications(self, hit_id, sandbox):
        """Return the JSON file containing rules to filter participants."""
        raise NotImplementedError

    def get_status(self):
        """Return the status of the recruiter as a dictionary."""
        return {}
def alphanumeric_code(seed: str, length: int = 8):
    """Derive a reproducible code of ``length`` characters from ``seed``.

    The characters are drawn from the uppercase ASCII letters and the
    digits using a random generator seeded with ``seed``, so a given seed
    always produces the same code.
    """
    symbols = string.ascii_uppercase + string.digits
    rng = random.Random(seed)
    picked = []
    for _ in range(length):
        picked.append(rng.choice(symbols))
    return "".join(picked)
class ProlificRecruiterException(Exception):
    """Error raised by the ProlificRecruiter, e.g. when recruitment cannot
    be opened.
    """
# Flask blueprint holding the extra HTTP route(s) defined below for the
# Prolific recruiter.
prolific_routes = flask.Blueprint("prolific_recruiter", __name__)
@prolific_routes.route("/prolific-submission-listener", methods=["POST"])
@crossdomain(origin="*")
def prolific_submission_listener():
    """Handle the POST sent by the Prolific exit page
    (exit_recruiter_prolific.html) from its JavaScript event handler.

    At this point the participant is submitting their assignment/study to
    Prolific, so the experiment-completion work (approval, bonus) can be
    handed to the `AssignmentSubmitted` async worker function.
    """
    form_data = flask.request.form.to_dict()
    serialized = json.dumps(form_data)
    logger.warning("prolific_submission_listener called: {}".format(serialized))

    ProlificRecruiter()._handle_exit_form_submission(
        assignment_id=form_data.get("assignmentId"),
        participant_id=form_data.get("participantId"),
    )
    return success_response()
# Query-string suffix appended to our /ad URL when a Study is created.
# Prolific replaces each {{%TOKEN%}} placeholder with the real value when it
# redirects a participant to us.
PROLIFIC_AD_QUERYSTRING = "&PROLIFIC_PID={{%PROLIFIC_PID%}}&STUDY_ID={{%STUDY_ID%}}&SESSION_ID={{%SESSION_ID%}}"
def _prolific_service_from_config():
    """Build a ``ProlificService`` from the Dallinger configuration.

    The configuration is (re)loaded before the API token and API version
    are read from it.
    """
    config = get_config()
    config.load()
    token = config.get("prolific_api_token")
    version = config.get("prolific_api_version")
    referer = f"https://github.com/Dallinger/Dallinger/v{__version__}"
    return ProlificService(
        api_token=token, api_version=version, referer_header=referer
    )
class ProlificRecruiter(Recruiter):
    """A recruiter for [Prolific](https://app.prolific.co/)"""

    nickname = "prolific"

    def __init__(self, *args, **kwargs):
        """Load the configuration and set up the Prolific service and helpers.

        A key/value store may be injected with the ``store`` keyword
        argument; otherwise a ``RedisStore`` is used.
        """
        super().__init__()
        self.config = get_config()
        if not self.config.ready:
            self.config.load()
        base_url = get_base_url()
        self.ad_url = f"{base_url}/ad?recruiter={self.nickname}"
        self.study_domain = os.getenv("HOST")
        self.prolificservice = _prolific_service_from_config()
        self.notifies_admin = admin_notifier(self.config)
        self.mailer = get_mailer(self.config)
        self.store = kwargs.get("store") or RedisStore()

    @property
    def completion_code(self):
        """Code derived deterministically from the experiment ID."""
        return alphanumeric_code(self.config.get("id"))

    def open_recruitment(self, n: int = 1) -> dict:
        """Create a Study on Prolific.

        Returns a dictionary with the Study's external URL under ``items``
        and a short ``message``.

        Raises ``ProlificRecruiterException`` if a Study is already recorded
        for this experiment, or if the ``HOST`` environment variable is not
        set.
        """
        logger.info(f"Opening Prolific recruitment for {n} participants")
        if self.is_in_progress:
            raise ProlificRecruiterException(
                "Tried to open_recruitment(), but a Prolific Study "
                f"(ID {self.current_study_id}) is already running for this experiment"
            )
        if self.study_domain is None:
            raise ProlificRecruiterException(
                "Can't run a Prolific Study from localhost"
            )
        study_request = {
            "completion_code": self.completion_code,
            "completion_option": "url",
            "description": self.config.get("description"),
            # may be overridden in prolific_recruitment_config, but it's required
            # so we provide a default of "allow anyone":
            "eligibility_requirements": [],
            "estimated_completion_time": self.config.get(
                "prolific_estimated_completion_minutes"
            ),
            "external_study_url": self.ad_url + PROLIFIC_AD_QUERYSTRING,
            "internal_name": self.config.get("id"),
            "maximum_allowed_time": self.config.get(
                "prolific_maximum_allowed_minutes",
                3 * self.config.get("prolific_estimated_completion_minutes") + 2,
            ),
            "name": self.config.get("title"),
            "prolific_id_option": "url_parameters",
            # The reward is sent as an integer number of hundredths of the
            # base payment. Round before converting: in binary floating
            # point 0.29 * 100 is 28.999999999999996, which int() alone
            # would truncate to 28.
            "reward": int(round(self.config.get("base_payment") * 100)),
            "total_available_places": n,
            "mode": self.config.get("mode"),
        }
        # Merge in any explicit configuration untouched:
        if self.config.get("prolific_recruitment_config", None) is not None:
            explicit_config = json.loads(self.config.get("prolific_recruitment_config"))
            study_request.update(explicit_config)
        study_info = self.prolificservice.published_study(**study_request)
        self._record_current_study_id(study_info["id"])
        return {
            "items": [study_info["external_study_url"]],
            "message": "Study created on Prolific",
        }

    def normalize_entry_information(self, entry_information: dict):
        """Map Prolific Study URL params to our internal keys.

        Unlike the base implementation, ``entry_information`` is not
        modified; it is returned whole under the ``entry_information`` key.
        A ``KeyError`` is raised if any of ``STUDY_ID``, ``PROLIFIC_PID`` or
        ``SESSION_ID`` is missing.
        """
        participant_data = {
            "hit_id": entry_information["STUDY_ID"],
            "worker_id": entry_information["PROLIFIC_PID"],
            "assignment_id": entry_information["SESSION_ID"],
            "entry_information": entry_information,
        }
        return participant_data

    def recruit(self, n: int = 1):
        """Recruit `n` new participants to an existing Prolific Study.

        Does nothing (and returns None) when ``auto_recruit`` is falsy in
        the configuration.
        """
        if not self.config.get("auto_recruit"):
            logger.info("auto_recruit is False: recruitment suppressed")
            return
        return self.prolificservice.add_participants_to_study(
            study_id=self.current_study_id, number_to_add=n
        )

    def approve_hit(self, assignment_id: str):
        """Approve a participant's assignment/submission on Prolific.

        A ``ProlificServiceException`` is logged rather than propagated, in
        which case None is returned.
        """
        try:
            return self.prolificservice.approve_participant_session(
                session_id=assignment_id
            )
        except ProlificServiceException as ex:
            logger.exception(str(ex))

    def close_recruitment(self):
        """Do nothing.

        In part to be consistent with the MTurkRecruiter, which cannot expire
        HITs for technical reasons (see that class's docstring for more details),
        we do not automatically end a Prolific Study. This must be done by the
        researcher through the Prolific UI.
        """
        # Separate the prefix from the nickname with a space, as the
        # CLIRecruiter does ("Close recruitment. cli").
        logger.info(CLOSE_RECRUITMENT_LOG_PREFIX + " " + self.nickname)

    @property
    def external_submission_url(self):
        """On experiment completion, participants are returned to
        the Prolific site with a HIT (Study) specific link, which will
        trigger payment of their base pay.
        """
        return f"https://app.prolific.co/submissions/complete?cc={self.completion_code}"

    def exit_response(self, experiment, participant):
        """Return our custom participant exit template.

        This includes the button which will:
            1. call our custom exit handler (/prolific-submission-listener)
            2. return the participant to Prolific to submit their assignment
        """
        return flask.render_template(
            "exit_recruiter_prolific.html",
            assignment_id=participant.assignment_id,
            participant_id=participant.id,
            external_submit_url=self.external_submission_url,
        )

    def reward_bonus(self, participant, amount, reason):
        """Reward the Prolific worker for a specified assignment with a bonus.

        ``reason`` is accepted for interface compatibility but is not
        forwarded to the Prolific service. A ``ProlificServiceException`` is
        logged rather than propagated, in which case None is returned.
        """
        try:
            return self.prolificservice.pay_session_bonus(
                study_id=self.current_study_id,
                worker_id=participant.worker_id,
                amount=amount,
            )
        except ProlificServiceException as ex:
            logger.exception(str(ex))

    def on_completion_event(self):
        """We cannot perform post-submission actions (approval, bonus payment)
        until after the participant has submitted their study via the Prolific
        UI, which we redirect them to from the exit page. This means that we
        can't do anything when the questionnaire is submitted, so we return None
        to signal this.
        """
        return None

    @property
    def current_study_id(self):
        """Return the ID of the Study associated with the active experiment ID
        if any such Study exists.
        """
        return self.store.get(self.study_id_storage_key)

    @property
    def is_in_progress(self):
        """Does a Study for the current experiment ID already exist?"""
        return self.current_study_id is not None

    @property
    def study_id_storage_key(self):
        """Store key under which this experiment's Study ID is recorded."""
        experiment_id = self.config.get("id")
        return "{}:{}".format(self.__class__.__name__, experiment_id)

    def _record_current_study_id(self, study_id):
        """Remember ``study_id`` as the Study for the current experiment."""
        self.store.set(self.study_id_storage_key, study_id)

    def _handle_exit_form_submission(self, assignment_id: str, participant_id: str):
        """Queue the `AssignmentSubmitted` worker function for the participant."""
        q = _get_queue()
        q.enqueue(worker_function, "AssignmentSubmitted", assignment_id, participant_id)

    def load_service(self, sandbox):
        """Return a Prolific service; ``sandbox`` is ignored."""
        return _prolific_service_from_config()

    def _get_hits_from_app(self, service, app):
        """Return the hits whose ``annotation`` equals ``app``."""
        return service.get_hits(hit_filter=lambda h: h.get("annotation") == app)

    def clean_qualification_query(self, requirement):
        """Reduce a requirement's ``query`` to its ``id`` and ``title``.

        Prolific's API returns queries with a lot of additional information
        (question, description, help texts, tags, ...). However, to identify
        the qualification, we only need the ID. For readability, we add the
        title as well.

        Both values are None when the requirement has no ``query``, or the
        query lacks an ``id`` or a ``title``.
        """
        try:
            query_id = requirement["query"]["id"]
            title = requirement["query"]["title"]
        except KeyError:
            query_id = None
            title = None
        return {"id": query_id, "title": title}

    def clean_qualification_requirement(self, requirement):
        """Return a trimmed copy of ``requirement``, or None if nothing in
        it is selected.

        The copy keeps only the attributes that carry a selection, together
        with the requirement's ``type``, ``_cls`` and a cleaned ``query``.
        """
        attributes = requirement["attributes"]
        cleaned_attributes = [
            attribute
            for attribute in attributes
            # Skip attribute if
            if not (
                (
                    # It is a not selected option
                    attribute["value"] is False
                    or attribute["value"] is None
                    or attribute["value"] == []
                )
                or (
                    # It is an input field with the default value
                    requirement["type"] == "input"
                    and attribute["value"] == attribute["default_value"]
                )
            )
        ]
        if requirement["type"] == "range":
            if len(attributes) == 0:
                return None
            # A range whose two attributes still sit at their min and max
            # restricts nothing, so the requirement is dropped.
            # NOTE(review): this assumes a non-empty range always has at least
            # two attributes; a single attribute would raise IndexError.
            if (
                attributes[0]["min"] == attributes[0]["value"]
                and attributes[1]["max"] == attributes[1]["value"]
            ):
                return None
        if len(cleaned_attributes) > 0:
            return {
                "type": requirement["type"],
                "attributes": cleaned_attributes,
                "query": self.clean_qualification_query(requirement),
                "_cls": requirement["_cls"],
            }
        else:
            return None

    def clean_qualification_attributes(self, experiment_details):
        """In Prolific, each selection query lists all possible options even if
        they are not selected. This obfuscates which options *are* selected.
        The API does not need unselected options, so we'll remove them here.

        ``experiment_details`` is updated in place and returned.
        """
        cleaned_requirements = [
            self.clean_qualification_requirement(requirement)
            for requirement in experiment_details["eligibility_requirements"]
        ]
        cleaned_requirements = [
            requirement
            for requirement in cleaned_requirements
            if requirement is not None
        ]
        experiment_details["eligibility_requirements"] = cleaned_requirements
        return experiment_details

    @property
    def default_qualification_name(self):
        """Default file name for the qualification rules."""
        return "prolific_config.json"

    def get_qualifications(self, hit_id, sandbox):
        """Return the device, eligibility and peripheral requirements of
        the Study ``hit_id``.
        """
        details = self.hit_details(hit_id, sandbox)
        return {
            "device_compatibility": details["device_compatibility"],
            "eligibility_requirements": details["eligibility_requirements"],
            "peripheral_requirements": details["peripheral_requirements"],
        }
class CLIRecruiter(Recruiter):
    """Recruiter that hands out participation links by writing /ad URLs to
    the console/logs, for direct assignment.
    """

    nickname = "cli"

    def __init__(self):
        super().__init__()
        self.config = get_config()

    def exit_response(self, experiment, participant):
        """Render the generic exit page, showing whatever exit information
        the experiment provides for this participant (sorted by key).
        """
        info_pairs = sorted(experiment.exit_info_for(participant).items())
        recruiter_name = self.__class__.__name__
        return flask.render_template(
            "exit_recruiter.html",
            recruiter=recruiter_name,
            participant_exit_info=info_pairs,
        )

    def open_recruitment(self, n=1):
        """Recruit the first ``n`` participants and return their URLs along
        with a message explaining how to find later recruitment URLs in the
        experiment logs.
        """
        logger.info("Opening CLI recruitment for {} participants".format(n))
        initial_urls = self.recruit(n)
        message_template = (
            "\nSingle recruitment link: {}/ad?recruiter={}&generate_tokens=1&mode={}\n\n"
            'Search for "{}" in the logs for subsequent recruitment URLs.\n'
            "Open the logs for this experiment with "
            '"dallinger logs --app {}"'
        )
        message = message_template.format(
            get_base_url(),
            self.nickname,
            self._get_mode(),
            NEW_RECRUIT_LOG_PREFIX,
            self.config.get("id"),
        )
        return {"items": initial_urls, "message": message}

    def recruit(self, n=1):
        """Create ``n`` experiment URLs with random IDs, log each one and
        return them as a list.
        """
        logger.info("Recruiting {} CLI participants".format(n))
        url_template = "{}/ad?recruiter={}&assignmentId={}&hitId={}&workerId={}&mode={}"
        generated = []
        for _ in range(n):
            assignment_id = generate_random_id()
            hit_id = generate_random_id()
            worker_id = generate_random_id()
            new_url = url_template.format(
                get_base_url(),
                self.nickname,
                assignment_id,
                hit_id,
                worker_id,
                self._get_mode(),
            )
            logger.info("{} {}".format(NEW_RECRUIT_LOG_PREFIX, new_url))
            generated.append(new_url)
        return generated

    def close_recruitment(self):
        """Log that recruitment is being closed; nothing else happens."""
        logger.info(CLOSE_RECRUITMENT_LOG_PREFIX + " cli")

    def approve_hit(self, assignment_id):
        """Log the approval of the assignment and report success."""
        logger.info("Assignment {} has been marked for approval".format(assignment_id))
        return True

    def assign_experiment_qualifications(self, worker_id, qualifications):
        """Log the qualifications the worker earned."""
        summary = "Worker ID {} earned these qualifications: {}".format(
            worker_id, qualifications
        )
        logger.info(summary)

    def reward_bonus(self, participant, amount, reason):
        """Log the bonus details for the participant's assignment."""
        summary = 'Award ${} for assignment {}, with reason "{}"'.format(
            amount, participant.assignment_id, reason
        )
        logger.info(summary)

    def _get_mode(self):
        """Return the ``mode`` setting from the configuration."""
        return self.config.get("mode")
class HotAirRecruiter(CLIRecruiter):
    """A dummy recruiter: talks the talk, but does not walk the walk.

    - Always invokes templates in debug mode
    - Prints experiment /ad URLs to the console
    """

    nickname = "hotair"

    def open_recruitment(self, n=1):
        """Recruit the first ``n`` participants and return their URLs with
        an accompanying message.
        """
        logger.info("Opening HotAir recruitment for {} participants".format(n))
        initial_urls = self.recruit(n)
        message_template = (
            "\nSingle recruitment link: {}/ad?recruiter={}&generate_tokens=1&mode={}\n\n"
            "Recruitment requests will open browser windows automatically."
        )
        message = message_template.format(
            get_base_url(), self.nickname, self._get_mode()
        )
        return {"items": initial_urls, "message": message}

    def reward_bonus(self, participant, amount, reason):
        """Only log what a real recruiter would have awarded."""
        note = (
            "Were this a real Recruiter, we'd be awarding ${} for assignment {}, "
            'with reason "{}"'
        ).format(amount, participant.assignment_id, reason)
        logger.info(note)

    def _get_mode(self):
        """Always report debug mode, whatever the configuration says."""
        return "debug"
class SimulatedRecruiter(Recruiter):
    """Recruiter used with simulated participants; it produces no URLs."""

    nickname = "sim"

    def open_recruitment(self, n=1):
        """Log the request and return the (empty) result of ``recruit``
        together with a fixed message.
        """
        logger.info("Opening Sim recruitment for {} participants".format(n))
        recruited = self.recruit(n)
        return {"items": recruited, "message": "Simulated recruitment only"}

    def recruit(self, n=1):
        """Log the request and return an empty list."""
        logger.info("Recruiting {} Sim participants".format(n))
        return list()

    def close_recruitment(self):
        """Nothing to close for simulated recruitment."""
        return None
# Whimsical email body used when a missed notification turned out to be a
# completed assignment. Filled in with ``str.format(s=summary)``; it reads
# ``when``, ``assignment_id``, ``allowed_minutes``, ``active_minutes`` and
# ``excess_minutes`` from the summary object.
mturk_resubmit_whimsical = """Dearest Friend,
I am writing to let you know that at {s.when}, during my regular (and thoroughly
enjoyable) perousal of the most charming participant data table, I happened to
notice that assignment {s.assignment_id} has been taking longer than we were
expecting. I recall you had suggested {s.allowed_minutes:.0f} minutes as an upper
limit for what was an acceptable length of time for each assignement, however
this assignment had been underway for a shocking {s.active_minutes:.0f} minutes, a
full {s.excess_minutes:.0f} minutes over your allowance. I immediately dispatched a
telegram to our mutual friends at AWS and they were able to assure me that
although the notification had failed to be correctly processed, the assignment
had in fact been completed. Rather than trouble you, I dealt with this myself
and I can assure you there is no immediate cause for concern. Nonetheless, for
my own peace of mind, I would appreciate you taking the time to look into this
matter at your earliest convenience.
I remain your faithful and obedient servant,
William H. Dallinger
P.S. Please do not respond to this message, I am busy with other matters.
"""
# Plain email body used when a missed notification turned out to be a
# completed assignment. Filled in with ``str.format(s=summary)``; it reads
# ``assignment_id``, ``allowed_minutes`` and ``active_minutes`` from the
# summary object.
mturk_resubmit = """Dear experimenter,
This is an automated email from Dallinger. You are receiving this email because
the Dallinger platform has discovered evidence that a notification from Amazon
Web Services failed to arrive at the server. Dallinger has automatically
contacted AWS and has determined the dropped notification was a submitted
notification (i.e. the participant has finished the experiment). This is a non-
fatal error and so Dallinger has auto-corrected the problem. Nonetheless you may
wish to check the database.
Best,
The Dallinger dev. team.
Error details:
Assignment: {s.assignment_id}
Allowed time: {s.allowed_minutes:.0f} minute(s)
Time since participant started: {s.active_minutes:.0f}
"""
# Whimsical email body used when a missed notification turned out to be an
# abandoned/returned assignment and the HIT was expired. Filled in with
# ``str.format(s=summary)``; it reads ``when``, ``assignment_id``,
# ``allowed_minutes``, ``active_minutes`` and ``excess_minutes`` from the
# summary object.
mturk_cancelled_hit_whimsical = """Dearest Friend,
I am afraid I write to you with most grave tidings. At {s.when}, during a
routine check of the usually most delightful participant data table, I happened
to notice that assignment {s.assignment_id} has been taking longer than we were
expecting. I recall you had suggested {s.allowed_minutes:.0f} minutes as an upper
limit for what was an acceptable length of time for each assignment, however
this assignment had been underway for a shocking {s.active_minutes:.0f} minutes, a
full {s.excess_minutes:.0f} minutes over your allowance. I immediately dispatched a
telegram to our mutual friends at AWS and they infact informed me that they had
already sent us a notification which we must have failed to process, implying
that the assignment had not been successfully completed. Of course when the
seriousness of this scenario dawned on me I had to depend on my trusting walking
stick for support: without the notification I didn't know to remove the old
assignment's data from the tables and AWS will have already sent their
replacement, meaning that the tables may already be in a most unsound state!
I am sorry to trouble you with this, however, I do not know how to proceed so
rather than trying to remedy the scenario myself, I have instead temporarily
ceased operations by expiring the HIT with the fellows at AWS and have
refrained from posting any further invitations myself. Once you see fit I
would be most appreciative if you could attend to this issue with the caution,
sensitivity and intelligence for which I know you so well.
I remain your faithful and
obedient servant,
William H. Dallinger
P.S. Please do not respond to this
message, I am busy with other matters.
"""
# Plain email body used when a missed notification turned out to be an
# abandoned/returned assignment and the experiment was paused. Filled in with
# ``str.format(s=summary)``; it reads ``assignment_id``, ``allowed_minutes``
# and ``active_minutes`` from the summary object.
cancelled_hit = """Dear experimenter,
This is an automated email from Dallinger. You are receiving this email because
the Dallinger platform has discovered evidence that a notification from Amazon
Web Services failed to arrive at the server. Dallinger has automatically
contacted AWS and has determined the dropped notification was an
abandoned/returned notification (i.e. the participant had returned the
experiment or had run out of time). This is a serious error and so Dallinger has
paused the experiment - expiring the HIT on MTurk and setting auto_recruit to
false. Participants currently playing will be able to finish, however no further
participants will be recruited until you do so manually. We strongly suggest you
use the details below to check the database to make sure the missing
notification has not caused additional problems before resuming. If you are
receiving a lot of these emails this suggests something is wrong with your
experiment code.
Best,
The Dallinger dev. team.
Error details:
Assignment: {s.assignment_id}
Allowed time (minutes): {s.allowed_minutes:.0f}
Time since participant started: {s.active_minutes:.0f}
"""
class MTurkHITMessages(object):
    """Builds the subject and body of the emails sent about problem MTurk
    assignments, from a summary object and a set of templates.
    """

    # Maps a message category to its email subject and body template.
    _templates = {
        "resubmitted": {
            "subject": "Dallinger automated email - minor error.",
            "template": mturk_resubmit,
        },
        "cancelled": {
            "subject": "Dallinger automated email - major error.",
            "template": cancelled_hit,
        },
    }

    def __init__(self, summary):
        self.summary = summary

    @staticmethod
    def by_flavor(summary, whimsical):
        """Return the whimsical message builder when ``whimsical`` is truthy,
        the plain one otherwise.
        """
        flavor = WhimsicalMTurkHITMessages if whimsical else MTurkHITMessages
        return flavor(summary)

    def resubmitted_msg(self):
        """Message for an assignment that was resubmitted automatically."""
        return self._build("resubmitted")

    def hit_cancelled_msg(self):
        """Message for a HIT that was cancelled."""
        return self._build("cancelled")

    def _build(self, category):
        """Fill in the template for ``category`` with the summary and return
        a dictionary with ``body`` and ``subject``.
        """
        entry = self._templates[category]
        body = entry["template"].format(s=self.summary)
        return {"body": body, "subject": entry["subject"]}
class WhimsicalMTurkHITMessages(MTurkHITMessages):
    """Same messages as the parent class, using the whimsical subjects and
    templates.
    """

    _templates = dict(
        resubmitted=dict(
            subject="A matter of minor concern.",
            template=mturk_resubmit_whimsical,
        ),
        cancelled=dict(
            subject="Most troubling news.",
            template=mturk_cancelled_hit_whimsical,
        ),
    )
class MTurkRecruiterException(Exception):
    """Error raised by the MTurkRecruiter, e.g. for an invalid mode or when
    recruitment cannot be opened.
    """
# Flask blueprint holding the extra HTTP route(s) defined below for the
# MTurk recruiter.
mturk_routes = flask.Blueprint("mturk_recruiter", __name__)
@mturk_routes.route("/mturk-sns-listener", methods=["POST", "GET"])
@crossdomain(origin="*")
def mturk_recruiter_notify():
    """Endpoint for messages from AWS SNS. Two message types are handled:

    1. the SNS subscription confirmation request
    2. SNS subscription messages, which forward MTurk notifications

    Any other type is only logged. A success response is returned in all
    three cases.
    """
    recruiter = MTurkRecruiter()
    logger.warning("Raw notification body: {}".format(flask.request.get_data()))
    payload = json.loads(flask.request.get_data())
    kind = payload.get("Type")

    # 1. SNS subscription confirmation request
    if kind == "SubscriptionConfirmation":
        logger.warning("Received a SubscriptionConfirmation message from AWS.")
        recruiter._confirm_sns_subscription(
            token=payload.get("Token"), topic=payload.get("TopicArn")
        )
        return success_response()

    # 2. MTurk Worker event
    if kind == "Notification":
        logger.warning("Received an Event Notification from AWS.")
        inner_message = json.loads(payload.get("Message"))
        recruiter._report_event_notification(inner_message["Events"])
        return success_response()

    logger.warning("Unknown SNS notification type: {}".format(kind))
    return success_response()
class RedisStore(object):
    """Thin wrapper around redis which decodes values on retrieval and
    applies a prefix to every key it stores, so that all managed keys can
    be cleaned up together.
    """

    def __init__(self):
        self._redis = redis_conn
        self._prefix = self.__class__.__name__

    def set(self, key, value):
        """Store ``value`` in redis under the prefixed form of ``key``."""
        full_key = self._prefixed(key)
        self._redis.set(full_key, value)

    def get(self, key):
        """Return the UTF-8 decoded value stored for ``key``, or None when
        nothing is stored.
        """
        stored = self._redis.get(self._prefixed(key))
        if stored is None:
            return None
        return stored.decode("utf-8")

    def clear(self):
        """Delete every redis key whose name starts with our prefix."""
        for raw_key in self._redis.keys():
            if raw_key.decode("utf-8").startswith(self._prefix):
                self._redis.delete(raw_key)

    def _prefixed(self, key):
        """Return ``key`` with the store's prefix and a colon in front."""
        return f"{self._prefix}:{key}"
def _run_mturk_qualification_assignment(worker_id, qualifications):
    """Entry point for running qualification assignment asynchronously:
    creates an MTurkRecruiter and lets it assign the qualifications.
    """
    MTurkRecruiter()._assign_experiment_qualifications(worker_id, qualifications)
class MTurkRecruiter(Recruiter):
"""Recruit participants from Amazon Mechanical Turk"""
nickname = "mturk"
extra_routes = mturk_routes
def __init__(self, *args, **kwargs):
    """Load the configuration and set up the MTurk service and helpers.

    Recognised keyword arguments: ``store`` (a key/value store used instead
    of a ``RedisStore``) and ``skip_config_validation`` (when truthy, the
    configuration is not validated).
    """
    super().__init__()
    self.config = get_config()
    if not self.config.ready:
        self.config.load()

    base_url = get_base_url()
    self.ad_url = "{}/ad?recruiter={}".format(base_url, self.nickname)
    self.notification_url = "{}/mturk-sns-listener".format(base_url)
    self.hit_domain = os.getenv("HOST")

    # Anything other than "live" mode talks to the MTurk sandbox.
    use_sandbox = self.config.get("mode") != "live"
    self.mturkservice = MTurkService(
        aws_access_key_id=self.config.get("aws_access_key_id"),
        aws_secret_access_key=self.config.get("aws_secret_access_key"),
        region_name=self.config.get("aws_region"),
        sandbox=use_sandbox,
    )
    self.notifies_admin = admin_notifier(self.config)
    self.mailer = get_mailer(self.config)
    self.store = kwargs.get("store") or RedisStore()

    if not kwargs.get("skip_config_validation", False):
        self._validate_config()
def _validate_config(self):
    """Raise ``MTurkRecruiterException`` unless the configured ``mode`` is
    "sandbox" or "live".
    """
    mode = self.config.get("mode")
    if mode in ("sandbox", "live"):
        return
    complaint = (
        '"{}" is not a valid mode for MTurk recruitment. '
        'The value of "mode" must be either "sandbox" or "live"'
    ).format(mode)
    raise MTurkRecruiterException(complaint)
def exit_response(self, experiment, participant):
    """Render the MTurk exit page with the participant's HIT, assignment
    and worker IDs and the external submission URL.
    """
    template_values = {
        "hitid": participant.hit_id,
        "assignmentid": participant.assignment_id,
        "workerid": participant.worker_id,
        "external_submit_url": self.external_submission_url,
    }
    return flask.render_template("exit_recruiter_mturk.html", **template_values)
@property
def external_submission_url(self):
    """URL participants are sent to on experiment completion in order to
    submit their HIT on the Mechanical Turk site, which in turn triggers
    notifications to the /mturk-sns-listener route.
    """
    sandbox_url = "https://workersandbox.mturk.com/mturk/externalSubmit"
    live_url = "https://www.mturk.com/mturk/externalSubmit"
    return sandbox_url if self.is_sandbox else live_url
def open_recruitment(self, n=1):
    """Create a HIT on AWS MTurk for ``n`` assignments.

    Returns a dictionary with the HIT's worker URL under ``items`` and a
    short ``message``. Raises ``MTurkRecruiterException`` if recruitment is
    already in progress, or if the ``HOST`` environment variable is not set.
    """
    logger.info("Opening MTurk recruitment for {} participants".format(n))
    if self.is_in_progress:
        raise MTurkRecruiterException(
            "Tried to open_recruitment on already open recruiter."
        )
    if self.hit_domain is None:
        raise MTurkRecruiterException("Can't run a HIT from localhost")

    self.mturkservice.check_credentials()

    cfg = self.config
    hit_request = dict(
        experiment_id=cfg.get("id"),
        max_assignments=n,
        title="{} ({})".format(
            cfg.get("title"), heroku_tools.app_name(cfg.get("id"))
        ),
        description=cfg.get("description"),
        keywords=self._config_to_list("keywords"),
        reward=cfg.get("base_payment"),
        duration_hours=cfg.get("duration"),
        lifetime_days=cfg.get("lifetime"),
        question=MTurkQuestions.external(self.ad_url),
        notification_url=self.notification_url,
        annotation=cfg.get("id"),
        qualifications=self._build_required_hit_qualifications(),
    )
    hit_info = self.mturkservice.create_hit(**hit_request)
    self._record_current_hit_id(hit_info["id"])
    return {
        "items": [hit_info["worker_url"]],
        "message": "HIT now published to Amazon Mechanical Turk",
    }
def assign_experiment_qualifications(self, worker_id, qualifications):
    """Schedule assignment of MTurk Qualifications to a worker.

    Assigning can be slow and the call originates with a web request to
    the /worker_complete route, which should not time out. No return value
    is needed, so the work is handed to an async worker via the queue.

    @param worker_id       string  the MTurk worker ID
    @param qualifications  list of dict w/ `name`, `description` and
                           (optional) `score` keys
    """
    _get_queue().enqueue(
        _run_mturk_qualification_assignment, worker_id, qualifications
    )
def _assign_experiment_qualifications(self, worker_id, qualifications):
    """Ensure the qualifications exist on MTurk and apply them to a worker.

    Called from an async worker.

    For each requested qualification: when a ``score`` is supplied the
    worker is assigned that score, otherwise the worker's score for the
    qualification is incremented. Newly created qualifications are
    addressed by ID; pre-existing ones are addressed by name.

    @param worker_id       string  the MTurk worker ID
    @param qualifications  list of dict w/ `name`, `description` and
                           (optional) `score` keys
    """
    requested = {qual["name"]: qual for qual in qualifications}
    result = self._ensure_mturk_qualifications(qualifications)

    for qual in result["new_qualifications"]:
        score = requested[qual["name"]].get("score")
        if score is not None:
            # The entries in "new_qualifications" only describe the created
            # qualification type (name/id/available); the score lives in the
            # caller's request, so use the value looked up above.
            self.mturkservice.assign_qualification(qual["id"], worker_id, score)
        else:
            self.mturkservice.increment_qualification_score(qual["id"], worker_id)

    for name in result["existing_qualifications"]:
        score = requested[name].get("score")
        if score is not None:
            self.mturkservice.assign_named_qualification(name, worker_id, score)
        else:
            self.mturkservice.increment_named_qualification_score(name, worker_id)
def compensate_worker(self, worker_id, email, dollars, notify=True):
    """Pay a worker through a dedicated HIT only they qualify for.

    A one-off qualification type is created and assigned to the worker,
    then a single-assignment HIT requiring that qualification is created.
    When ``email`` is not None, a message describing the HIT is sent to
    that address.

    Returns a dict with the created "hit", the "qualification" and the
    "email" message that was sent (an empty dict when no email was given).
    """
    qualification = self.mturkservice.create_qualification_type(
        name="Dallinger Compensation Qualification - {}".format(
            generate_random_id()
        ),
        description=(
            "You have received a qualification to allow you to complete a "
            "compensation HIT from Dallinger for ${}.".format(dollars)
        ),
    )
    qualification_id = qualification["id"]
    self.mturkservice.assign_qualification(
        qualification_id, worker_id, 1, notify=notify
    )

    hit_info = self.mturkservice.create_hit(
        experiment_id="(compensation only)",
        max_assignments=1,
        title="Dallinger Compensation HIT",
        description="For compensation only; no task required.",
        keywords=[],
        reward=float(dollars),
        duration_hours=1,
        lifetime_days=3,
        question=MTurkQuestions.compensation(sandbox=self.is_sandbox),
        qualifications=[MTurkQualificationRequirements.must_have(qualification_id)],
        do_subscribe=False,
    )

    message = {}
    if email is not None:
        body_template = (
            "A special compensation HIT is available for you to complete on MTurk.\n\n"
            "Title: {title}\n"
            "Reward: ${reward:.2f}\n"
            "URL: {worker_url}"
        )
        message = {
            "subject": "Dallinger Compensation HIT",
            "sender": self.config.get("dallinger_email_address"),
            "recipients": [email],
            "body": body_template.format(**hit_info),
        }
        self.mailer.send(**message)

    return {"hit": hit_info, "qualification": qualification, "email": message}
def recruit(self, n=1):
    """Ask for ``n`` more participants on the HIT already in progress.

    Nothing is done (and None is returned) when ``auto_recruit`` is off
    or when no HIT ID has been recorded. Service errors are logged rather
    than raised.
    """
    logger.info("Recruiting {} MTurk participants".format(n))

    if not self.config.get("auto_recruit"):
        logger.info("auto_recruit is False: recruitment suppressed")
        return None

    active_hit = self.current_hit_id()
    if active_hit is None:
        logger.info("no HIT in progress: recruitment aborted")
        return None

    try:
        extend = self.mturkservice.extend_hit
        return extend(
            active_hit, number=n, duration_hours=self.config.get("duration")
        )
    except MTurkServiceException as error:
        logger.exception(str(error))
    return None
def notify_duration_exceeded(self, participants, reference_time):
    """Handle participants who have gone past the "duration" config value.

    Each participant's assignment status is looked up on MTurk:

    - "Approved" / "Rejected": the participant's status is updated to match.
    - "Submitted": the missed submission notification is re-queued and the
      researcher is messaged.
    - anything else: a "NotificationMissing" event is queued and the
      participant is counted as unsubmitted.

    If any participants are unsubmitted and the
    ``disable_when_duration_exceeded`` config value is set, auto-recruitment
    is disabled, recruitment is closed, the researcher is messaged and the
    HIT of the first unsubmitted participant is expired.
    """
    overdue = []
    for participant in participants:
        summary = ParticipationTime(participant, reference_time, self.config)
        mturk_status = self._mturk_status_for(participant)

        if mturk_status == "Approved":
            participant.status = "approved"
            session.commit()
            continue

        if mturk_status == "Rejected":
            participant.status = "rejected"
            session.commit()
            continue

        if mturk_status == "Submitted":
            self._resend_submitted_rest_notification_for(participant)
            self._message_researcher(self._resubmitted_msg(summary))
            logger.warning(
                "Error - submitted notification for participant {} missed. "
                "A replacement notification was created and sent, "
                "but proceed with caution.".format(participant.id)
            )
            continue

        self._send_notification_missing_rest_notification_for(participant)
        overdue.append(summary)

    should_disable = self.config.get("disable_when_duration_exceeded")
    if not should_disable or not overdue:
        return

    self._disable_autorecruit()
    self.close_recruitment()
    first_overdue = overdue[0]
    # Only one of the participants is reported to the researcher.
    self._message_researcher(self._cancelled_msg(first_overdue))
    # Try to force-expire the HIT. It may no longer exist if it was
    # deleted manually, in which case the service error is only logged.
    try:
        self.mturkservice.expire_hit(first_overdue.participant.hit_id)
    except MTurkServiceException as error:
        logger.exception(error)
def rejects_questionnaire_from(self, participant):
    """Return a reason for refusing the questionnaire, or None to accept it.

    MTurk participants submit their HIT on the MTurk site (see
    external_submission_url) and MTurk then notifies Dallinger, which marks
    the assignment completed. Once that has happened the participant is no
    longer "working" and the questionnaire can't be submitted any more.
    """
    if participant.status == "working":
        return None
    return (
        "This participant has already sumbitted their HIT "
        "on MTurk and can no longer submit the questionnaire"
    )
def on_completion_event(self):
    """Return None: no completion event is named for this recruiter.

    MTurk sends its own notification when the worker completes the HIT
    on that service.
    """
    return
def reward_bonus(self, participant, amount, reason):
    """Grant a bonus to the Turker for the participant's assignment.

    Returns whatever the MTurk service returns; a service error is logged
    and None is returned instead.
    """
    try:
        grant = self.mturkservice.grant_bonus
        return grant(participant.assignment_id, amount, reason)
    except MTurkServiceException as error:
        logger.exception(str(error))
    return None
@property
def is_in_progress(self):
    """True when a HIT ID is already recorded for the current experiment."""
    recorded_hit = self.current_hit_id()
    return recorded_hit is not None
def current_hit_id(self):
    """Look up the stored HIT ID for the active experiment ID.

    Returns whatever the store holds under ``hit_id_storage_key``.
    """
    store = self.store
    return store.get(self.hit_id_storage_key)
def approve_hit(self, assignment_id):
    """Approve an assignment on MTurk.

    Returns the service's response; a service error is logged and None is
    returned instead.
    """
    try:
        approve = self.mturkservice.approve_assignment
        return approve(assignment_id)
    except MTurkServiceException as error:
        logger.exception(str(error))
    return None
def close_recruitment(self):
    """Log the close request and otherwise do nothing.

    Notifications of worker HIT submissions on MTurk seem to be
    discontinued once a HIT has been expired. This means that we never
    receive notifications about HIT submissions from workers who, for
    whatever reason, delay submitting their HIT. Since there are no
    pressing issues caused by simply not automating HIT expiration,
    this is the solution we've settled on for the past several years.

    - `Jesse Snyder <https://github.com/jessesnyder/>__` Feb 1 2022
    """
    logger.info("{} mturk".format(CLOSE_RECRUITMENT_LOG_PREFIX))
@property
def is_sandbox(self):
    """True when the configured "mode" is "sandbox"."""
    configured_mode = self.config.get("mode")
    return configured_mode == "sandbox"
@property
def hit_id_storage_key(self):
    """Store key for the current HIT ID: "<class name>:<experiment id>"."""
    exp_id = self.config.get("id")
    recruiter_class = type(self).__name__
    return "{}:{}".format(recruiter_class, exp_id)
def _build_required_hit_qualifications(self):
    """Assemble the qualification requirements for a new HIT.

    These are the Qualifications an MTurk worker must have or, in the case
    of the blocklist, must not have, in order to see and accept the HIT.
    Sources, in order: ``approve_requirement``, ``us_only``,
    ``mturk_qualification_blocklist`` and the JSON-encoded
    ``mturk_qualification_requirements`` config values.
    """
    requirements = MTurkQualificationRequirements
    required = []

    approval_threshold = self.config.get("approve_requirement")
    if approval_threshold is not None:
        required.append(requirements.min_approval(approval_threshold))

    if self.config.get("us_only"):
        required.append(requirements.restrict_to_countries(["US"]))

    for blocked_name in self._config_to_list("mturk_qualification_blocklist"):
        blocked_type = self.mturkservice.get_qualification_type_by_name(blocked_name)
        # Blocklist names with no matching qualification type are skipped.
        if blocked_type:
            required.append(requirements.must_not_have(blocked_type["id"]))

    explicit_json = self.config.get("mturk_qualification_requirements", None)
    if explicit_json is not None:
        required.extend(json.loads(explicit_json))

    return required
def _record_current_hit_id(self, hit_id):
    """Save ``hit_id`` in the store under ``hit_id_storage_key``."""
    store = self.store
    store.set(self.hit_id_storage_key, hit_id)
def _confirm_sns_subscription(self, token, topic):
    """Pass the subscription token and topic on to the MTurk service."""
    confirmation = {"token": token, "topic": topic}
    self.mturkservice.confirm_subscription(**confirmation)
def _report_event_notification(self, events):
    """Queue a worker_function job for every event in ``events``.

    Each job receives the event's "EventType" and "AssignmentId" values;
    the participant ID argument is always None here.
    """
    job_queue = _get_queue()
    for notification in events:
        kind = notification.get("EventType")
        assignment = notification.get("AssignmentId")
        job_queue.enqueue(worker_function, kind, assignment, None)
def _mturk_status_for(self, participant):
    """Return the MTurk status of the participant's assignment.

    Any failure while fetching or reading the assignment yields None.
    """
    try:
        return self.mturkservice.get_assignment(participant.assignment_id)["status"]
    except Exception:
        return None
def _disable_autorecruit(self):
    """PATCH the Heroku app's config to set ``auto_recruit`` to "false"."""
    app = heroku_tools.HerokuApp(self.config.get("heroku_app_id_root"))
    payload = json.dumps({"auto_recruit": "false"})
    auth_headers = heroku_tools.request_headers(
        self.config.get("heroku_auth_token")
    )
    requests.patch(app.config_url, data=payload, headers=auth_headers)
def _resend_submitted_rest_notification_for(self, participant):
    """Queue an "AssignmentSubmitted" event for the participant's assignment."""
    _get_queue().enqueue(
        worker_function, "AssignmentSubmitted", participant.assignment_id, None
    )
def _send_notification_missing_rest_notification_for(self, participant):
    """Queue a "NotificationMissing" event for the participant's assignment."""
    _get_queue().enqueue(
        worker_function, "NotificationMissing", participant.assignment_id, None
    )
def _config_to_list(self, key):
    """Read a comma separated config value as a list of stripped strings.

    At some point lists will be supported, so all service code supports
    them, but the config system only supports strings for now, hence the
    conversion. A missing value gives an empty list; blank items are dropped.
    """
    raw_value = self.config.get(key, None)
    if raw_value is None:
        return []
    pieces = (piece.strip() for piece in raw_value.split(","))
    return [piece for piece in pieces if piece]
def _ensure_mturk_qualifications(self, qualifications):
    """Create MTurk Qualifications for names that don't already exist,
    but also return names that already do.

    @param qualifications  list of dict w/ `name` and `description` keys

    Returns a dict with two keys:

    - "new_qualifications": list of dicts (`name`, `id`, `available`) for
      the qualification types created here that could be looked up again
      afterwards.
    - "existing_qualifications": list of names whose creation was refused
      because the name was already taken.
    """
    result = {"new_qualifications": [], "existing_qualifications": []}
    for qual in qualifications:
        name = qual["name"]
        desc = qual["description"]
        try:
            created = self.mturkservice.create_qualification_type(name, desc)
        except DuplicateQualificationNameError:
            result["existing_qualifications"].append(name)
        else:
            result["new_qualifications"].append(
                {"name": name, "id": created["id"], "available": False}
            )

    # We need to make sure the new qualifications are actually ready
    # for assignment, as there's a small delay.
    for _attempt in range(5):
        for new in result["new_qualifications"]:
            if new["available"]:
                continue
            try:
                self.mturkservice.get_qualification_type_by_name(new["name"])
            except QualificationNotFoundException:
                logger.warning(
                    "Did not find qualification {}. Trying again...".format(
                        new["name"]
                    )
                )
                time.sleep(1)
            else:
                new["available"] = True
        if all(new["available"] for new in result["new_qualifications"]):
            break

    unavailable = [
        new["name"] for new in result["new_qualifications"] if not new["available"]
    ]
    if unavailable:
        # Join the names: the entries themselves are dicts, which
        # str.join() would reject with a TypeError.
        logger.warning(
            "After several attempts, some qualifications are still not ready "
            "for assignment: {}".format(", ".join(unavailable))
        )
    # Return just the available among the new ones
    result["new_qualifications"] = [
        new for new in result["new_qualifications"] if new["available"]
    ]
    return result
def _resubmitted_msg(self, summary):
    """Build the "resubmitted" message for ``summary``.

    The message flavor is picked using the "whimsical" config value.
    """
    return MTurkHITMessages.by_flavor(
        summary, self.config.get("whimsical")
    ).resubmitted_msg()
def _cancelled_msg(self, summary):
    """Build the "HIT cancelled" message for ``summary``.

    The message flavor is picked using the "whimsical" config value.
    """
    return MTurkHITMessages.by_flavor(
        summary, self.config.get("whimsical")
    ).hit_cancelled_msg()
def _message_researcher(self, message):
    """Send ``message`` ("subject" and "body" keys) via the admin notifier.

    A MessengerError is logged instead of being raised.
    """
    try:
        send = self.notifies_admin.send
        send(message["subject"], message["body"])
    except MessengerError as error:
        logger.exception(error)
def load_service(self, sandbox):
    """Build an MTurk service via the command line module's helper.

    The helper is imported at call time rather than at module import.
    """
    from dallinger.command_line import (
        _mturk_service_from_config as build_service,
    )

    return build_service(sandbox)
def _get_hits_from_app(self, service, app):
    """Ask ``service`` for the HITs whose "internal_name" equals ``app``."""

    def belongs_to_app(hit):
        return hit.get("internal_name", None) == app

    return service.get_hits(hit_filter=belongs_to_app)
@property
def default_qualification_name(self):
    """Default name used for this recruiter's qualifications JSON."""
    name = "mturk_qualifications.json"
    return name
def get_qualifications(self, hit_id, sandbox):
    """Return the "QualificationRequirements" of the study ``hit_id``.

    The study is fetched through a service loaded for the given
    ``sandbox`` setting.
    """
    study = self.load_service(sandbox).get_study(hit_id)
    return study["QualificationRequirements"]
class RedisTally(object):
    """Tally of recruited participants kept in Redis under a single key."""

    _key = "num_recruited"

    def __init__(self):
        # Creating a tally resets the stored count to zero.
        redis_conn.set(self._key, 0)

    def increment(self, count):
        """Add ``count`` to the stored tally."""
        redis_conn.incr(self._key, count)

    @property
    def current(self):
        """The stored tally, converted to an int."""
        stored = redis_conn.get(self._key)
        return int(stored)
class MTurkLargeRecruiter(MTurkRecruiter):
    """MTurk recruiter that opens recruitment with a pool of assignments.

    The initial HIT asks for at least ``pool_size`` assignments. Later
    ``recruit`` calls only extend the HIT once the number of participants
    requested so far (tracked by ``counter``) goes beyond that pool.
    """

    nickname = "mturklarge"
    pool_size = 10

    def __init__(self, *args, **kwargs):
        supplied_counter = kwargs.get("counter")
        # A falsy or missing "counter" falls back to a fresh RedisTally.
        self.counter = supplied_counter if supplied_counter else RedisTally()
        super(MTurkLargeRecruiter, self).__init__(*args, **kwargs)

    def open_recruitment(self, n=1):
        """Open recruitment for ``n`` or ``pool_size`` participants,
        whichever is larger, and add ``n`` to the tally.
        """
        logger.info("Opening MTurkLarge recruitment for {} participants".format(n))
        if self.is_in_progress:
            raise MTurkRecruiterException(
                "Tried to open_recruitment on already open recruiter."
            )
        self.counter.increment(n)
        initial_size = max(n, self.pool_size)
        return super(MTurkLargeRecruiter, self).open_recruitment(initial_size)

    def recruit(self, n=1):
        """Add ``n`` to the tally and extend the HIT only by the part of
        ``n`` that the remaining pool does not cover.
        """
        logger.info("Recruiting {} MTurkLarge participants".format(n))
        if not self.config.get("auto_recruit"):
            logger.info("auto_recruit is False: recruitment suppressed")
            return None

        # The shortfall is measured before this request is tallied.
        shortfall = n - self.remaining_pool
        self.counter.increment(n)
        if shortfall > 0:
            return super(MTurkLargeRecruiter, self).recruit(shortfall)
        return None

    @property
    def remaining_pool(self):
        """How many pool slots are not yet used up by the tally (never
        negative).
        """
        unused = self.pool_size - self.counter.current
        return max(0, unused)
class BotRecruiter(Recruiter):
    """Recruit bot participants using a queue"""

    nickname = "bots"
    # Passed as ``job_timeout`` when a bot job is enqueued.
    _timeout = "1h"

    def __init__(self):
        super(BotRecruiter, self).__init__()
        self.config = get_config()

    def open_recruitment(self, n=1):
        """Start recruiting right away.

        Returns a dict whose "items" are the ad URLs of the queued bots and
        whose "message" names the bot class in use.
        """
        logger.info("Opening Bot recruitment for {} participants".format(n))
        bot_factory = self._get_bot_factory()
        # A throwaway bot is built just to learn the class name.
        bot_class_name = type(bot_factory("", "", "")).__name__
        ad_urls = self.recruit(n)
        return {
            "items": ad_urls,
            "message": "Bot recruitment started using {}".format(bot_class_name),
        }

    def recruit(self, n=1):
        """Recruit n new participant bots to the queue

        For each bot, random worker, HIT and assignment IDs are generated,
        an ad URL is built from them and the bot's ``run_experiment`` is
        enqueued on the "low" queue. Returns the list of ad URLs.
        """
        logger.info("Recruiting {} Bot participants".format(n))
        bot_factory = self._get_bot_factory()
        ad_urls = []
        job_queue = _get_queue(name="low")
        query_template = (
            "recruiter={}&assignmentId={}&hitId={}&workerId={}&mode=sandbox"
        )
        for _ in range(n):
            base_url = get_base_url()
            worker_id = generate_random_id()
            hit_id = generate_random_id()
            assignment_id = generate_random_id()
            query = query_template.format(
                self.nickname, assignment_id, hit_id, worker_id
            )
            ad_url = "{}/ad?{}".format(base_url, query)
            ad_urls.append(ad_url)
            bot = bot_factory(
                ad_url,
                assignment_id=assignment_id,
                worker_id=worker_id,
                hit_id=hit_id,
            )
            job = job_queue.enqueue(bot.run_experiment, job_timeout=self._timeout)
            logger.warning("Created job {} for url {}.".format(job.id, ad_url))

        return ad_urls

    def approve_hit(self, assignment_id):
        """Always report success."""
        return True

    def close_recruitment(self):
        """Clean up once the experiment is complete.

        This does nothing at this time, other than logging.
        """
        logger.info("{} bot".format(CLOSE_RECRUITMENT_LOG_PREFIX))

    def notify_duration_exceeded(self, participants, reference_time):
        """The bot participant has been working longer than the time defined in
        the "duration" config value: mark each one rejected, then commit once.
        """
        for overdue_bot in participants:
            overdue_bot.status = "rejected"
        session.commit()

    def reward_bonus(self, participant, amount, reason):
        """Logging only. These are bots."""
        logger.info("Bots don't get bonuses.  Sorry, bots.")

    def on_completion_event(self):
        """Name of the event reported when a bot completes."""
        return "BotAssignmentSubmitted"

    def _get_bot_factory(self):
        # Must be imported at run-time
        from dallinger_experiment.experiment import Bot as bot_class

        return bot_class
class MultiRecruiter(Recruiter):
    """Recruiter that spreads recruitment over several other recruiters,
    each with its own quota, as listed in the "recruiters" config value.
    """

    nickname = "multi"

    # recruiter spec e.g. recruiters = bots: 5, mturk: 1
    SPEC_RE = re.compile(r"(\w+):\s*(\d+)")

    def __init__(self):
        super(MultiRecruiter, self).__init__()
        self.spec = self.parse_spec()

    def parse_spec(self):
        """Parse the specification of how to recruit participants.

        Example: recruiters = bots: 5, mturk: 1

        Returns a list of (recruiter name, quota) tuples in spec order.
        """
        raw_spec = get_config().get("recruiters")
        return [
            (found.group(1), int(found.group(2)))
            for found in self.SPEC_RE.finditer(raw_spec)
        ]

    def recruiters(self, n=1):
        """Iterator that provides recruiters along with the participant
        count to be recruited for up to `n` participants.

        We use the `Recruitment` table in the db to keep track of
        how many recruitments have been requested using each recruiter.
        We'll use the first one from the specification that
        hasn't already reached its quota.
        """
        recruited = 0
        while recruited <= n:
            tally_query = session.query(
                Recruitment.recruiter_id, func.count(Recruitment.id)
            )
            tallies = dict(tally_query.group_by(Recruitment.recruiter_id).all())

            for recruiter_id, quota in self.spec:
                used = tallies.get(recruiter_id, 0)
                if used < quota:
                    # Quota is still available; let's use it.
                    available = quota - used
                    break
                # This recruiter quota was reached; carry the surplus over
                # in case the same recruiter shows up again in the spec,
                # and move on to the next one.
                tallies[recruiter_id] = used - quota
            else:
                # Every quota in the spec is used up.
                return

            batch_size = min(n - recruited, available)
            # record the recruitments and commit
            for _ in range(batch_size):
                session.add(Recruitment(recruiter_id=recruiter_id))
            session.commit()

            recruited += batch_size
            yield by_name(recruiter_id), batch_size

    def open_recruitment(self, n=1):
        """Return initial experiment URL list.

        The first time a sub-recruiter is used within this call its
        ``open_recruitment`` is invoked; afterwards its ``recruit`` is used.
        """
        logger.info("Multi recruitment running for {} participants".format(n))
        collected_items = []
        messages_by_nickname = {}
        outstanding = n
        for recruiter, quantity in self.recruiters(n):
            if not quantity:
                break
            if recruiter.nickname not in messages_by_nickname:
                opened = recruiter.open_recruitment(quantity)
                collected_items.extend(opened["items"])
                messages_by_nickname[recruiter.nickname] = opened["message"]
            else:
                collected_items.extend(recruiter.recruit(quantity))

            outstanding -= quantity
            if outstanding <= 0:
                break

        logger.info(
            "Multi-recruited {} out of {} participants, using {} recruiters.".format(
                n - outstanding, n, len(messages_by_nickname)
            )
        )

        return {
            "items": collected_items,
            "message": "\n".join(messages_by_nickname.values()),
        }

    def recruit(self, n=1):
        """For multi recruitment recruit and open_recruitment
        have the same logic. We may need to open recruitment on any of our
        sub-recruiters at any point in recruitment.
        """
        opened = self.open_recruitment(n)
        return opened["items"]

    def close_recruitment(self):
        """Close recruitment once on every distinct recruiter in the spec."""
        distinct_names = set(name for name, _quota in self.spec)
        for recruiter_name in distinct_names:
            by_name(recruiter_name).close_recruitment()
def for_experiment(experiment):
    """Return the Recruiter instance for the specified Experiment.

    This provides a seam for testing.
    """
    recruiter = experiment.recruiter
    return recruiter
def from_config(config):
    """Return a Recruiter instance based on the configuration.

    Default is HotAirRecruiter in debug mode (unless we're using
    the bot recruiter, which can be used in debug mode)
    and the MTurkRecruiter in other modes.

    Raises NotImplementedError when a recruiter name is configured but no
    recruiter is found for it (outside of debug and replay modes).
    """
    is_debug = config.get("mode") == "debug"
    configured_name = config.get("recruiter", None)

    # Special case 1: Don't use a configured recruiter in replay mode
    if config.get("replay"):
        return HotAirRecruiter()

    configured = by_name(configured_name) if configured_name is not None else None

    # Special case 2: may run BotRecruiter or MultiRecruiter in any mode
    # (debug or not), so it trumps everything else:
    if isinstance(configured, (BotRecruiter, MultiRecruiter)):
        return configured

    # Special case 3: if we're not using bots and we're in debug mode,
    # if present, use the configured debug_recruiter or else fallback to
    # HotAirRecruiter:
    if is_debug:
        return by_name(config.get("debug_recruiter", "HotAirRecruiter"))

    # Configured recruiter:
    if configured is not None:
        return configured

    # A name was given but nothing matched it.
    if configured_name:
        raise NotImplementedError("No such recruiter {}".format(configured_name))

    # Default if we're not in debug mode:
    return MTurkRecruiter()
def _descendent_classes(cls):
    """Yield every direct and indirect subclass of ``cls``, depth first:
    each child is followed by its own descendents.
    """
    for child in cls.__subclasses__():
        yield child
        for descendent in _descendent_classes(child):
            yield descendent
def by_name(name, **kwargs):
    """Attempt to return a recruiter class by name.

    Actual class names and known nicknames are both supported. When two
    classes claim the same identifier, a later class only replaces the
    registered one if it is a subclass of it. The matching class is
    instantiated with ``kwargs``; None is returned when nothing matches.
    """
    registry = {}
    for cls in _descendent_classes(Recruiter):
        for identifier in (cls.nickname, cls.__name__):
            registered = registry.get(identifier, None)
            if not registered or issubclass(cls, registered):
                registry[identifier] = cls

    recruiter_class = registry.get(name)
    if recruiter_class is None:
        return None
    return recruiter_class(**kwargs)