colinsurprenant/redstorm

View on GitHub
src/main/redstorm/storm/jruby/JRubyTridentFunction.java

Summary

Maintainability
A
40 mins
Test Coverage
package redstorm.storm.jruby;

import storm.trident.tuple.TridentTuple;
import storm.trident.operation.TridentCollector;
import java.util.Map;
import storm.trident.operation.TridentOperationContext;
import storm.trident.operation.Function;

import org.jruby.Ruby;
import org.jruby.RubyObject;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.javasupport.JavaUtil;
import org.jruby.RubyModule;
import org.jruby.exceptions.RaiseException;

public class JRubyTridentFunction implements Function {
  private final String _realClassName;
  private final String _bootstrap;

  // transient to avoid serialization
  private transient IRubyObject _ruby_function;
  private transient Ruby __ruby__;

  public JRubyTridentFunction(final String baseClassPath, final String realClassName) {
    _realClassName = realClassName;
    _bootstrap = "require '" + baseClassPath + "'";
  }

  @Override
  public void execute(final TridentTuple tuple, final TridentCollector collector) {
    IRubyObject ruby_tuple = JavaUtil.convertJavaToRuby(__ruby__, tuple);
    IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector);
    Helpers.invoke(__ruby__.getCurrentContext(), _ruby_function, "execute", ruby_tuple, ruby_collector);
  }

  @Override
  public void cleanup() {
    Helpers.invoke(__ruby__.getCurrentContext(), _ruby_function, "cleanup");
  }

  @Override
  public void prepare(final Map conf, final TridentOperationContext context) {
    if(_ruby_function == null) {
      _ruby_function = initialize_ruby_function();
    }
    IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf);
    IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context);
    Helpers.invoke(__ruby__.getCurrentContext(), _ruby_function, "prepare", ruby_conf, ruby_context);
  }

  private IRubyObject initialize_ruby_function() {
    __ruby__ = Ruby.getGlobalRuntime();

    RubyModule ruby_class;
    try {
      ruby_class = __ruby__.getClassFromPath(_realClassName);
    }
    catch (RaiseException e) {
      // after deserialization we need to recreate ruby environment
      __ruby__.evalScriptlet(_bootstrap);
      ruby_class = __ruby__.getClassFromPath(_realClassName);
    }
    return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new");
  }
}