lib/firehose/client/consumer.rb
require 'json'
# TODO - Spec this thing out. Unfortunately its not tested at all, mostly because the JSON client
# is more important (and tested). Still, this should be tested, at least used by the specs
# to test HTTP functionality of the server.
module Firehose
module Client
module Consumer
# TODO - Test this libs. I had to throw these quickly into our app so that we could get
# some stress testing out of the way.
# TODO - Replace the integration test clients with these guys. You'll want to refactor each
# transport to use on(:message), on(:conncect), and on(:disconnect) callbacks.
TransportNotSupportedError = Class.new(RuntimeError)
# Build up a benchmark client based on a given URI. Accepts ws:// and http:// for now.
def self.parse(uri)
case transport = ::URI.parse(uri).scheme
# TODO - Fix ws:// transport! See class WebSocket below to udnerstand
# why this doesn't work and support is dropped from the CLI.
# when 'ws'
# Consumer::WebSocket.new(uri)
when 'http'
Consumer::HttpLongPoll.new(uri)
else
raise TransportNotSupportedError.new("Transport #{transport.inspect} not supported.")
end
end
# TODO - This won't even work. Dropping ws:// above until this is tested. This thing
# should be sending message sequences to Firehose.
# Connect to Firehose via WebSockets and consume messages.
class WebSocket
attr_reader :url, :logger
def initialize(url, logger = Firehose.logger)
@url, @logger = url, logger
end
def request
ws = Faye::WebSocket::Client.new(url)
ws.onmessage = lambda do |event|
logger.info "WS | #{event.data[0...40].inspect}"
end
ws.onclose = lambda do |event|
logger.info "WS | Closed"
end
ws.onerror do
logger.error "WS | Failed"
end
end
end
# Connect to Firehose via HTTP Long Polling and consume messages.
class HttpLongPoll
JITTER = 0.003
attr_reader :url, :logger
def initialize(url, logger = Firehose.logger)
@url, @logger = url, logger
end
def request(last_sequence=0)
http = EM::HttpRequest.new(url, :inactivity_timeout => 0).get(:query => {'last_message_sequence' => last_sequence})
http.callback do
case status = http.response_header.status
when 200
json = JSON.parse(http.response)
next_sequence = json['last_sequence'].to_i
message = json['message']
logger.info "HTTP 200 | Next Sequence: #{next_sequence} - #{message[0...40].inspect}"
EM::add_timer(jitter) { request next_sequence }
when 204
logger.info "HTTP 204 | Last Sequence #{last_sequence}"
EM::add_timer(jitter) { request last_sequence }
else
logger.error "HTTP #{status} | Failed"
end
end
http.errback do
logger.error "Connection Failed"
end
end
private
# Random jitter between long poll requests.
def jitter
rand*JITTER
end
end
end
end
end