lib/actors/etl_producer.rb
require 'actors/etl_base'
require 'end_of_data'
# Producer module for implementing ETL consumer - producer Actor protocol.
#
# ETL consumer - producer protocol with back-pressure is rather simple at it's core:
# There are two parties: producer and consumer that are both actor instances.
#
# - Producer produces (by loading, computing, generating, etc.) data (called rows)
# and sends them asynchronously to the consumer.
#
# - Consumer accepts rows from the producer and does something with them (saves them, sends
# them somewhere else, etc.).
#
# Because rows are exchanged asynchronously and consuming could be slower than producing,
# a way how to limit data flow between the actors is needed.
#
# This is solved by utilising back-pressure: Producer must only produce and send rows when consumer
# can accept them. The ability to receive rows is signalled by a message from the consumer to the producer.
# In response to that message a single row can be sent to the consumer. If the producer has no rows
# available for sending at the moment, it should internally store the state of its consumer and send a row
# when it becomes available. After the consumer is finished with processing the received row, it requests
# another row from the producer and the cycle repeats.
#
module ETLProducer
include ETLBase
# Sets actor output for sending produced rows and EOS.
#
# @param output [Symbol] actor name in Celluloid actor registry
def output=(output)
@_output = output
end
# Sends a single row to actor's output asynchronously.
def emit_row(row)
logger.debug "Emiting row: #{self.class.name} -> #{@_output}"
Celluloid::Actor[@_output].async.consume_row(row)
end
# Sends EOF signal to actor's output (if it has one).
def emit_eof
logger.debug "Emiting EOF: #{self.class.name} -> #{@_output}"
Celluloid::Actor[@_output].async.receive_eof if @_output
end
# Stores a single row in a local output buffer (in case output is stuffed)
# or sends it to the output directly.
def output_row(row)
if output_hungry?
@_output_state = :stuffed
emit_row(row)
produce_row() unless empty?
else
buffer_put(row)
end
end
def buffer_empty?
@_buffer.nil?
end
def buffer_full?
!@_buffer.nil?
end
def buffer_pop
raise "Output buffer empty." if buffer_empty?
row = @_buffer
@_buffer = nil
row
end
def buffer_put(row)
raise "Output buffer full." if buffer_full?
@_buffer = row
end
# Receive work request from it's output.
def receive_hungry
return if output_hungry?
if buffer_empty?
@_output_state = :hungry
else
emit_row(buffer_pop())
produce_row() unless empty?
end
end
def process_eof
@_eof_received = true
emit_eof if empty?
end
def output_hungry?
@_output_state == :hungry
end
# Tries to generate a single row and then send it to the output or output buffer if the row was
# successfully generated. If no row was generated, it sets empty flag and notifies its
# input (if it has one).
#
# You should define #generate_row method on your producer actor, which returns either a single
# row (pretty much anything) or throws {EndOfData} or `StopIteration` error in case there there
# is no more rows to generate. (hint: Ruby's Enumerator#next behaves exactly like that)
#
def produce_row
unmark_empty!
begin
output_row(generate_row())
logger.debug "Generating a row."
rescue StopIteration, EndOfData
logger.debug "All pending rows processed."
mark_empty!
notify_hungry if respond_to? :notify_hungry
emit_eof if eof_received?
end
end
end