gree/observed

View on GitHub
lib/observed/config_builder.rb

Summary

Maintainability
B
4 hrs
Test Coverage
require 'logger'
require 'thread'

require 'observed/config'
require 'observed/configurable'
require 'observed/default'
require 'observed/hash'
require 'observed/translator'
require 'observed/observed_task_factory'

module Observed

  class ProcObserver < Observed::Observer
    def initialize(&block)
      @block = block
    end
    def observe(data=nil, options=nil)
      @block.call data, options
    end
  end

  class ProcTranslator < Observed::Translator
    def initialize(&block)
      @block = block
    end
    def translate(tag, time, data)
      @block.call data, {tag: tag, time: time}
    end
  end

  class ProcReporter < Observed::Reporter
    def initialize(tag_pattern, &block)
      @tag_pattern = tag_pattern
      @block = block
    end
    def match(tag)
      tag.match(@tag_pattern) if tag && @tag_pattern
    end
    def report(tag, time, data)
      @block.call data, {tag: tag, time: time}
    end
  end

  class ConfigBuilder
    include Observed::Configurable

    attribute :logger, default: Logger.new(STDOUT, Logger::DEBUG)

    def initialize(args)
      @group_mutex = ::Mutex.new
      @context = args[:context]
      @observer_plugins = args[:observer_plugins] if args[:observer_plugins]
      @reporter_plugins = args[:reporter_plugins] if args[:reporter_plugins]
      @translator_plugins = args[:translator_plugins] if args[:translator_plugins]
      @system = args[:system] || fail("The key :system must be in #{args}")
      configure args
    end

    def system
      @system
    end

    def observer_plugins
      @observer_plugins || select_named_plugins_of(Observed::Observer)
    end

    def reporter_plugins
      @reporter_plugins || select_named_plugins_of(Observed::Reporter)
    end

    def translator_plugins
      @translator_plugins || select_named_plugins_of(Observed::Translator)
    end

    def select_named_plugins_of(klass)
      plugins = {}
      klass.select_named_plugins.each do |plugin|
        plugins[plugin.plugin_name] = plugin
      end
      plugins
    end

    def build
      Observed::Config.new(
          observers: observers,
          reporters: reporters
      )
    end

    # @param [Regexp] tag_pattern The pattern to match tags added to data from observers
    # @param [Hash] args The configuration for each reporter which may or may not contain (1) which reporter plugin to
    # use or which writer plugin to use (in combination with the default reporter plugin) (2) initialization parameters
    # to instantiate the reporter/writer plugin
    def report(tag_pattern=nil, args={}, &block)
      if tag_pattern.is_a? ::Hash
        args = tag_pattern
        tag_pattern = nil
      end
      reporter = if args[:via] || args[:using]
                   via = args[:via] || args[:using]
                   with = args[:with] || args[:which] || {}
                   with = ({logger: @logger}).merge(with).merge({tag_pattern: tag_pattern, system: system})
                   plugin = reporter_plugins[via] ||
                       fail(RuntimeError, %Q|The reporter plugin named "#{via}" is not found in "#{reporter_plugins}"|)
                   plugin.new(with)
                 elsif block_given?
                   Observed::ProcReporter.new tag_pattern, &block
                 else
                   fail "Invalid combination of arguments: #{tag_pattern} #{args}"
                 end

      reporters << reporter
      report_it = convert_to_task(reporter)
      if tag_pattern
        receive(tag_pattern).then(report_it)
      end
      report_it
    end

    class ObserverCompatibilityAdapter < Observed::Observer
      include Observed::Configurable
      attribute :observer
      attribute :system
      attribute :tag

      def configure(args)
        super
        observer.configure(args)
      end

      def observe(data=nil, options=nil)
        case observer.method(:observe).parameters.size
          when 0
            observer.observe
          when 1
            observer.observe data
          when 2
            observer.observe data, options
        end
      end
    end

    # @param [String] tag The tag which is assigned to data which is generated from this observer, and is sent to
    # reporters later
    # @param [Hash] args The configuration for each observer which may or may not contain (1) which observer plugin to
    # use or which reader plugin to use (in combination with the default observer plugin) (2) initialization parameters
    # to instantiate the observer/reader plugin
    def observe(tag=nil, args={}, &block)
      if tag.is_a? ::Hash
        args = tag
        tag = nil
      end
      observer = if args[:via] || args[:using]
                   via = args[:via] || args[:using] ||
                       fail(RuntimeError, %Q|Missing observer plugin name for the tag "#{tag}" in "#{args}"|)
                   with = args[:with] || args[:which] || {}
                   plugin = observer_plugins[via] ||
                       fail(RuntimeError, %Q|The observer plugin named "#{via}" is not found in "#{observer_plugins}"|)
                   observer = plugin.new(({logger: logger}).merge(with).merge(tag: tag, system: system))
                   ObserverCompatibilityAdapter.new(
                     system: system,
                     observer: observer,
                     tag: tag
                   )
                 elsif block_given?
                   Observed::ProcObserver.new &block
                 else
                   fail "No args valid args (in args=#{args}) or a block given"
                 end
      observe_that = convert_to_task(observer)
      result = if tag
        a = observe_that.then(emit(tag))
        group tag, (group(tag) + [a])
        a
      else
        observe_that
      end
      observers << result
      result.name = "observe"
      result
    end

    def translate(args={}, &block)
      translator = if args[:via] || args[:using]
                     #tag_pattern || fail("Tag pattern missing: #{tag_pattern} where args: #{args}")
                     via = args[:via] || args[:using]
                     with = args[:with] || args[:which] || {}
                     with = ({logger: logger}).merge(with).merge({system: system})
                     plugin = translator_plugins[via] ||
                         fail(RuntimeError, %Q|The reporter plugin named "#{via}" is not found in "#{translator_plugins}"|)
                     plugin.new(with)
                   else
                     Observed::ProcTranslator.new &block
                 end
      task = convert_to_task(translator)
      task.name = "translate"
      task
    end

    def emit(tag)
      e = @context.event_bus.emit(tag)
      e.name = "emit to #{tag}"
      e
    end

    def receive(pattern)
      @context.event_bus.receive(pattern)
    end

    # Updates or get the observations belongs to the group named `name`
    def group(name, observations=nil)
      @group_mutex.synchronize do
        @observations ||= {}
        @observations[name] = observations if observations
        @observations[name] || []
      end
    end

    def run_group(name)
      @context.task_factory.parallel(group(name))
    end

    def reporters
      @reporters ||= []
    end

    def observers
      @observers ||= []
    end

    private

    def convert_to_task(underlying)
      @observed_task_factory ||= @context.observed_task_factory
      @observed_task_factory.convert_to_task(underlying)
    end
  end

end