ucberkeley/moocchat

View on GitHub
app/middleware/chat_server.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require 'faye/websocket'
require 'json'
require 'erb'

# The chat server maintains a nested hash to keep track of existing websockets.
# A chat group consists of sorted, comma-separa
#  { chatroom1 => { participant1 => websocket1, participant2 => websocket2, ...},
#    chatroom2 => { participant1 => websocket1, participant2 => websocket2, ...} }

class ChatServer

  class MalformedWebsocketUrlError < RuntimeError;  end
  
  KEEPALIVE_TIME = 30 # in seconds
  GREETING = "Student %s"
  GENERIC_ASYNC_RACK_RESPONSE = [ -1, {}, [] ]
  attr_accessor :groups
  def initialize(app)
    @app     = app
    @groups = {}
  end
  
  def call(env)
    if Faye::WebSocket.websocket?(env)
      ensure_setup_socket_for(env)
    else
      @app.call(env)
    end
  end

  # URL for a websocket request will contain a list of N+1 comma-separated
  # numbers.  The first N numbers are the task IDs of the group members,
  # always in ascending order, and the last is taskID for *this* member.
  # So eg "7,13,23,13" is a group of 3 people with task IDs 7,13,23, and
  # the person responsible for *this* request is task ID 13.
  # Task IDs are never recycled.
  def channel_and_position_from_url(url)
    if url =~ /\b([0-9,]+),([0-9]+)\b/
      channel, my_id = $1, $2
      position = channel.split(/,/).index(my_id)
      abort_with "Malformed URL '#{url}': index out of range" unless position
    else
      abort_with "Malformed URL: #{url}"
    end
    return channel, position, my_id
  end

  def print_exception_info(e, msg)
    puts msg
    puts '  %s: %s' % [e.class, e.message]
    puts e.backtrace
  end

  def ensure_setup_socket_for(env)
    channel, my_position, my_id = channel_and_position_from_url(env['ORIGINAL_FULLPATH'])
    unless @groups.has_key?(channel) && @groups[channel][my_position]
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
      @groups[channel] ||= []
      @groups[channel][my_position] = ws
      ws.on(:open)    { |event| ; }
      ws.on(:message) { |event|
        begin
          redistribute_message(event.data, channel, my_position)
        rescue Exception => e
          print_exception_info(e, 'websocket server encountered unhandled exception while processing message event:')
          raise e
        end
      }
      ws.on(:close)   {
        begin
          @groups[channel][my_position] = nil
          # Currently, this works on Firefox but causes issues on IE
          # and Chrome because close events happen when navigating
          # between pages. Heartbeat will still catch it eventually.
          # TODO: find an alternative way to deal with this.
          # redistribute_message('{"text": "", "taskid": ' + my_id.to_s + ', "type": "disconnect"}', channel, my_position)
        rescue Exception => e
          print_exception_info(e, 'websocket server encountered unhandled exception while processing close event:')
          raise e
        end
      }
      ws.rack_response   # async Rack response
    else
      GENERIC_ASYNC_RACK_RESPONSE
    end
  end

  def redistribute_message(websocket_data, channel, my_position)
    message = extract_text(websocket_data)
    type = extract_type(websocket_data)
    taskid = extract_taskid(websocket_data)
    if type != "heartbeat"   # Heartbeat too spammy
      puts "redistribution #{type}: #{message}"
    end
    if type == "message"
      speaker_position = channel.split(/,/).index(taskid.to_s)
      speaker = "Student #{1+speaker_position}"
      style_class = "student-#{1+speaker_position}"
      json = create_text_message "#{speaker}: #{message}", taskid, style_class
    end
    if type == "end-vote"
      json = create_end_vote taskid
    end
    if type == "heartbeat"
      json = create_heartbeat taskid
    end
    if type == "disconnect"
      json = create_disconnect taskid
    end
    @groups[channel].each_with_index do |websocket, position|
      unless websocket.nil?  # May be nil if learner disconnected
        websocket.send json
      end
    end
  end

  private

  def extract_text(event)
    JSON.parse(event)['text']
  end

  def extract_taskid(event)
    JSON.parse(event)['taskid']
  end

  def extract_type(event)
    JSON.parse(event)['type']
  end

  def create_text_message(text, taskid, style_class)
    {:text => text, :type => "message", :taskid => taskid, :style_class => style_class }.to_json
  end

  def create_end_vote(taskid)
    {:text => "", :type => "end-vote", :taskid => taskid }.to_json
  end

  def create_heartbeat(taskid)
    {:text => "", :type => "heartbeat", :taskid => taskid }.to_json
  end

  def create_disconnect(taskid)
    {:text => "", :type => "disconnect", :taskid => taskid }.to_json
  end

  def abort_with(message)
    puts "Aborting with #{message}"
    raise MalformedWebsocketUrlError, message
    #  should notify via hoptoad or something as well
  end
end