lib/m_turk_utils.rb
# Copyright 2012 Twitter, Inc. and others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Utility methods for interacting with Mechanical Turk
module MTurkUtils
# Basic structure of the XML structure for an HTML question.
HTML_QUESTION_XML = <<-END_XML
<HTMLQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2011-11-11/HTMLQuestion.xsd">
<HTMLContent><![CDATA[
%<content>s
]]>
</HTMLContent>
<FrameHeight>%<height>s</FrameHeight>
</HTMLQuestion>
END_XML
class << self
# Reloads the configuration from MTURK_CONFIG
def init
init_prod
init_sandbox
end
# Reloads the configuration from config/mturk_prod.yml
def init_prod
@mturk_prod = Amazon::WebServices::MechanicalTurkRequester.new(
:AWSAccessKeyId => MTURK_CONFIG[:access_key_id],
:AWSAccessKey => MTURK_CONFIG[:access_key],
:Host => 'Prod'
)
end
# Reloads the configuration from config/mturk_sandbox.yml
def init_sandbox
@mturk_sandbox = Amazon::WebServices::MechanicalTurkRequester.new(
:AWSAccessKeyId => MTURK_CONFIG[:access_key_id],
:AWSAccessKey => MTURK_CONFIG[:access_key],
:Host => 'Sandbox'
)
end
private
# Calls init_prod and init_sandbox if the configs haven't already been
# loaded
def init_if_needed
init_prod if @mturk_prod.nil?
init_sandbox if @mturk_sandbox.nil?
end
public
# Returns the url of a HIT group, given the associated Evaluation.
def get_hit_url eval
if !eval.prod?
# Sandbox Url
"http://workersandbox.mturk.com/mturk/preview?groupId=#{eval.mturk_hit_type}"
else
# Production Url
"http://mturk.com/mturk/preview?groupId=#{eval.mturk_hit_type}"
end
end
# Returns the URL to manage a given Task.
def get_task_url task
hit_id = task.mturk_hit
if !task.evaluation.prod?
# Sandbox Url
"http://requestersandbox.mturk.com/mturk/manageHIT?HITId=#{hit_id}"
else
# Production Url
"http://requester.mturk.com/mturk/manageHIT?HITId=#{hit_id}"
end
end
# Returns the id of the "trusted workers" qualification. If prod is true
# the id is for the production mturk service, else it is for the
# sandbox mturk service.
def get_trusted_qual_id prod
init_if_needed
if !prod
# Sandbox id
MTURK_CONFIG[:qualifications][:trusted][:sandbox]
else
# Production id
MTURK_CONFIG[:qualifications][:trusted][:prod]
end
end
# Returns the id of the "categorization masters" qualification. If prod is
# true the id is for the production mturk service, else it is for the
# sandbox mturk service.
def get_master_qual_id prod
init_if_needed
if !prod
# Sandbox id
MTURK_CONFIG[:qualifications][:master][:sandbox]
else
# Production id
MTURK_CONFIG[:qualifications][:master][:prod]
end
end
private
# Given an HTML page and a frame height, returns the XML that should be
# sent to mechanical turk for an HTMLQuestion.
def build_question_xml content, frame_height=400
HTML_QUESTION_XML % {:content => content, :height => frame_height}
end
public
# Submits a Task to mechanical turk. Registers the task's Evaluation as
# a hit type if needed. Updates the task's mturk_hit property upon
# successful completion.
def submit_task task
eval = task.evaluation
if eval.mturk_hit_type.blank?
register_hit_type eval
end
props = {
:HITTypeId => eval.mturk_hit_type,
:Question => build_question_xml(task.render),
:LifetimeInSeconds => eval.lifetime,
:MaxAssignments => 1,
:UniqueRequestToken => task.uuid
}
result = mturk_run do
mturk(task.evaluation).createHIT(props)
end
if result
task.mturk_hit = result[:HITId]
task.save!
end
end
# Registers an Evaluation as a HIT type on mechanical turk. Updates the
# evaluation's mturk_hit_type property upon successful completion.
def register_hit_type eval
props = {
:Title => eval.title,
:Description => eval.desc + " (CR ID: #{eval.id})",
:Reward => { :Amount => (eval.payment / 100.0), :CurrencyCode => 'USD' },
:AssignmentDurationInSeconds => eval.duration,
:Keywords => eval.keywords,
:AutoApprovalDelayInSeconds => eval.auto_approve
}
if eval.mturk_qualification_type != nil
props[:QualificationRequirement] = [{
:QualificationTypeId => eval.mturk_qualification_type,
:Comparator => 'Exists'
}]
end
result = mturk(eval).registerHITType(props)
eval.mturk_hit_type = result[:HITTypeId]
eval.save!
end
# Returns the number of results submitted by mturk judges for a particular
# Evaluation.
def num_results eval
mturk(eval).getReviewableHITs(:HITTypeId => eval.mturk_hit_type)[:TotalNumResults]
end
# Imports responses to the given Task as a TaskResult.
def fetch_results task
hit_id = task.mturk_hit
return unless hit_id
assignments = mturk_run{mturk(task.evaluation).getAssignmentsForHIT :HITId => hit_id}
# return if we don't have a response from a MTurk user
if assignments.nil? or assignments[:Assignment].nil?
Rails.logger.warn("[fetch_results] No responses for task #{task.id}")
return
end
answers = Hash.from_xml(assignments[:Assignment][:Answer])["QuestionFormAnswers"]["Answer"]
unless answers.kind_of? Array
# If there's only one question, we get a single hash rather than a bunch
# of hashes wrapped in an array. We wrap this hash in an array for
# consistency.
answers = [answers]
end
worker_id = assignments[:Assignment][:WorkerId]
time = assignments[:Assignment][:SubmitTime] - assignments[:Assignment][:AcceptTime]
response = task.build_task_response
response.m_turk_user = MTurkUser.find_or_create_by_id_and_prod worker_id, task.evaluation.prod
response.work_duration = time
# import answer data
answers.each do |answer|
# question identifiers are in the format "type:id" where type is
# "fr" or "mc" and id is the id of the FRQuestion or MCQuestion.
# If no response was submitted, we get a result in the form
# ["QuestionIdentifier", "mc:177"], which we need to special-case
if answer.kind_of? Array
Rails.logger.warn("[fetch_results] Got no response for MCQuestion #{answer.last} in task #{task.id}")
next
end
question_type, question_id = answer["QuestionIdentifier"].split(':')
answer_content = answer["FreeText"]
if question_type == "fr"
# free-response question
begin
question = FRQuestion.find question_id.to_i
rescue ActiveRecord::RecordNotFound
Rails.logger.warn("[fetch_results] Could not find FRQuestion #{question_id}")
next
end
question_response = response.fr_question_responses.build
if answer_content.blank?
answer_content = "No reponse given"
end
question_response.response = answer_content
question_response.fr_question = question
elsif question_type == "mc"
# multiple-choice question
begin
option = MCQuestionOption.find answer_content.to_i
rescue ActiveRecord::RecordNotFound
Rails.logger.warn("[fetch_results] Could not find MCQuestion #{answer_content}")
next
end
next if option.nil?
question_response = response.mc_question_responses.build
question_response.mc_question_option = option
end
end
response.save!
end
# Expires a task, so mturk works can no longer complete it
def force_expire task
hit_id = task.mturk_hit
return unless hit_id
begin
mturk_run{mturk(task.evaluation).forceExpireHIT( :HITId => hit_id )}
rescue => e
# ignore mturk complaining about the task already being expired
raise e unless e.message.include? 'AWS.MechanicalTurk'
end
end
# Approves all responses submitted for a given Task, and then updates
# the approved property of the task.
def approve task
return unless task.mturk_hit
approve_remaining_assignments task
if task.task_response and task.task_response.approved.nil?
task.task_response.approved = true
task.task_response.save!
end
end
# Rejects all responses submitted for a given task, and then updates
# the approved property of the task.
def reject task
return unless task.mturk_hit
reject_remaining_assignments task
if task.task_response and task.task_response.approved.nil?
task.task_response.approved = false
task.task_response.save!
end
end
private
# Approves all responses submitted to a given Task
def approve_remaining_assignments task
hit_id = task.mturk_hit
client = mturk(task.evaluation)
return unless hit_id
mturk_run do
client.getAssignmentsForHITAll( :HITId => hit_id ).each do |assignment|
if assignment[:AssignmentStatus] == 'Submitted'
# "Submitted" means we haven't approved or denied yet
client.approveAssignment :AssignmentId => assignment[:AssignmentId]
end
end
end
end
# Rejects all responses submitted to a given task
def reject_remaining_assignments task
hit_id = task.mturk_hit
client = mturk(task.evaluation)
return unless hit_id
count = 0
mturk_run do
client.getAssignmentsForHITAll( :HITId => hit_id ).each do |assignment|
client.rejectAssignment :AssignmentId => assignment[:AssignmentId] if assignment[:AssignmentStatus] == 'Submitted'
count += 1
end
end
return count
end
public
# Removes a task from MTurk. Fails unless all responses for the task
# have been approved.
def dispose task
hit_id = task.mturk_hit
return unless hit_id
mturk_run{mturk(task.evaluation).disposeHIT( :HITId => hit_id )}
end
# Bans a user from responding to any of our HITs. Updates the user's
# banned property.
def ban_user user
mturk(user).blockWorker :WorkerId => user.id, :Reason => 'Blocked from Clockwork Raven'
user.banned = true
user.save!
end
# Unbans a previously banned user. Updates the user's banned property.
def unban_user user
mturk(user).unblockWorker :WorkerId => user.id
user.banned = false
user.save!
end
# Adds a user to the "trusted workers" group. Updates the user's trusted
# property
def trust_user user
mturk(user).assignQualification :WorkerId => user.id,
:QualificationTypeId => get_trusted_qual_id(user.prod?)
user.trusted = true
user.save!
end
# Removes a user from the "trusted workers" group. Updates the user's
# trusted property.
def untrust_user user
begin
mturk(user).revokeQualification :SubjectId => user.id,
:QualificationTypeId => get_trusted_qual_id(user.prod?)
rescue => e
# ignore invalid qualification state; it just means we've already
# untrusted this user.
raise e unless e.message.include? 'AWS.MechanicalTurk.InvalidQualificationState'
end
user.trusted = false
user.save!
end
SYNC_USERS_PAGE_SIZE = 50
# Synchronizes the list of trusted and banned users. This should be called
# if users are banned/unbanned or added/removed from trusted users
# via something other than Clockwork Raven.
def sync_users
init_if_needed
# get rid of existing lists
MTurkUser.where('prod = 1 AND ((banned = 1) OR (trusted = 1))').each do |u|
u.banned = 0
u.trusted = 0
u.save!
end
# load in trusted users
trusted = []
page_num = 1
# loop to load all pages
loop do
page = @mturk_prod.getQualificationsForQualificationType(
:QualificationTypeId => MTurkUtils.get_trusted_qual_id(true),
:PageSize => SYNC_USERS_PAGE_SIZE,
:PageNumber => page_num
)
if page[:Qualification].kind_of? Array
trusted += page[:Qualification].map{|h| h[:SubjectId]}
else
trusted.push page[:Qualification][:SubjectId]
end
break if (page[:PageNumber] * SYNC_USERS_PAGE_SIZE) > page[:TotalNumResults]
page_num += 1
end
puts "Trusting #{trusted.length} users"
trusted.each do |user_id|
user = MTurkUser.find_or_create_by_id(user_id)
user.trusted = 1
user.save!
end
# load in banned users
banned = []
page_num = 1
# loop to load all pages
loop do
page = @mturk_prod.getBlockedWorkers(
:PageSize => SYNC_USERS_PAGE_SIZE,
:PageNumber => page_num
)
if page[:WorkerBlock].kind_of? Array
banned += page[:WorkerBlock].map{|h| h[:WorkerId]}
else
banned.push page[:WorkerBlock][:WorkerId]
end
break if (page[:PageNumber] * SYNC_USERS_PAGE_SIZE) > page[:TotalNumResults]
page_num += 1
end
puts "Banning #{banned.length} users"
banned.each do |user_id|
user = MTurkUser.find_or_create_by_id(user_id)
user.banned = 1
user.save!
end
end
# runs the given block.
# if it fails, retries up to three times.
# will silently ignore ValidationExceptions and throttling after
# 3 retries
def mturk_run &block
init_if_needed
retries = 0
begin
result = block.call
rescue => e
if e.message.include? 'AWS.MechanicalTurk.HITAlreadyExists'
# it's a repeat submit, skip
return
else
# retry a few times
retries += 1
if retries <= 3
retry
else
# we're out of retry attempts.
raise e
end
end
end
return result
end
# gets the sanddbox or production client, based on the Evaluation or
# MTurkUser.
def mturk eval_or_user
init_if_needed
if eval_or_user.prod?
return @mturk_prod
else
return @mturk_sandbox
end
end
end
end