lib/kafka_updater.rb
# frozen_string_literal: true
require 'rest-client'
class KafkaUpdater
def initialize(kafka_bridge_url, kafka_bridge_secret, service_id)
@kafka_bridge_url = kafka_bridge_url
@kafka_bridge_secret = kafka_bridge_secret
@service_id = service_id
end
def task_type(task)
return task.task_type if task.task_type.present?
'unknown'
end
def update_user_progress(task)
finished_successfully = false
user = User.find(task.user_id)
course = Course.find(task.course_id)
Rails.logger.info("Publishing progress for user #{user.id} with moocfi id: #{course.moocfi_id}.")
if !course.moocfi_id || course.moocfi_id.blank?
Rails.logger.error('Cannot publish progress because moocfi id is not specified')
return finished_successfully
end
parts = course.gdocs_sheets
points_per_user = AwardedPoint.count_per_user_in_course_with_sheet(course, parts, user)
available_points = AvailablePoint.course_sheet_points(course, parts)
unless points_per_user[user.username]
Rails.logger.info('No points found. Skipping')
return true
end
progress = points_per_user[user.username].map do |group_name, awarded_points|
max_points = available_points[group_name] || 0
stupid_name = "osa#{group_name.tr('^0-9', '').rjust(2, "0")}"
{
group: stupid_name,
n_points: awarded_points,
max_points: max_points,
progress: (awarded_points / max_points.to_f).floor(2)
}
end
message = {
timestamp: Time.zone.now.iso8601,
user_id: user.id,
course_id: course.moocfi_id,
service_id: @service_id,
progress: progress,
message_format_version: 1
}
topic = task.realtime ? 'user-course-progress-realtime' : 'user-course-progress-batch'
RestClient.post("#{@kafka_bridge_url}/api/v0/event", { topic: topic, payload: message }.to_json, content_type: :json, authorization: "Basic #{@kafka_bridge_secret}") unless @kafka_bridge_url == 'test'
Rails.logger.info("Publishing progress finished for user #{user.id}")
finished_successfully = true
finished_successfully
end
def update_course_progress(task)
finished_successfully = false
course = Course.find(task.course_id)
Rails.logger.info("Batch publishing progress for course #{course.name} with moocfi id: #{course.moocfi_id}.")
if !course.moocfi_id
Rails.logger.error('Cannot publish progress because moocfi id is not specified')
return finished_successfully
end
parts = course.gdocs_sheets
points_per_user = AwardedPoint.count_per_user_in_course_with_sheet(course, parts)
Rails.logger.info("Found points for #{points_per_user.keys.length} users")
available_points = AvailablePoint.course_sheet_points(course, parts)
points_per_user.each do |username, points_by_group|
current_user = User.find_by(login: username)
Rails.logger.info("Publishing progress for user #{current_user.id}")
progress = points_by_group.map do |group_name, awarded_points|
max_points = available_points[group_name] || 0
stupid_name = "osa#{group_name.tr('^0-9', '').rjust(2, "0")}"
{
group: stupid_name,
n_points: awarded_points,
max_points: max_points,
progress: (awarded_points / max_points.to_f).floor(2)
}
end
message = {
timestamp: Time.zone.now.iso8601,
user_id: current_user.id,
course_id: course.moocfi_id,
service_id: @service_id,
progress: progress,
message_format_version: 1
}
RestClient.post("#{@kafka_bridge_url}/api/v0/event", { topic: 'user-course-progress-batch', payload: message }.to_json, content_type: :json, authorization: "Basic #{@kafka_bridge_secret}") unless @kafka_bridge_url == 'test'
Rails.logger.info("Publishing progress finished for user #{current_user.id}")
end
Rails.logger.info("Batch publishing progress finished for course #{course.name}")
finished_successfully = true
finished_successfully
end
def update_user_points(task)
finished_successfully = false
course = Course.find(task.course_id)
user = User.find(task.user_id)
Rails.logger.info("Publishing points for user #{user.id} with moocfi id: #{course.moocfi_id}.")
if !course.moocfi_id
Rails.logger.error('Cannot publish points because moocfi id is not specified')
return finished_successfully
end
exercise = Exercise.find(task.exercise_id)
awarded_points = exercise.points_for(user)
completed = exercise.completed_by?(user)
message = {
timestamp: Time.zone.now.iso8601,
exercise_id: exercise.id.to_s,
n_points: awarded_points.length,
completed: completed,
attempted: true,
user_id: user.id,
course_id: course.moocfi_id,
service_id: @service_id,
required_actions: [],
message_format_version: 1
}
topic = task.realtime ? 'user-points-realtime' : 'user-points-batch'
RestClient.post("#{@kafka_bridge_url}/api/v0/event", { topic: topic, payload: message }.to_json, content_type: :json, authorization: "Basic #{@kafka_bridge_secret}") unless @kafka_bridge_url == 'test'
Rails.logger.info("Publishing points finished for user #{user.id}")
finished_successfully = true
finished_successfully
end
def update_course_points(task)
finished_successfully = false
course = Course.find(task.course_id)
Rails.logger.info("Batch publishing points for course #{course.name} with moocfi id: #{course.moocfi_id}")
if !course.moocfi_id
Rails.logger.error('Cannot publish points because moocfi id is not specified')
return finished_successfully
end
parts = course.gdocs_sheets
points_per_user = AwardedPoint.count_per_user_in_course_with_sheet(course, parts)
Rails.logger.info("Found points for #{points_per_user.keys.length} users")
exercises = Exercise.where(course_id: course.id).where(disabled_status: 0)
points_per_user.each do |username, _points_by_group|
current_user = User.find_by(login: username)
Rails.logger.info("Publishing points for user #{current_user.id}")
exercises_data = []
exercises.map do |exercise|
awarded_points = exercise.points_for(current_user)
completed = exercise.completed_by?(current_user)
user_submissions = exercise.submissions_by(current_user)
attempted = user_submissions.length > 0
original_submission_date = user_submissions.pluck(:created_at).sort.first
original_submission_date_str = original_submission_date.strftime('%FT%T%:z') unless original_submission_date.nil?
exercises_data << {
timestamp: Time.zone.now.iso8601,
exercise_id: exercise.id.to_s,
n_points: awarded_points.length,
completed: completed,
attempted: attempted,
user_id: current_user.id,
course_id: course.moocfi_id,
service_id: @service_id,
required_actions: [],
original_submission_date: original_submission_date_str,
message_format_version: 1
}
end
message = {
timestamp: Time.zone.now.iso8601,
user_id: current_user.id,
course_id: course.moocfi_id,
exercises: exercises_data,
message_format_version: 1
}
RestClient.post("#{@kafka_bridge_url}/api/v0/event", { topic: 'user-course-points-batch', payload: message }.to_json, content_type: :json, authorization: "Basic #{@kafka_bridge_secret}") unless @kafka_bridge_url == 'test'
Rails.logger.info("Publishing points finished for user #{current_user.id}")
end
Rails.logger.info("Batch publishing points finished for course #{course.name}")
finished_successfully = true
finished_successfully
end
def update_exercises(task)
finished_successfully = false
course = Course.find(task.course_id)
Rails.logger.info("Batch publishing exercises for course #{course.name} with moocfi id: #{course.moocfi_id}.")
if !course.moocfi_id
Rails.logger.error('Cannot publish exercises because moocfi id is not specified')
return finished_successfully
end
exercises = Exercise.where(course_id: course.id).where(disabled_status: 0)
data = exercises.map do |exercise|
max_points = AvailablePoint.where(exercise_id: exercise.id).count
stupid_name = "osa#{exercise.gdocs_sheet.tr('^0-9', '').rjust(2, "0")}"
exerciseData = {
name: exercise.name,
id: exercise.id.to_s,
part: stupid_name,
section: 0,
max_points: max_points
}
exerciseData
end
message = {
timestamp: Time.zone.now.iso8601,
course_id: course.moocfi_id,
service_id: @service_id,
data: data,
message_format_version: 1
}
RestClient.post("#{@kafka_bridge_url}/api/v0/event", { topic: 'exercise', payload: message }.to_json, content_type: :json, authorization: "Basic #{@kafka_bridge_secret}") unless @kafka_bridge_url == 'test'
Rails.logger.info("Batch publishing exercises finished for course #{course.name}")
finished_successfully = true
finished_successfully
end
def update_user_course_points(task)
finished_successfully = false
course = Course.find(task.course_id)
user = User.find(task.user_id)
Rails.logger.info("Publishing points for user #{user.id} with moocfi id: #{course.moocfi_id}.")
if !course.moocfi_id
Rails.logger.error('Cannot publish points because moocfi id is not specified')
return finished_successfully
end
exercises = []
course.exercises.each do |exercise|
awarded_points = exercise.points_for(user)
completed = exercise.completed_by?(user)
user_submissions = exercise.submissions_by(user)
attempted = user_submissions.length > 0
original_submission_date = user_submissions.pluck(:created_at).sort.first
original_submission_date_str = original_submission_date.strftime('%FT%T%:z') unless original_submission_date.nil?
exercises << {
timestamp: Time.zone.now.iso8601,
exercise_id: exercise.id.to_s,
n_points: awarded_points.length,
completed: completed,
attempted: attempted,
user_id: user.id,
course_id: course.moocfi_id,
service_id: @service_id,
required_actions: [],
original_submission_date: original_submission_date_str,
message_format_version: 1
}
end
message = {
timestamp: Time.zone.now.iso8601,
user_id: user.id,
course_id: course.moocfi_id,
exercises: exercises,
message_format_version: 1
}
topic = task.realtime ? 'user-course-points-realtime' : 'user-course-points-batch'
RestClient.post("#{@kafka_bridge_url}/api/v0/event", { topic: topic, payload: message }.to_json, content_type: :json, authorization: "Basic #{@kafka_bridge_secret}") unless @kafka_bridge_url == 'test'
Rails.logger.info("Publishing points finished for user #{user.id}")
finished_successfully = true
finished_successfully
end
end