lib/observed/config_builder.rb
require 'logger'
require 'thread'
require 'observed/config'
require 'observed/configurable'
require 'observed/default'
require 'observed/hash'
require 'observed/translator'
require 'observed/observed_task_factory'
module Observed
# An observer whose behavior is supplied as a block: each call to #observe is
# forwarded to the block captured at construction time.
class ProcObserver < Observed::Observer
  # @param [Proc] observation The block to invoke on every call to #observe
  def initialize(&observation)
    @block = observation
  end

  # Calls the captured block with `data` and `options`, in that order.
  # @param [Object] data Passed to the block as its first argument
  # @param [Object] options Passed to the block as its second argument
  # @return Whatever the block returns
  def observe(data=nil, options=nil)
    @block.call(data, options)
  end
end
# A translator whose behavior is supplied as a block: each call to #translate
# is forwarded to the block captured at construction time.
class ProcTranslator < Observed::Translator
  # @param [Proc] translation The block to invoke on every call to #translate
  def initialize(&translation)
    @block = translation
  end

  # Calls the captured block with `data` followed by a Hash carrying the tag and the time.
  # @param [Object] tag Exposed to the block under the :tag key
  # @param [Object] time Exposed to the block under the :time key
  # @param [Object] data Passed to the block as its first argument
  # @return Whatever the block returns
  def translate(tag, time, data)
    metadata = {tag: tag, time: time}
    @block.call(data, metadata)
  end
end
# A reporter whose behavior is supplied as a block: each call to #report is
# forwarded to the block captured at construction time.
class ProcReporter < Observed::Reporter
  # @param [Regexp] tag_pattern The pattern that #match tests tags against (may be nil)
  # @param [Proc] reporting The block to invoke on every call to #report
  def initialize(tag_pattern, &reporting)
    @tag_pattern = tag_pattern
    @block = reporting
  end

  # @param [String] tag The tag to test against the pattern given at construction time
  # @return The result of `tag.match(pattern)`, or nil when either the tag or the pattern is missing
  def match(tag)
    return nil unless tag && @tag_pattern
    tag.match(@tag_pattern)
  end

  # Calls the captured block with `data` followed by a Hash carrying the tag and the time.
  # @param [Object] tag Exposed to the block under the :tag key
  # @param [Object] time Exposed to the block under the :time key
  # @param [Object] data Passed to the block as its first argument
  # @return Whatever the block returns
  def report(tag, time, data)
    metadata = {tag: tag, time: time}
    @block.call(data, metadata)
  end
end
class ConfigBuilder
include Observed::Configurable
attribute :logger, default: Logger.new(STDOUT, Logger::DEBUG)
# @param [Hash] args Construction parameters:
#   :system (required), :context, and optionally :observer_plugins, :reporter_plugins and :translator_plugins.
#   The whole Hash is also handed to `configure`.
# @raise [RuntimeError] When no truthy value is given under :system
def initialize(args)
  @group_mutex = ::Mutex.new
  @context = args[:context]
  # Explicit plugin tables are optional: the instance variable is assigned only when a truthy table is supplied,
  # so that the corresponding reader can fall back to plugin discovery otherwise.
  [:observer_plugins, :reporter_plugins, :translator_plugins].each do |key|
    plugins = args[key]
    instance_variable_set("@#{key}", plugins) if plugins
  end
  given_system = args[:system]
  fail("The key :system must be in #{args}") unless given_system
  @system = given_system
  configure args
end
# Returns the system object that was supplied under the :system key at construction time.
# The builder passes it on to the plugins it instantiates.
def system
@system
end
# @return The observer plugin table given at construction time, or, when none was given, the table of named
#   plugins discovered from Observed::Observer
def observer_plugins
  return @observer_plugins if @observer_plugins
  select_named_plugins_of(Observed::Observer)
end
# @return The reporter plugin table given at construction time, or, when none was given, the table of named
#   plugins discovered from Observed::Reporter
def reporter_plugins
  return @reporter_plugins if @reporter_plugins
  select_named_plugins_of(Observed::Reporter)
end
# @return The translator plugin table given at construction time, or, when none was given, the table of named
#   plugins discovered from Observed::Translator
def translator_plugins
  return @translator_plugins if @translator_plugins
  select_named_plugins_of(Observed::Translator)
end
# Indexes the named plugins of `klass` by their plugin name.
# @param klass An object responding to `select_named_plugins`, each element of which responds to `plugin_name`
# @return [Hash] Plugin name => plugin; when names collide, the plugin seen last wins
def select_named_plugins_of(klass)
  by_name = {}
  klass.select_named_plugins.each { |plugin| by_name.store(plugin.plugin_name, plugin) }
  by_name
end
# Creates an Observed::Config from the observers and reporters registered on this builder so far.
# @return [Observed::Config]
def build
  registered_observers = observers
  registered_reporters = reporters
  Observed::Config.new(observers: registered_observers, reporters: registered_reporters)
end
# Registers a reporter and returns the task wrapping it.
#
# The reporter is either an instance of a named reporter plugin (selected with `:via`/`:using` and configured with
# `:with`/`:which`) or a ProcReporter wrapping the given block. When a tag pattern is given, the task is also
# chained after `receive(tag_pattern)`.
#
# @param [Regexp] tag_pattern The pattern to match tags added to data from observers. It may be omitted; a Hash
#   given in its place is treated as `args`
# @param [Hash] args The configuration for each reporter which may or may not contain (1) which reporter plugin to
#   use or which writer plugin to use (in combination with the default reporter plugin) (2) initialization parameters
#   to instantiate the reporter/writer plugin
# @return The task created from the reporter by `convert_to_task`
# @raise [RuntimeError] When the named plugin is unknown, or when neither a plugin name nor a block is given
def report(tag_pattern=nil, args={}, &block)
  if tag_pattern.is_a? ::Hash
    args = tag_pattern
    tag_pattern = nil
  end
  via = args[:via] || args[:using]
  reporter =
    if via
      with = args[:with] || args[:which] || {}
      # Go through the `logger` reader, as `observe` and `translate` do, instead of reading @logger directly:
      # the instance variable is not assigned by this class's own code, so reading it directly can hand nil to
      # the plugin when no logger was configured explicitly.
      with = {logger: logger}.merge(with).merge(tag_pattern: tag_pattern, system: system)
      plugin = reporter_plugins[via] ||
        fail(RuntimeError, %Q|The reporter plugin named "#{via}" is not found in "#{reporter_plugins}"|)
      plugin.new(with)
    elsif block_given?
      Observed::ProcReporter.new(tag_pattern, &block)
    else
      fail "Invalid combination of arguments: #{tag_pattern} #{args}"
    end
  reporters << reporter
  report_it = convert_to_task(reporter)
  receive(tag_pattern).then(report_it) if tag_pattern
  report_it
end
# Wraps an observer plugin so that it can always be invoked as `observe(data, options)`, whatever number of
# positional arguments the plugin's own `observe` method declares.
class ObserverCompatibilityAdapter < Observed::Observer
  include Observed::Configurable
  attribute :observer
  attribute :system
  attribute :tag

  # Configures this adapter and then forwards the same arguments to the wrapped observer.
  # @param [Hash] args The configuration passed to both `super` and `observer.configure`
  def configure(args)
    super
    observer.configure(args)
  end

  # Invokes the wrapped observer's `observe` with as many of (`data`, `options`) as it declares positionally.
  #
  # Only positional parameters (required, optional, splat) are counted. Block and keyword parameters are ignored,
  # so that e.g. `def observe(data, options, &block)` is still called with two arguments instead of being
  # silently skipped. Signatures declaring more than two positional parameters are called with two arguments,
  # letting Ruby report any arity mismatch rather than returning nil without calling the observer.
  #
  # @param [Object] data Forwarded when the wrapped method declares at least one positional parameter
  # @param [Object] options Forwarded when the wrapped method declares at least two positional parameters
  # @return Whatever the wrapped observer returns
  def observe(data=nil, options=nil)
    positional = observer.method(:observe).parameters.select do |kind, _name|
      kind == :req || kind == :opt || kind == :rest
    end
    case positional.size
    when 0
      observer.observe
    when 1
      observer.observe data
    else
      observer.observe data, options
    end
  end
end
# Registers an observer and returns the task wrapping it.
#
# The observer is either an instance of a named observer plugin (selected with `:via`/`:using` and configured with
# `:with`/`:which`), wrapped in an ObserverCompatibilityAdapter, or a ProcObserver wrapping the given block.
# When a tag is given, the task is chained with `emit(tag)` and appended to the group named after the tag.
#
# @param [String] tag The tag which is assigned to data which is generated from this observer, and is sent to
#   reporters later. It may be omitted; a Hash given in its place is treated as `args`
# @param [Hash] args The configuration for each observer which may or may not contain (1) which observer plugin to
#   use or which reader plugin to use (in combination with the default observer plugin) (2) initialization parameters
#   to instantiate the observer/reader plugin
# @return The registered task, named "observe"
# @raise [RuntimeError] When the named plugin is unknown, or when neither a plugin name nor a block is given
def observe(tag=nil, args={}, &block)
  if tag.is_a? ::Hash
    args = tag
    tag = nil
  end
  via = args[:via] || args[:using]
  observer =
    if via
      with = args[:with] || args[:which] || {}
      plugin = observer_plugins[via] ||
        fail(RuntimeError, %Q|The observer plugin named "#{via}" is not found in "#{observer_plugins}"|)
      underlying = plugin.new({logger: logger}.merge(with).merge(tag: tag, system: system))
      ObserverCompatibilityAdapter.new(
        system: system,
        observer: underlying,
        tag: tag
      )
    elsif block_given?
      Observed::ProcObserver.new(&block)
    else
      fail "No valid args (in args=#{args}) or a block given"
    end
  observe_that = convert_to_task(observer)
  result =
    if tag
      observation = observe_that.then(emit(tag))
      # Append the new observation to whatever is already registered under this tag.
      group tag, (group(tag) + [observation])
      observation
    else
      observe_that
    end
  observers << result
  result.name = "observe"
  result
end
# Creates a translation task.
#
# The translator is either an instance of a named translator plugin (selected with `:via`/`:using` and configured
# with `:with`/`:which`) or a ProcTranslator wrapping the given block.
#
# @param [Hash] args The configuration which may contain the translator plugin name and its initialization
#   parameters
# @return The task created from the translator by `convert_to_task`, named "translate"
# @raise [RuntimeError] When the named translator plugin is unknown
def translate(args={}, &block)
  via = args[:via] || args[:using]
  translator =
    if via
      with = args[:with] || args[:which] || {}
      with = {logger: logger}.merge(with).merge(system: system)
      plugin = translator_plugins[via] ||
        fail(RuntimeError, %Q|The translator plugin named "#{via}" is not found in "#{translator_plugins}"|)
      plugin.new(with)
    else
      # NOTE(review): when no block is given either, the ProcTranslator is built around a nil block here,
      # unlike `observe`/`report`, which fail fast; confirm whether that is intended.
      Observed::ProcTranslator.new(&block)
    end
  task = convert_to_task(translator)
  task.name = "translate"
  task
end
# Asks the context's event bus for an emitter for `tag` and labels it "emit to <tag>".
# @param [String] tag The tag handed to the event bus
# @return The object returned by the event bus, with its name assigned
def emit(tag)
  @context.event_bus.emit(tag).tap do |emitter|
    emitter.name = "emit to #{tag}"
  end
end
# Delegates to the `receive` method of the context's event bus.
# @param pattern The pattern handed to the event bus
# @return Whatever the event bus returns
def receive(pattern)
  bus = @context.event_bus
  bus.receive(pattern)
end
# Updates or gets the observations belonging to the group named `name`.
# The lookup and the optional update run inside a single `@group_mutex.synchronize` block.
# @param name The group name
# @param observations When truthy, replaces the observations stored for the group
# @return The observations stored for the group, or an empty Array when there are none
def group(name, observations=nil)
  @group_mutex.synchronize do
    registry = (@observations ||= {})
    registry[name] = observations if observations
    members = registry[name]
    members || []
  end
end
# Passes the observations of the group named `name` to the `parallel` method of the context's task factory.
# @param name The group name
# @return Whatever `parallel` returns
def run_group(name)
  factory = @context.task_factory
  factory.parallel(group(name))
end
# @return [Array] The reporters registered so far; the Array is created on first access and then reused
def reporters
  @reporters = [] unless @reporters
  @reporters
end
# @return [Array] The observers registered so far; the Array is created on first access and then reused
def observers
  @observers = [] unless @observers
  @observers
end
private
# Wraps `underlying` in a task using the context's observed task factory.
# The factory is fetched from the context once and reused on later calls.
# @param underlying The observer, reporter or translator to wrap
# @return Whatever the factory's `convert_to_task` returns
def convert_to_task(underlying)
  factory = (@observed_task_factory ||= @context.observed_task_factory)
  factory.convert_to_task(underlying)
end
end
end