lib/discorb/gateway.rb
# frozen_string_literal: true
require "async/http"
require "async/websocket"
require "async/barrier"
require "json"
require "zlib"
module Discorb
#
# A module for Discord Gateway.
# This module is internal use only.
#
module Gateway
#
# A module to handle gateway events.
#
module Handler
# @type instance: Discorb::Client
private
def connect_gateway(reconnect)
Async do
@mutex["gateway_#{shard_id}"] ||= Mutex.new
@mutex["gateway_#{shard_id}"].synchronize do
if reconnect
logger.info "Reconnecting to gateway..."
else
logger.info "Connecting to gateway..."
end
@http = HTTP.new(self)
gateway_url =
if reconnect
@resume_gateway_url
else
_, gateway_response =
@http.request(Route.new("/gateway", "//gateway", :get)).wait
gateway_response[:url]
end
gateway_version = 10
endpoint =
Async::HTTP::Endpoint.parse(
"#{gateway_url}?v=#{gateway_version}&encoding=json&compress=zlib-stream&_=#{Time.now.to_i}",
alpn_protocols: Async::HTTP::Protocol::HTTP11.names
)
begin
reconnect_count = 0
begin
self.connection =
Async::WebSocket::Client.connect(
endpoint,
headers: [["User-Agent", Discorb::USER_AGENT]],
handler: RawConnection
)
rescue Async::WebSocket::ProtocolError => e
raise if reconnect_count > 3
logger.info "Failed to connect to gateway, retrying...: #{e.message}"
reconnect_count += 1
sleep 2**reconnect_count
retry
end
con = connection
zlib_stream = Zlib::Inflate.new(Zlib::MAX_WBITS)
buffer = +""
begin
while (message = con.read)
buffer << message
unless message.buffer.end_with?(
(+"\x00\x00\xff\xff").force_encoding("ASCII-8BIT")
)
next
end
begin
data = zlib_stream.inflate(buffer)
buffer = +""
message = JSON.parse(data, symbolize_names: true)
rescue JSON::ParserError
buffer = +""
logger.error "Received invalid JSON from gateway."
logger.debug data.to_s
else
handle_gateway(message, reconnect)
end
end
rescue Async::Wrapper::Cancelled,
OpenSSL::SSL::SSLError,
IOError => e
next if @status == :closed
logger.info "Gateway connection closed, reconnecting: #{e.class}: #{e.message}"
con.force_close
connect_gateway(true)
next
end
rescue Protocol::WebSocket::ClosedError => e
@tasks.map(&:stop)
case e.code
when 4004
raise ClientError.new("Authentication failed"), cause: nil
when 4009
logger.info "Session timed out, reconnecting."
con.force_close
connect_gateway(true)
next
when 4014
raise ClientError.new("Disallowed intents were specified"),
cause: nil
when 4001, 4002, 4003, 4005, 4007
raise ClientError.new(<<~ERROR), cause: e
Disconnected from gateway, probably due to library issues.
#{e.message}
Please report this to the library issue tracker.
https://github.com/discorb-lib/discorb/issues
ERROR
when 1001
logger.info "Gateway closed with code 1001, reconnecting."
con.force_close
connect_gateway(true)
next
else
logger.error "Discord WebSocket closed with code #{e.code}."
logger.debug e.message.to_s
con.force_close
connect_gateway(false)
next
end
rescue StandardError => e
logger.error "Discord WebSocket error: #{e.full_message}"
con.force_close
connect_gateway(false)
next
end
end
end
end
def send_gateway(opcode, **value)
if @shards.any? && shard.nil?
@shards.map(&:connection)
else
[connection]
end.each do |con|
con.write({ op: opcode, d: value }.to_json)
con.flush
end
logger.debug "Sent message to fd #{connection.io.fileno}: #{
{ op: opcode, d: value }.to_json.gsub(@token, "[Token]")
}"
end
def handle_gateway(payload, reconnect)
Async do |_task|
data = payload[:d]
@last_s = payload[:s] if payload[:s]
logger.debug "Received message with opcode #{payload[:op]} from gateway."
logger.debug payload.to_json.gsub(@token, "[Token]").to_s
case payload[:op]
when 10
@heartbeat_interval = data[:heartbeat_interval]
if reconnect
payload = { token: @token, session_id:, seq: @last_s }
send_gateway(6, **payload)
else
payload = {
token: @token,
intents: @intents.value,
compress: false,
properties: {
"os" => RUBY_PLATFORM,
"browser" => "discorb",
"device" => "discorb"
}
}
payload[:shard] = [shard_id, @shard_count] if shard_id
payload[:presence] = @identify_presence if @identify_presence
send_gateway(2, **payload)
end
when 7
logger.info "Received opcode 7, stopping tasks"
@tasks.map(&:stop)
when 9
logger.warn "Received opcode 9, closed connection"
@tasks.map(&:stop)
if data
logger.info "Connection is resumable, reconnecting"
connection.force_close
connect_gateway(true)
else
logger.info "Connection is not resumable, reconnecting with opcode 2"
connection.force_close
sleep(2)
connect_gateway(false)
end
when 11
logger.debug "Received opcode 11"
@ping = Time.now.to_f - @heartbeat_before
when 0
handle_event(payload[:t], data)
end
end
end
def handle_heartbeat
Async do |_task|
interval = @heartbeat_interval
sleep(((interval / 1000.0) - 1) * rand)
loop do
unless connection.closed?
@heartbeat_before = Time.now.to_f
connection.write({ op: 1, d: @last_s }.to_json)
connection.flush
logger.debug "Sent opcode 1."
logger.debug "Waiting for heartbeat."
end
sleep((interval / 1000.0) - 1)
end
end
end
def handle_event(event_name, data)
if @wait_until_ready && !@ready &&
!%w[READY GUILD_CREATE].include?(event_name)
return(
logger.debug "Client isn't ready; event #{event_name} wasn't handled"
)
end
dispatch(:event_receive, event_name, data)
logger.debug "Handling event #{event_name}"
case event_name
when "READY"
@api_version = data[:v]
self.session_id = data[:session_id]
@user = ClientUser.new(self, data[:user])
@resume_gateway_url = data[:resume_gateway_url]
@uncached_guilds = data[:guilds].map { |g| g[:id] }
ready if (@uncached_guilds == []) || !@intents.guilds
dispatch(:ready)
@tasks << handle_heartbeat
when "GUILD_CREATE"
if @uncached_guilds.include?(data[:id])
Guild.new(self, data, true)
@uncached_guilds.delete(data[:id])
if @uncached_guilds == []
logger.debug "All guilds cached"
ready
end
elsif @guilds.has?(data[:id])
@guilds[data[:id]].send(:_set_data, data, true)
dispatch(:guild_available, guild)
else
guild = Guild.new(self, data, true)
dispatch(:guild_join, guild)
end
dispatch(:guild_create, @guilds[data[:id]])
when "MESSAGE_CREATE"
message = Message.new(self, data)
dispatch(:message, message)
when "GUILD_UPDATE"
if @guilds.has?(data[:id])
current = @guilds[data[:id]]
before =
Guild.new(
self,
current.instance_variable_get(:@data).merge(no_cache: true),
false
)
current.send(:_set_data, data, false)
dispatch(:guild_update, before, current)
else
logger.warn "Unknown guild id #{data[:id]}, ignoring"
end
when "GUILD_DELETE"
unless (guild = @guilds.delete(data[:id]))
return logger.warn "Unknown guild id #{data[:id]}, ignoring"
end
dispatch(:guild_delete, guild)
if data[:unavailable]
dispatch(:guild_destroy, guild)
else
dispatch(:guild_leave, guild)
end
when "GUILD_ROLE_CREATE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
nr = Role.new(@client, guild, data[:role])
guild.roles[data[:role][:id]] = nr
dispatch(:role_create, nr)
when "GUILD_ROLE_UPDATE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
unless guild.roles.has?(data[:role][:id])
return logger.warn "Unknown role id #{data[:role][:id]}, ignoring"
end
current = guild.roles[data[:role][:id]]
before =
Role.new(
@client,
guild,
current.instance_variable_get(:@data).update({ no_cache: true })
)
current.send(:_set_data, data[:role])
dispatch(:role_update, before, current)
when "GUILD_ROLE_DELETE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
unless (role = guild.roles.delete(data[:role_id]))
return logger.warn "Unknown role id #{data[:role_id]}, ignoring"
end
dispatch(:role_delete, role)
when "CHANNEL_CREATE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
nc = Channel.make_channel(self, data)
guild.channels[data[:id]] = nc
dispatch(:channel_create, nc)
when "CHANNEL_UPDATE"
unless (current = @channels[data[:id]])
return logger.warn "Unknown channel id #{data[:id]}, ignoring"
end
before =
Channel.make_channel(
self,
current.instance_variable_get(:@data),
no_cache: true
)
current.send(:_set_data, data)
dispatch(:channel_update, before, current)
when "CHANNEL_DELETE"
unless (channel = @channels.delete(data[:id]))
return logger.warn "Unknown channel id #{data[:id]}, ignoring"
end
@guilds[data[:guild_id]]&.channels&.delete(data[:id])
dispatch(:channel_delete, channel)
when "CHANNEL_PINS_UPDATE"
nil # do in MESSAGE_UPDATE
when "THREAD_CREATE"
thread = Channel.make_channel(self, data)
dispatch(:thread_create, thread)
if data.key?(:member)
dispatch(:thread_join, thread)
else
dispatch(:thread_new, thread)
end
when "THREAD_UPDATE"
unless (thread = @channels[data[:id]])
return logger.warn "Unknown thread id #{data[:id]}, ignoring"
end
before =
Channel.make_channel(
self,
thread.instance_variable_get(:@data),
no_cache: true
)
thread.send(:_set_data, data)
dispatch(:thread_update, before, thread)
when "THREAD_DELETE"
unless (thread = @channels.delete(data[:id]))
return logger.warn "Unknown thread id #{data[:id]}, ignoring"
end
@guilds[data[:guild_id]]&.channels&.delete(data[:id])
dispatch(:thread_delete, thread)
when "THREAD_LIST_SYNC"
data[:threads].each do |raw_thread|
thread =
Channel.make_channel(
self,
raw_thread.merge(
{
member:
raw_thread[:members].find do |m|
m[:id] == raw_thread[:id]
end
}
)
)
@channels[thread.id] = thread
end
when "THREAD_MEMBER_UPDATE"
unless (thread = @channels[data[:id]])
return logger.warn "Unknown thread id #{data[:id]}, ignoring"
end
if (member = thread.members[data[:id]])
old =
ThreadChannel::Member.new(
self,
member.instance_variable_get(:@data),
data[:guild_id]
)
member.send(:_set_data, data)
else
old = nil
member = ThreadChannel::Member.new(self, data, data[:guild_id])
thread.members[data[:user_id]] = member
end
dispatch(:thread_member_update, thread, old, member)
when "THREAD_MEMBERS_UPDATE"
unless (thread = @channels[data[:id]])
return logger.warn "Unknown thread id #{data[:id]}, ignoring"
end
thread.instance_variable_set(:@member_count, data[:member_count])
members = []
(data[:added_members] || []).each do |raw_member|
member =
ThreadChannel::Member.new(self, raw_member, data[:guild_id])
thread.members[member.id] = member
members << member
end
removed_members = []
(data[:removed_member_ids] || []).each do |id|
removed_members << thread.members.delete(id)
end
dispatch(:thread_members_update, thread, members, removed_members)
when "STAGE_INSTANCE_CREATE"
instance = StageInstance.new(self, data)
dispatch(:stage_instance_create, instance)
when "STAGE_INSTANCE_UPDATE"
unless (channel = @channels[data[:channel_id]])
return(
logger.warn "Unknown channel id #{data[:channel_id]} , ignoring"
)
end
unless (instance = channel.stage_instances[data[:id]])
return(
logger.warn "Unknown stage instance id #{data[:id]}, ignoring"
)
end
old =
StageInstance.new(
self,
instance.instance_variable_get(:@data),
no_cache: true
)
current.send(:_set_data, data)
dispatch(:stage_instance_update, old, current)
when "STAGE_INSTANCE_DELETE"
unless (channel = @channels[data[:channel_id]])
return(
logger.warn "Unknown channel id #{data[:channel_id]} , ignoring"
)
end
unless (instance = channel.stage_instances.delete(data[:id]))
return(
logger.warn "Unknown stage instance id #{data[:id]}, ignoring"
)
end
dispatch(:stage_instance_delete, instance)
when "GUILD_MEMBER_ADD"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
nm =
Member.new(
self,
data[:guild_id],
data[:user].update({ no_cache: true }),
data
)
guild.members[nm.id] = nm
dispatch(:member_add, nm)
when "GUILD_MEMBER_UPDATE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
unless (nm = guild.members[data[:user][:id]])
return logger.warn "Unknown member id #{data[:user][:id]}, ignoring"
end
old =
Member.new(
self,
data[:guild_id],
data[:user],
data.update({ no_cache: true })
)
nm.send(:_set_data, data[:user], data)
dispatch(:member_update, old, nm)
when "GUILD_MEMBER_REMOVE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
unless (member = guild.members.delete(data[:user][:id]))
return logger.warn "Unknown member id #{data[:user][:id]}, ignoring"
end
dispatch(:member_remove, member)
when "GUILD_BAN_ADD"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
user =
if @users.has? data[:user][:id]
@users[data[:user][:id]]
else
User.new(self, data[:user].update({ no_cache: true }))
end
dispatch(:guild_ban_add, guild, user)
when "GUILD_BAN_REMOVE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
user =
if @users.has? data[:user][:id]
@users[data[:user][:id]]
else
User.new(self, data[:user].update({ no_cache: true }))
end
dispatch(:guild_ban_remove, guild, user)
when "GUILD_EMOJIS_UPDATE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
before_emojis = guild.emojis.values.to_set(&:id)
data[:emojis].each do |emoji|
guild.emojis[emoji[:id]] = CustomEmoji.new(self, guild, emoji)
end
deleted_emojis = before_emojis - guild.emojis.values.to_set(&:id)
deleted_emojis.each { |emoji| guild.emojis.delete(emoji) }
when "GUILD_INTEGRATIONS_UPDATE"
dispatch(:guild_integrations_update, @guilds[data[:guild_id]])
when "INTEGRATION_CREATE"
dispatch(
:integration_create,
Integration.new(self, data, data[:guild_id])
)
when "INTEGRATION_UPDATE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
integration = Integration.new(self, data, data[:guild_id])
dispatch(:integration_update, integration)
when "INTEGRATION_DELETE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
dispatch(:integration_delete, IntegrationDeleteEvent.new(self, data))
when "WEBHOOKS_UPDATE"
dispatch(:webhooks_update, WebhooksUpdateEvent.new(self, data))
when "INVITE_CREATE"
dispatch(:invite_create, Invite.new(self, data, true))
when "INVITE_DELETE"
dispatch(:invite_delete, InviteDeleteEvent.new(self, data))
when "VOICE_STATE_UPDATE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
current = guild.voice_states[data[:user_id]]
if current.nil?
old = nil
current = VoiceState.new(self, data)
guild.voice_states[data[:user_id]] = current
else
guild.voice_states.remove(data[:user_id]) if data[:channel_id].nil?
old = VoiceState.new(self, current.instance_variable_get(:@data))
current.send(:_set_data, data)
end
dispatch(:voice_state_update, old, current)
if old&.channel != current&.channel
dispatch(:voice_channel_update, old, current)
case [old&.channel.nil?, current&.channel.nil?]
when [true, false]
dispatch(:voice_channel_connect, current)
when [false, true]
dispatch(:voice_channel_disconnect, old)
when [false, false]
dispatch(:voice_channel_move, old, current)
end
end
if old&.mute? != current&.mute?
dispatch(:voice_mute_update, old, current)
case [old&.mute?, current&.mute?]
when [false, true]
dispatch(:voice_mute_enable, current)
when [true, false]
dispatch(:voice_mute_disable, old)
end
end
if old&.deaf? != current&.deaf?
dispatch(:voice_deaf_update, old, current)
case [old&.deaf?, current&.deaf?]
when [false, true]
dispatch(:voice_deaf_enable, current)
when [true, false]
dispatch(:voice_deaf_disable, old)
end
end
if old&.self_mute? != current&.self_mute?
dispatch(:voice_self_mute_update, old, current)
case [old&.self_mute?, current&.self_mute?]
when [false, true]
dispatch(:voice_self_mute_enable, current)
when [true, false]
dispatch(:voice_self_mute_disable, old)
end
end
if old&.self_deaf? != current&.self_deaf?
dispatch(:voice_self_deaf_update, old, current)
case [old&.self_deaf?, current&.self_deaf?]
when [false, true]
dispatch(:voice_self_deaf_enable, current)
when [true, false]
dispatch(:voice_self_deaf_disable, old)
end
end
if old&.server_mute? != current&.server_mute?
dispatch(:voice_server_mute_update, old, current)
case [old&.server_mute?, current&.server_mute?]
when [false, true]
dispatch(:voice_server_mute_enable, current)
when [true, false]
dispatch(:voice_server_mute_disable, old)
end
end
if old&.server_deaf? != current&.server_deaf?
dispatch(:voice_server_deaf_update, old, current)
case [old&.server_deaf?, current&.server_deaf?]
when [false, true]
dispatch(:voice_server_deaf_enable, current)
when [true, false]
dispatch(:voice_server_deaf_disable, old)
end
end
if old&.video? != current&.video?
dispatch(:voice_video_update, old, current)
case [old&.video?, current&.video?]
when [false, true]
dispatch(:voice_video_start, current)
when [true, false]
dispatch(:voice_video_end, old)
end
end
if old&.stream? != current&.stream?
dispatch(:voice_stream_update, old, current)
case [old&.stream?, current&.stream?]
when [false, true]
dispatch(:voice_stream_start, current)
when [true, false]
dispatch(:voice_stream_end, old)
end
end
when "PRESENCE_UPDATE"
unless (guild = @guilds[data[:guild_id]])
return logger.warn "Unknown guild id #{data[:guild_id]}, ignoring"
end
guild.presences[data[:user][:id]] = Presence.new(self, data)
when "MESSAGE_UPDATE"
if (message = @messages[data[:id]])
before =
Message.new(
self,
message.instance_variable_get(:@data),
no_cache: true
)
message.send(
:_set_data,
message.instance_variable_get(:@data).merge(data)
)
else
before = nil
message = nil
end
if data[:edited_timestamp].nil?
if message.nil?
nil
elsif message.pinned?
message.instance_variable_set(:@pinned, false)
else
message.instance_variable_set(:@pinned, true)
end
dispatch(
:message_pin_update,
MessagePinEvent.new(self, data, message)
)
else
dispatch(
:message_update,
MessageUpdateEvent.new(self, data, before, current)
)
end
when "MESSAGE_DELETE"
if (message = @messages[data[:id]])
message.instance_variable_set(:@deleted, true)
end
dispatch(
:message_delete_id,
Snowflake.new(data[:id]),
channels[data[:channel_id]],
data[:guild_id] && guilds[data[:guild_id]]
)
dispatch(
:message_delete,
message,
channels[data[:channel_id]],
data[:guild_id] && guilds[data[:guild_id]]
)
when "MESSAGE_DELETE_BULK"
messages = []
data[:ids].each do |id|
if (message = @messages[id])
message.instance_variable_set(:@deleted, true)
messages.push(message)
else
messages.push(UnknownDeleteBulkMessage.new(self, id, data))
end
end
dispatch(:message_delete_bulk, messages)
when "MESSAGE_REACTION_ADD"
if (target_message = @messages[data[:message_id]])
if (
target_reaction =
target_message.reactions.find do |r|
if r.emoji.is_a?(UnicodeEmoji)
r.emoji.value == data[:emoji][:name]
else
r.emoji.id == data[:emoji][:id]
end
end
)
target_reaction.instance_variable_set(
:@count,
target_reaction.count + 1
)
else
target_message.reactions << Reaction.new(
target_message,
{
count: 1,
me: @user.id == data[:user_id],
emoji: data[:emoji]
}
)
end
end
dispatch(:reaction_add, ReactionEvent.new(self, data))
when "MESSAGE_REACTION_REMOVE"
if (target_message = @messages[data[:message_id]]) &&
(
target_reaction =
target_message.reactions.find do |r|
if data[:emoji][:id].nil?
r.emoji.name == data[:emoji][:name]
else
r.emoji.id == data[:emoji][:id]
end
end
)
target_reaction.instance_variable_set(
:@count,
target_reaction.count - 1
)
if target_reaction.count.zero?
target_message.reactions.delete(target_reaction)
end
end
dispatch(:reaction_remove, ReactionEvent.new(self, data))
when "MESSAGE_REACTION_REMOVE_ALL"
if (target_message = @messages[data[:message_id]])
target_message.reactions = []
end
dispatch(:reaction_remove_all, ReactionRemoveAllEvent.new(self, data))
when "MESSAGE_REACTION_REMOVE_EMOJI"
if (target_message = @messages[data[:message_id]]) &&
(
target_reaction =
target_message.reactions.find do |r|
if data[:emoji][:id].nil?
r.name == data[:emoji][:name]
else
r.id == data[:emoji][:id]
end
end
)
target_message.reactions.delete(target_reaction)
end
dispatch(
:reaction_remove_emoji,
ReactionRemoveEmojiEvent.new(self, data)
)
when "TYPING_START"
dispatch(:typing_start, TypingStartEvent.new(self, data))
when "INTERACTION_CREATE"
interaction = Interaction.make_interaction(self, data)
dispatch(:interaction_create, interaction)
dispatch(interaction.class.event_name, interaction)
when "RESUMED"
logger.info("Successfully resumed connection")
@tasks << handle_heartbeat
shard ? dispatch(:shard_resumed, shard) : dispatch(:resumed)
when "GUILD_SCHEDULED_EVENT_CREATE"
unless (guild = @guilds[data[:guild_id]])
logger.warn("Unknown guild id #{data[:guild_id]}, ignoring")
end
event = ScheduledEvent.new(self, data)
guild.scheduled_events[data[:id]] = event
dispatch(:scheduled_event_create, event)
when "GUILD_SCHEDULED_EVENT_UPDATE"
unless (guild = @guilds[data[:guild_id]])
logger.warn("Unknown guild id #{data[:guild_id]}, ignoring")
end
unless (event = guild.scheduled_events[data[:id]])
logger.warn("Unknown scheduled event id #{data[:id]}, ignoring")
end
old = event.dup
event.send(:_set_data, data)
dispatch(:scheduled_event_update, old, event)
if old.status == event.status
dispatch(:scheduled_event_edit, old, event)
else
case event.status
when :active
dispatch(:scheduled_event_start, event)
when :completed
dispatch(:scheduled_event_end, event)
end
end
when "GUILD_SCHEDULED_EVENT_DELETE"
unless (guild = @guilds[data[:guild_id]])
logger.warn("Unknown guild id #{data[:guild_id]}, ignoring")
end
unless (event = guild.scheduled_events[data[:id]])
logger.warn("Unknown scheduled event id #{data[:id]}, ignoring")
end
guild.scheduled_events.remove(data[:id])
dispatch(:scheduled_event_delete, event)
dispatch(:scheduled_event_cancel, event)
when "GUILD_SCHEDULED_EVENT_USER_ADD"
unless (guild = @guilds[data[:guild_id]])
logger.warn("Unknown guild id #{data[:guild_id]}, ignoring")
end
dispatch(
:scheduled_event_user_add,
ScheduledEventUserEvent.new(self, data)
)
when "GUILD_SCHEDULED_EVENT_USER_REMOVE"
unless (guild = @guilds[data[:guild_id]])
logger.warn("Unknown guild id #{data[:guild_id]}, ignoring")
end
dispatch(
:scheduled_event_user_remove,
ScheduledEventUserEvent.new(self, data)
)
when "AUTO_MODERATION_ACTION_EXECUTION"
dispatch(
:auto_moderation_action_execution,
AutoModerationActionExecutionEvent.new(self, data)
)
when "AUTO_MODERATION_RULE_CREATE"
dispatch(:auto_moderation_rule_create, AutoModRule.new(self, data))
when "AUTO_MODERATION_RULE_UPDATE"
dispatch(:auto_moderation_rule_update, AutoModRule.new(self, data))
when "AUTO_MODERATION_RULE_DELETE"
dispatch(:auto_moderation_rule_delete, AutoModRule.new(self, data))
else
if respond_to?("event_#{event_name.downcase}")
__send__("event_#{event_name.downcase}", data)
else
logger.debug "Unhandled event: #{event_name}\n#{data.inspect}"
end
end
end
def ready
Async do
if @fetch_member
logger.debug "Fetching members"
barrier = Async::Barrier.new
@guilds.each do |guild|
barrier.async(parent: barrier) { guild.fetch_members }
end
barrier.wait
end
@ready = true
if shard
logger.info("Shard #{shard_id} is ready!")
shard&.tap do |shard|
if shard.next_shard
dispatch(:shard_standby, shard)
shard.next_shard.tap do |next_shard|
logger.debug("Starting shard #{next_shard.id}")
next_shard.start
end
else
logger.info("All shards are ready!")
dispatch(:standby)
end
end
else
logger.info("Client is ready!")
dispatch(:standby)
end
end
end
end
#
# A class for connecting websocket with raw bytes data.
# @private
#
class RawConnection < Async::WebSocket::Connection
def initialize(*, **)
super
@closed = false
end
def inspect
"<#{self.class.name} #{io.fileno}>"
end
def closed?
@closed
end
def close(...)
super
@closed = true
rescue StandardError => e
warn "Failed to close connection: #{e.message}"
force_close
end
def force_close
io.close
@closed = true
end
def io
@framer
.instance_variable_get(:@stream)
.instance_variable_get(:@io)
.instance_variable_get(:@io)
.instance_variable_get(:@io)
end
def parse(buffer)
# noop
buffer.to_s
end
def dump(object)
# noop
object.to_s
end
end
end
end