lib/websocket_server.rb
# Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
class WebsocketServer
cattr_reader :clients, :options
def self.run(options)
@options = options
@clients = {}
# By default, we are only logging errors to STDOUT.
# To turn on some more logging to get some insights, please, provide one of the following parameters:
# -n | --info => info
# -v | --verbose => debug
Rails.configuration.interface = 'websocket'
EventMachine.run do
EventMachine::WebSocket.start(host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: @options[:tls_options]) do |ws|
# register client connection
ws.onopen do |handshake|
WebsocketServer.onopen(ws, handshake)
end
# unregister client connection
ws.onclose do
WebsocketServer.onclose(ws)
end
# manage messages
ws.onmessage do |msg|
WebsocketServer.onmessage(ws, msg)
end
end
# check unused connections
EventMachine.add_timer(0.5) do
WebsocketServer.check_unused_connections
end
# check open unused connections, kick all connection without activity in the last 2 minutes
EventMachine.add_periodic_timer(120) do
WebsocketServer.check_unused_connections
end
EventMachine.add_periodic_timer(20) do
WebsocketServer.log_status
end
EventMachine.add_periodic_timer(0.4) do
WebsocketServer.send_to_client
end
end
end
def self.onopen(websocket, handshake)
headers = handshake.headers
client_id = websocket.object_id.to_s
log 'info', 'Client connected.', client_id
Sessions.create(client_id, {}, { type: 'websocket' })
return if @clients.include? client_id
@clients[client_id] = {
websocket: websocket,
last_ping: Time.now.utc.to_i,
error_count: 0,
headers: headers,
}
end
def self.onclose(websocket)
client_id = websocket.object_id.to_s
log 'info', 'Client disconnected.', client_id
# removed from current client list
if @clients.include? client_id
@clients.delete client_id
end
Sessions.destroy(client_id)
end
def self.onmessage(websocket, msg)
client_id = websocket.object_id.to_s
log 'info', "receiving #{msg.to_s.bytesize} bytes", client_id
log 'debug', "received: #{msg}", client_id
begin
log 'info', 'start: parse message to JSON', client_id
data = JSON.parse(msg)
log 'info', 'end: parse message to JSON', client_id
rescue => e
log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
return
end
# check if connection not already exists
return if !@clients[client_id]
Sessions.touch(client_id) # rubocop:disable Rails/SkipsModelValidations
@clients[client_id][:last_ping] = Time.now.utc.to_i
if data['event']
log 'info', "start: execute event '#{data['event']}'", client_id
message = Sessions::Event.run(
event: data['event'],
payload: data,
session: @clients[client_id][:session],
headers: @clients[client_id][:headers],
client_id: client_id,
clients: @clients,
options: @options,
)
log 'info', "end: execute event '#{data['event']}'", client_id
if message
websocket_send(client_id, message)
end
else
log 'error', "unknown message '#{data.inspect}'", client_id
end
ensure
ActiveSupport::CurrentAttributes.clear_all
end
def self.websocket_send(client_id, data)
msg = if data.instance_of?(Array)
data.to_json
else
"[#{data.to_json}]"
end
log 'info', "sending #{msg.to_s.bytesize} bytes", client_id
log 'debug', "send: #{msg}", client_id
if !@clients[client_id]
log 'error', "no such @clients for #{client_id}", client_id
return
end
@clients[client_id][:websocket].send(msg)
end
def self.check_unused_connections
log 'info', 'check unused idle connections...'
idle_time_in_sec = 4 * 60
# close unused web socket sessions
@clients.each do |client_id, client|
next if (client[:last_ping].to_i + idle_time_in_sec) >= Time.now.utc.to_i
log 'info', 'closing idle websocket connection', client_id
# remember to not use this connection anymore
client[:disconnect] = true
# try to close regular
client[:websocket].close_websocket
# delete session from client list
sleep 0.3
@clients.delete(client_id)
end
# close unused ajax long polling sessions
clients = Sessions.destroy_idle_sessions(idle_time_in_sec)
clients.each do |client_id|
log 'info', 'closing idle long polling connection', client_id
end
end
def self.send_to_client
return if @clients.empty?
# log 'debug', 'checking for data to send...'
@clients.each do |client_id, client|
next if client[:disconnect]
log 'debug', 'checking for data...', client_id
begin
queue = Sessions.queue(client_id)
next if queue.blank?
websocket_send(client_id, queue)
rescue => e
log 'error', "problem:#{e.inspect}", client_id
# disconnect client
client[:error_count] += 1
if client[:error_count] > 20 && @clients.include?(client_id)
@clients.delete client_id
end
end
end
end
def self.log_status
# websocket
log 'info', "Status: websocket clients: #{@clients.size}"
@clients.each_key do |client_id|
log 'info', 'working...', client_id
end
# ajax
client_list = Sessions.list
clients = 0
client_list.each_value do |client|
next if client[:meta][:type] == 'websocket'
clients += 1
end
log 'info', "Status: ajax clients: #{clients}"
client_list.each do |client_id, client|
next if client[:meta][:type] == 'websocket'
log 'info', 'working...', client_id
end
end
def self.log(level, data, client_id = '-')
skip_log = true
case level
when 'error'
skip_log = false
when 'debug'
if @options[:v]
skip_log = false
end
when 'info'
if @options[:n] || @options[:v]
skip_log = false
end
end
return if skip_log
client_id_str = client_id.eql?('-') ? '' : "##{client_id}"
# same format as in log/production.log
puts "#{level.to_s.first.upcase}, [#{Time.now.utc.strftime('%FT%T.%6N')}#{client_id_str}] #{level.upcase} -- : #{data}" # rubocop:disable Rails/Output
end
end