# lib/pants/seam.rb
require_relative 'readers/base_reader'
class Pants
# A Seam is a core Pants object type (like Readers and Writers) that lets you
# attach to a Reader, work with the read data, and pass it on to attached
# Writers. It implements buffering by using EventMachine Queues: pop data
# off the @read_queue, work with it, then push it onto the @write_queue. Once
# data is on the @write_queue, the Seam will pass it on to all Writers that
# have been added to it.
#
# The @read_queue is wrapped by #read_items, which yields each chunk of data
# from the Reader exactly as it was when it was read in, giving you easy
# access to it. The @write_queue is wrapped by #write, which lets you simply
# hand over the data you want to pass on to the attached Writers.
#
# Seams are particularly useful for working with network data, where if you're
# redirecting traffic from one place to another, you may need to alter data
# in those packets to make it useful to the receiving ends.
class Seam < Pants::Readers::BaseReader
include LogSwitch::Mixin
# Sets up the read and write queues, subscribes to the Reader's channel so
# that everything published on it lands on the read queue, runs the parent
# initializer, and finally starts forwarding the write queue via #send_data.
#
# @param [EventMachine::Callback] core_stopper_callback The callback that's
#   provided by Core. It is handed straight to the parent initializer.
#
# @param [EventMachine::Channel] reader_channel The channel from the Reader
#   that the Seam is attached to.
def initialize(core_stopper_callback, reader_channel)
  # Running totals of item sizes; they are logged by #stop.
  @receives = @reads = @writes = @sends = 0

  # ||= leaves an already-assigned @write_object untouched.
  @write_object ||= nil

  @read_queue = EM::Queue.new
  @write_queue = EM::Queue.new

  reader_channel.subscribe do |chunk|
    log "Got data on reader channel"
    @read_queue << chunk
    @receives += chunk.size
  end

  super(core_stopper_callback)
  send_data
end
# Runs the parent's #start with the given callback, then invokes whatever
# #starter returns.
#
# @param callback Passed through unchanged to the parent's #start.
def start(callback)
  # Bare super forwards this method's own arguments, i.e. +callback+.
  super
  starter.call
end
# Logs the transfer totals, then waits until both the read and the write
# queue are empty before calling the stopper.
#
# Make sure you call this (with super()) in your child to ensure read and
# write queues are flushed.
def stop
  log "Stopping..."
  log "receives #{@receives}"
  log "reads #{@reads}"
  log "writes #{@writes}"
  log "sends #{@sends}"

  # The block yields :stop only once nothing is left in either queue;
  # until then it yields nil and the tick loop keeps polling.
  drain_watch = EM.tick_loop do
    :stop if @read_queue.empty? && @write_queue.empty?
  end

  drain_watch.on_stop { stopper.call }
end
# @return [String, nil] A String that identifies what the writer is writing
#   to, or nil (after a warning has been emitted) when nothing has been set.
#   This is simply used for displaying info to the user.
def write_object
  return @write_object if @write_object

  warn "No write_object info has been defined for this writer."
end
# Call this to read data that was put into the read queue. It yields one
# "item" (however the data was put onto the queue) at a time, and keeps
# yielding for as long as data comes in on the queue.
#
# @param [Proc] block The block to yield items from the reader to.
# @yield [item] Gives one item off the read queue.
# @raise [ArgumentError] If no block is given.
def read_items(&block)
  # Fail fast: without this check a missing block only shows up later as a
  # NoMethodError on nil, raised from inside the queue callback when the
  # first item arrives.
  raise ArgumentError, 'read_items requires a block' unless block

  processor = proc do |item|
    block.call(item)
    @reads += item.size
    # Re-arm so the next item on the queue gets delivered as well.
    @read_queue.pop(&processor)
  end

  @read_queue.pop(&processor)
end
# Call this once your Seam child has finished processing data and wants it
# sent on to its writers; the data is placed on the write queue.
#
# @param [Object] data The item to queue. Its #size is added to the running
#   write total.
def write(data)
  @write_queue.push(data)
  @writes = @writes + data.size
end
private
# Takes items off the write queue one at a time, pushes each onto
# @write_to_channel, and adds its size to the send total.
def send_data
  forwarder = proc do |outgoing|
    @write_to_channel.push(outgoing)
    @sends += outgoing.size
    # Re-arm so the following item on the queue is forwarded too.
    @write_queue.pop(&forwarder)
  end

  @write_queue.pop(&forwarder)
end
end
end