app/models/transport.rb
require "bunny"
# A wrapper around AMQP to stay DRY. Will make life easier if we ever need to
# change protocols
class Transport
OPTS = { read_timeout: 10, heartbeat: 10, log_level: "warn" }
CLOUDAMQP_ENV_KEY = ENV.fetch("WHERE_IS_CLOUDAMQP_URL", "CLOUDAMQP_URL")
def self.amqp_url
@amqp_url ||= ENV[CLOUDAMQP_ENV_KEY] ||
ENV["RABBITMQ_URL"] ||
"amqp://admin:#{ENV.fetch("ADMIN_PASSWORD")}@mqtt:5672"
end
def self.default_amqp_adapter=(value)
@default_amqp_adapter = value
end
def self.default_amqp_adapter
@default_amqp_adapter ||= Bunny
end
attr_accessor :amqp_adapter, :request_store
def self.current
@current ||= self.new
end
def self.current=(value)
@current = value
end
def connection
@connection ||= Transport
.default_amqp_adapter.new(Transport.amqp_url, OPTS).start
end
def log_channel
@log_channel ||= self.connection
.create_channel
.queue("api_log_workers")
.bind("amq.topic", routing_key: "bot.*.logs")
end
def telemetry_channel
@telemetry_channel ||= self
.connection
.create_channel
.queue("api_telemetry_workers")
.bind("amq.topic", routing_key: "bot.*.telemetry")
end
def amqp_topic
@amqp_topic ||= self
.connection
.create_channel
.topic("amq.topic", auto_delete: true)
end
def send_demo_token_to(user, secret)
fbos_version = Api::AbstractController::EXPECTED_VER
routing_key =
[Api::RmqUtilsController::DEMO_REGISTRY_ROOT, secret].join(".")
payload =
SessionToken.as_json(user, "GUEST", fbos_version).to_json
raw_amqp_send(payload, routing_key)
end
def amqp_send(message, id, channel)
raise "BAD `id`" unless id.is_a?(String) || id.is_a?(Integer)
routing_key = "bot.device_#{id}.#{channel}"
raw_amqp_send(message, routing_key)
end
def raw_amqp_send(message, routing_key)
puts message if Rails.env.production?
amqp_topic.publish(message, routing_key: routing_key)
end
# We need to hoist the Rack X-Farmbot-Rpc-Id to a global state so that it can
# be used as a unique identifier for AMQP messages.
def current_request_id
RequestStore.store[:current_request_id] || "NONE"
end
def set_current_request_id(uuid)
RequestStore.store[:current_request_id] = uuid
end
module Mgmt
require "rabbitmq/http/client"
def self.username
@username ||= URI(Transport.amqp_url).user || "admin"
end
def self.password
@password ||= URI(Transport.amqp_url).password
end
def self.api_url
uri = URI(Transport.amqp_url)
uri.scheme = ENV["FORCE_SSL"] ? "https" : "http"
uri.user = nil
uri.port = 15672
uri.to_s
end
def self.client
@client ||= RabbitMQ::HTTP::Client.new(ENV["RABBIT_MGMT_URL"] || api_url,
username: self.username,
password: self.password)
end
def self.connections
client.list_connections
end
def self.find_connection_by_name(name)
connections
.select { |x| x.fetch("user").include?(name) }
.pluck("name")
.compact
.uniq
end
def self.close_connections_for_username(name)
find_connection_by_name(name).map { |connec| client.close_connection(connec) }
end
end # Mqmt
end # Transport