# lib/analysand/change_watcher.rb
require 'celluloid'
require 'celluloid/io'
require 'analysand/connection_testing'
require 'http/parser'
require 'net/http'
require 'rack/utils'
require 'uri'
require 'yajl'
module Analysand
##
# A Celluloid::IO actor that watches the changes feed of a CouchDB database.
# When a change is received, it passes the change to a #process method.
#
# ChangeWatchers monitor changes using continuous mode and set up a heartbeat
# to fire approximately every 10 seconds.
#
# ChangeWatchers begin watching for changes as soon as they are initialized.
# To send a shutdown message:
#
# a.stop
#
# The watcher will terminate on the next heartbeat.
#
#
# Failure modes
# =============
#
# ChangeWatcher deals with the following failures in the following ways:
#
# * If Errno::ECONNREFUSED is raised whilst connecting to CouchDB, it will
# retry the connection in 30 seconds.
# * If the connection to CouchDB's changes feed is abruptly terminated, it
# dies.
# * If an exception is raised during HTTP or JSON parsing, it dies.
#
# Situations where the actor dies should be handled by a supervisor.
#
#
# Example usage
# =============
#
# class Accumulator < Analysand::ChangeWatcher
# attr_accessor :results
#
# def initialize(database)
# super(database)
#
# self.results = []
# end
#
# def process(change)
# results << change
#
# # Once a ChangeWatcher has successfully processed a change, it
# # SHOULD invoke #change_processed.
# change_processed(change)
# end
# end
#
# a = Accumulator.new('http://localhost:5984/mydb')
#
# # or with supervision:
# a = Accumulator.supervise('http://localhost:5984/mydb')
class ChangeWatcher
# Mixins. Celluloid::IO is what makes this class an actor. The other three
# presumably supply helpers that this class calls but does not define itself
# (info/error, test_http_connection and build_query respectively) -- confirm
# against those modules.
include Celluloid::IO
include Celluloid::Logger
include ConnectionTesting
include Rack::Utils
# Read at most this many bytes off the socket at a time.
QUANTUM = 4096
##
# Stores the database reference, builds the JSON and HTTP parsers, and
# schedules #start asynchronously, so watching begins as soon as the object
# has been constructed.
#
# The database parameter may be either a URL-as-string or an
# Analysand::Database.
#
# If overriding the initializer, you MUST call super.
def initialize(database)
  @db = database
  @waiting = {}

  # Every completely parsed JSON document is handed to #process.
  @json_parser = Yajl::Parser.new
  @json_parser.on_parse_complete = ->(doc) { process(doc) }

  # This object is the HTTP parser's callback target (see
  # #on_headers_complete and #on_body).
  @http_parser = ::Http::Parser.new(self)

  async.start
end
# Builds the URI of the changes feed for the watched database.
#
# Starts from the default query (continuous feed, 10000 ms heartbeat), lets
# #customize_query adjust that Hash in place, and appends "/_changes" to the
# database path. Trailing slashes on the database path are dropped first, so
# "http://host/db" and "http://host/db/" both yield ".../db/_changes" rather
# than the latter producing ".../db//_changes".
#
# Returns a new URI object; the database's own URI is never modified.
def changes_feed_uri
  query = {
    'feed' => 'continuous',
    'heartbeat' => '10000'
  }
  customize_query(query)

  base = @db.respond_to?(:uri) ? @db.uri : URI(@db)
  feed = base.dup
  # Normalize before appending so a trailing "/" cannot create an empty
  # path segment.
  feed.path = feed.path.sub(%r{/+\z}, '') + '/_changes'
  feed.query = build_query(query)
  feed
end
# Pre-flight check that runs before the watcher connects to the changes feed.
# The default implementation checks that an HTTP service is listening on the
# changes feed URI.
#
# A true result lets the watcher connect and begin processing. A false result
# makes it log a message and repeat the check 30 seconds later.
#
# Subclasses may override this to check additional services. An override must
# fold the result of super into its own return value:
#
#   # Wrong: the result of super is discarded
#   def connection_ok
#     super
#     ...
#   end
#
#   # Right
#   def connection_ok
#     ok = super
#
#     ok && my_other_test
#   end
def connection_ok
  feed = changes_feed_uri
  test_http_connection(feed) { |req| customize_request(req) }
end
# Runs the watcher: waits until #connection_ok reports success, connects to
# the changes feed, then feeds socket data to the HTTP parser until #stop
# clears the running flag.
#
# Does nothing if the watcher has already been started. While #connection_ok
# is false, an error is logged and the check is repeated after 30 seconds.
#
# A #stop received while the watcher is still waiting for services is
# honoured: the method returns without connecting. (Previously the running
# flag was only raised after connecting, which overwrote such a stop.)
def start
  return if @started

  @started = true
  # Raise the flag before the first wait so that a #stop arriving during the
  # service checks is remembered instead of being overwritten later.
  @running = true

  until connection_ok
    error "Some services used by #{self.class.name} did not check out ok; will retry in 30 seconds"
    sleep 30
    break unless @running
  end

  # Stopped before a connection was ever made: there is no socket to close.
  unless @running
    @started = false
    return
  end

  connect
  info "#{self.class} entering read loop"

  while @running
    @http_parser << @socket.readpartial(QUANTUM)
  end

  # Once we're done, close things up.
  @started = false
  @socket.close
end
# Asks the read loop in #start to finish. Only the @running flag is cleared
# here; #start notices it the next time its loop condition is evaluated and
# closes the socket itself.
def stop
@running = false
end
##
# Hook for adjusting the query parameters of the changes feed request.
# query is a Hash that arrives pre-populated with two defaults:
#
#   | Key       | Value      |
#   | feed      | continuous |
#   | heartbeat | 10000      |
#
# Changing either default is NOT RECOMMENDED.
#
# Modify the Hash in place; #changes_feed_uri ignores the return value.
#
# The base implementation is a no-op. Provide behavior in a subclass.
def customize_query(query); end
##
# Hook for decorating the outgoing request, e.g. with extra headers.
# req is a Net::HTTP::Get instance; modify it in place.
#
# The base implementation is a no-op. Provide behavior in a subclass.
def customize_request(req); end
##
# Hook that receives every change read from the feed; put your
# change-processing logic here.
#
# change is a Hash containing keys id, seq, and changes. See [0] for more
# information.
#
# The base implementation is a no-op. Provide behavior in a subclass.
#
# [0]: http://guide.couchdb.org/draft/notifications.html#continuous
def process(change); end
# A Celluloid::Future whose result can also be obtained through #wait, which
# is simply another name for #value.
class Waiter < Celluloid::Future
  alias wait value
end
##
# Marks the document with the given ID as pending and returns a Waiter that
# can be used to block a thread until that document has been processed, i.e.
# until #change_processed has removed the ID again. The pending state is
# polled every 0.1 seconds, and the Waiter's block evaluates to true.
#
# Intended for testing.
def waiter_for(id)
  @waiting[id] = true

  Waiter.new do
    sleep 0.1 while @waiting[id]
    true
  end
end
##
# Notify waiters: removes the change's document ID from the pending set that
# #waiter_for polls.
def change_processed(change)
  doc_id = change['id']
  @waiting.delete(doc_id)
end
##
# Http::Parser callback. Verifies that the changes feed answered with
# status 200 and raises a RuntimeError otherwise.
#
# The argument is not used; the status is read from @http_parser.
#
# @private
def on_headers_complete(parser)
  code = @http_parser.status_code.to_i
  return if code == 200

  raise "Request failed: expected status 200, got #{code}"
end
##
# Http::Parser callback. Forwards each chunk of the response body to the
# JSON parser, which calls #process for every completely parsed document
# (see the on_parse_complete wiring in #initialize).
#
# @private
def on_body(chunk)
@json_parser << chunk
end
##
# Opens a TCP connection to the changes feed host and writes the GET request
# by hand, leaving the socket in @socket for the read loop in #start.
#
# A Host header is added when the prepared request does not already carry
# one: HTTP/1.1 requires it, and a request built from a String URL has none.
# Headers set by #customize_request are sent unchanged.
#
# @private
def connect
  req = prepare_request
  uri = changes_feed_uri
  info "#{self.class} connecting to #{req.path}"

  # URI#hostname drops the brackets around an IPv6 literal, which is the form
  # a socket needs; URI#host keeps them, which is the form Host needs.
  @socket = TCPSocket.new(uri.hostname, uri.port)

  # Make the request.
  lines = ["GET #{req.path} HTTP/1.1"]
  unless req['Host']
    authority = uri.port == uri.default_port ? uri.host : "#{uri.host}:#{uri.port}"
    lines << "Host: #{authority}"
  end
  req.each_header { |k, v| lines << "#{k}: #{v}" }

  @socket.write(lines.join("\r\n"))
  @socket.write("\r\n\r\n")
end
##
# Closes the socket, provided one was opened and it is still open.
#
# @private
def disconnect
  return unless @socket

  @socket.close unless @socket.closed?
end
# Registers #disconnect as this actor's finalizer (a Celluloid class macro),
# presumably so the socket is released when the actor terminates -- confirm
# against the Celluloid documentation.
finalizer :disconnect
##
# Builds the Net::HTTP::Get for the changes feed URI and passes it through
# #customize_request before returning it.
#
# @private
def prepare_request
  request = Net::HTTP::Get.new(changes_feed_uri.to_s)
  customize_request(request)
  request
end
end
end