colinsurprenant/redstorm

View on GitHub
lib/red_storm/dsl/topology.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'

java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.generated.SubmitOptions'

module RedStorm
  module DSL

    class TopologyDefinitionError < StandardError; end

    class Topology
      attr_reader :cluster # LocalCluster reference usable in on_submit block, for example

      DEFAULT_SPOUT_PARALLELISM = 1
      DEFAULT_BOLT_PARALLELISM = 1

      class ComponentDefinition < Configurator
        attr_reader :clazz, :constructor_args, :parallelism
        attr_accessor :id # ids are forced to string

        def initialize(component_class, constructor_args, id, parallelism)
          super()
          @clazz = component_class
          @constructor_args = constructor_args
          @id = id.to_s
          @parallelism = parallelism
          @output_fields = []
        end

        def output_fields(*args)
          args.empty? ? @output_fields : @output_fields = args.map(&:to_s)
        end

        def is_java?
          @clazz.name.split('::').first.downcase == 'java'
        end
      end

      class SpoutDefinition < ComponentDefinition

        # WARNING non-dry see BoltDefinition#new_instance
        def new_instance
          if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout"
            @clazz.new(constructor_args, @output_fields)
          elsif is_java?
            @clazz.new(*constructor_args)
          else
            Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields)
          end
        end
      end

      class BoltDefinition < ComponentDefinition
        attr_accessor :sources, :command

        def initialize(*args)
          super
          @sources = []
        end

        def source(source_id, grouping)
          @sources << [source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}]
        end

        def define_grouping(declarer)
          @sources.each do |source_id, grouping|
            grouper, params = grouping.first
              # declarer.fieldsGrouping(source_id, Fields.new())
            case grouper
            when :fields
              declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s))))
            when :global
              declarer.globalGrouping(source_id)
            when :shuffle
              declarer.shuffleGrouping(source_id)
            when :local_or_shuffle
              declarer.localOrShuffleGrouping(source_id)
            when :none
              declarer.noneGrouping(source_id)
            when :all
              declarer.allGrouping(source_id)
            when :direct
              declarer.directGrouping(source_id)
            else
              raise("unknown grouper=#{grouper.inspect}")
            end
          end
        end

        def new_instance
          # WARNING non-dry see BoltDefinition#new_instance
          if @clazz.name == "Java::RedstormStormJruby::JRubyShellBolt"
            @clazz.new(constructor_args, @output_fields)
          elsif is_java?
            @clazz.new(*constructor_args)
          else
            Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields)
          end
        end
      end

      def self.log
        @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
      end

      # def self.spout(spout_class, contructor_args = [], options = {}, &spout_block)
      def self.spout(spout_class, *args, &spout_block)
        set_topology_class!
        options = args.last.is_a?(Hash) ? args.pop : {}
        contructor_args = !args.empty? ? args.pop : []
        spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options)

        spout = SpoutDefinition.new(spout_class, contructor_args, spout_options[:id], spout_options[:parallelism])
        spout.instance_exec(&spout_block) if block_given?
        self.components << spout
      end

      # def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block)
      def self.bolt(bolt_class, *args, &bolt_block)
        set_topology_class!
        options = args.last.is_a?(Hash) ? args.pop : {}
        contructor_args = !args.empty? ? args.pop : []
        bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)

        bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
        raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
        bolt.instance_exec(&bolt_block)
        self.components << bolt
      end

      def self.configure(name = nil, &configure_block)
        set_topology_class!
        @topology_name = name.to_s if name
        @configure_block = configure_block if block_given?
      end

      def self.submit_options(&submit_options_block)
        @submit_options_block = submit_options_block if block_given?
      end

      def self.on_submit(method_name = nil, &submit_block)
        @submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)}
      end

      def self.build_topology
        resolve_ids!(components)

        builder = TopologyBuilder.new
        spouts.each do |spout|
          declarer = builder.setSpout(spout.id, spout.new_instance, spout.parallelism.to_java)
          declarer.addConfigurations(spout.config)
        end
        bolts.each do |bolt|
          declarer = builder.setBolt(bolt.id, bolt.new_instance, bolt.parallelism.to_java)
          declarer.addConfigurations(bolt.config)
          bolt.define_grouping(declarer)
        end
        builder.createTopology
      end

      def start(env)
        topology = self.class.build_topology

        # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
        defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}

        configurator = Configurator.new(defaults)
        configurator.instance_exec(env, &self.class.configure_block)

        submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter

        if self.class.submit_options_block && env != :local
          submit_options = SubmitOptions.new
          submit_options.instance_exec(env, &self.class.submit_options_block)

          submitter.submitTopology(self.class.topology_name, configurator.config, topology, submit_options)
        else
          submitter.submitTopology(self.class.topology_name, configurator.config, topology)
        end

        instance_exec(env, &self.class.submit_block)
      end

      private

      # this is a quirk to figure out the topology class at load time when the topology file
      # is required in the TopologyLauncher. Since we want to make the "configure" DSL statement
      # optional we can hook into any/all the other DSL statements that will be called at load time
      # and set it there. This is somewhat inelegant but it works.
      def self.set_topology_class!
        Configuration.topology_class = self
      end

      def self.resolve_ids!(components)
        # verify duplicate implicit ids
        ids = components.map(&:id)
        components.reverse.each do |component|
          raise("duplicate id in #{component.clazz.name} on id=#{component.id}") if ids.select{|id| id == component.id}.size > 1
          # verify source_id references
          if component.respond_to?(:sources)
            component.sources.each{|source_id, grouping| raise("cannot resolve #{component.clazz.name} source id=#{source_id}") unless ids.include?(source_id)}
          end
        end
      end

      def self.spouts
        self.components.select{|c| c.is_a?(SpoutDefinition)}
      end

      def self.bolts
        self.components.select{|c| c.is_a?(BoltDefinition)}
      end

      def self.components
        @components ||= []
      end

      def self.topology_name
        @topology_name ||= self.underscore(self.name)
      end

      def self.configure_block
        @configure_block ||= lambda{|env|}
      end

      def self.submit_options_block
        @submit_options_block
      end

      def self.submit_block
        @submit_block ||= lambda{|env|}
      end

      def self.underscore(camel_case)
        camel_case.to_s.split('::').last.gsub(/(.)([A-Z])/,'\1_\2').downcase!
      end
    end
  end

  # for backward compatibility
  SimpleTopology = DSL::Topology

end