ext/concurrent-ruby/com/concurrent_ruby/ext/JavaSemaphoreLibrary.java
package com.concurrent_ruby.ext;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
/**
 * JRuby extension entry point that exposes a {@link java.util.concurrent.Semaphore}
 * to Ruby as {@code Concurrent::JavaSemaphore}.
 */
public class JavaSemaphoreLibrary {

    /**
     * Defines the {@code Concurrent} module (if needed) and registers
     * {@code Concurrent::JavaSemaphore} with its annotated methods.
     *
     * @param runtime the JRuby runtime to define the class in
     * @param wrap    not used by this implementation
     * @throws IOException declared for the loader contract; never thrown here
     */
    public void load(Ruby runtime, boolean wrap) throws IOException {
        RubyModule concurrentMod = runtime.defineModule("Concurrent");
        RubyClass semaphoreCls = concurrentMod.defineClassUnder("JavaSemaphore", runtime.getObject(), SEMAPHORE_ALLOCATOR);
        semaphoreCls.defineAnnotatedMethods(JavaSemaphore.class);
    }

    /** Allocator used by JRuby to create blank {@link JavaSemaphore} instances. */
    private static final ObjectAllocator SEMAPHORE_ALLOCATOR = new ObjectAllocator() {
        @Override
        public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
            return new JavaSemaphore(runtime, klazz);
        }
    };

    /**
     * Ruby-visible semaphore object. The backing {@link JRubySemaphore} is created
     * in {@link #initialize(ThreadContext, IRubyObject)}, not in the constructor.
     */
    @JRubyClass(name = "JavaSemaphore", parent = "Object")
    public static class JavaSemaphore extends RubyObject {

        /** Conversion factor used to turn a timeout in seconds into nanoseconds. */
        private static final double NANOS_PER_SECOND = 1e9;

        private JRubySemaphore semaphore;

        public JavaSemaphore(Ruby runtime, RubyClass metaClass) {
            super(runtime, metaClass);
        }

        /**
         * Ruby {@code initialize(count)}: creates the backing semaphore.
         *
         * @param value initial permit count; must be a Fixnum within the 32-bit int range
         * @return nil
         */
        @JRubyMethod
        public IRubyObject initialize(ThreadContext context, IRubyObject value) {
            this.semaphore = new JRubySemaphore(rubyFixnumInt(value, "count"));
            return context.nil;
        }

        /**
         * Ruby {@code acquire}: acquires one permit, blocking until it is available.
         * With a block, yields and releases the permit afterwards.
         *
         * @return nil without a block, otherwise the block's result
         */
        @JRubyMethod
        public IRubyObject acquire(ThreadContext context, final Block block) throws InterruptedException {
            return this.acquire(context, 1, block);
        }

        /**
         * Ruby {@code acquire(permits)}: acquires {@code permits} permits, blocking
         * until they are available. With a block, yields and releases them afterwards.
         *
         * @param permits positive Fixnum within the 32-bit int range
         * @return nil without a block, otherwise the block's result
         */
        @JRubyMethod
        public IRubyObject acquire(ThreadContext context, IRubyObject permits, final Block block) throws InterruptedException {
            return this.acquire(context, rubyFixnumToPositiveInt(permits, "permits"), block);
        }

        /** Ruby {@code available_permits}: the backing semaphore's current permit count. */
        @JRubyMethod(name = "available_permits")
        public IRubyObject availablePermits(ThreadContext context) {
            return getRuntime().newFixnum(this.semaphore.availablePermits());
        }

        /** Ruby {@code drain_permits}: takes all available permits and returns how many were taken. */
        @JRubyMethod(name = "drain_permits")
        public IRubyObject drainPermits(ThreadContext context) {
            return getRuntime().newFixnum(this.semaphore.drainPermits());
        }

        /**
         * Ruby {@code try_acquire}: tries to take one permit without waiting.
         *
         * @return see {@link #triedAcquire(ThreadContext, int, boolean, Block)}
         */
        @JRubyMethod(name = "try_acquire")
        public IRubyObject tryAcquire(ThreadContext context, final Block block) throws InterruptedException {
            int permitsInt = 1;
            boolean acquired = semaphore.tryAcquire(permitsInt);
            return triedAcquire(context, permitsInt, acquired, block);
        }

        /**
         * Ruby {@code try_acquire(permits)}: tries to take {@code permits} permits without waiting.
         *
         * @param permits positive Fixnum within the 32-bit int range
         * @return see {@link #triedAcquire(ThreadContext, int, boolean, Block)}
         */
        @JRubyMethod(name = "try_acquire")
        public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, final Block block) throws InterruptedException {
            int permitsInt = rubyFixnumToPositiveInt(permits, "permits");
            boolean acquired = semaphore.tryAcquire(permitsInt);
            return triedAcquire(context, permitsInt, acquired, block);
        }

        /**
         * Ruby {@code try_acquire(permits, timeout)}: tries to take {@code permits}
         * permits, waiting up to {@code timeout} seconds.
         *
         * @param permits positive Fixnum within the 32-bit int range
         * @param timeout positive numeric number of seconds; fractions are honoured
         * @return see {@link #triedAcquire(ThreadContext, int, boolean, Block)}
         */
        @JRubyMethod(name = "try_acquire")
        public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout, final Block block) throws InterruptedException {
            int permitsInt = rubyFixnumToPositiveInt(permits, "permits");
            // The timeout is passed in nanoseconds so that sub-second values such as
            // 0.5 are not truncated to a zero-second wait.
            boolean acquired = semaphore.tryAcquire(
                    permitsInt,
                    rubyNumericToNanos(timeout, "timeout"),
                    java.util.concurrent.TimeUnit.NANOSECONDS
            );
            return triedAcquire(context, permitsInt, acquired, block);
        }

        /** Ruby {@code release}: returns one permit to the semaphore; always returns true. */
        @JRubyMethod
        public IRubyObject release(ThreadContext context) {
            this.semaphore.release(1);
            return getRuntime().newBoolean(true);
        }

        /**
         * Ruby {@code release(permits)}: returns {@code permits} permits to the semaphore.
         *
         * @param permits positive Fixnum within the 32-bit int range
         * @return true
         */
        @JRubyMethod
        public IRubyObject release(ThreadContext context, IRubyObject permits) {
            this.semaphore.release(rubyFixnumToPositiveInt(permits, "permits"));
            return getRuntime().newBoolean(true);
        }

        /**
         * Ruby {@code reduce_permits(reduction)}: shrinks the number of available
         * permits without blocking.
         *
         * @param reduction non-negative Fixnum within the 32-bit int range
         * @return nil
         */
        @JRubyMethod(name = "reduce_permits")
        public IRubyObject reducePermits(ThreadContext context, IRubyObject reduction) throws InterruptedException {
            this.semaphore.publicReducePermits(rubyFixnumToNonNegativeInt(reduction, "reduction"));
            return context.nil;
        }

        /**
         * Blocks until {@code permits} permits are acquired. When a block is given it
         * is yielded to and the permits are released afterwards, even if the block raises.
         */
        private IRubyObject acquire(ThreadContext context, int permits, final Block block) throws InterruptedException {
            this.semaphore.acquire(permits);
            if (!block.isGiven()) {
                return context.nil;
            }
            try {
                return block.yieldSpecific(context);
            } finally {
                this.semaphore.release(permits);
            }
        }

        /**
         * Shapes the result of a try-acquire attempt.
         *
         * @return without a block, a boolean telling whether the permits were acquired;
         *         with a block, nil if they were not acquired, otherwise the block's
         *         result (the permits are released once the block finishes)
         */
        private IRubyObject triedAcquire(ThreadContext context, int permits, boolean acquired, final Block block) {
            if (!block.isGiven()) {
                return getRuntime().newBoolean(acquired);
            }
            if (!acquired) {
                return context.nil;
            }
            try {
                return block.yieldSpecific(context);
            } finally {
                this.semaphore.release(permits);
            }
        }

        /**
         * Narrows a long to int, raising ArgumentError instead of silently wrapping
         * when the value does not fit in 32 bits.
         */
        private int checkedInt(long value, String paramName) {
            if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
                throw getRuntime().newArgumentError(paramName + " is out of range for a 32-bit integer");
            }
            return (int) value;
        }

        /** Converts a Fixnum argument to int; raises ArgumentError for anything else. */
        private int rubyFixnumInt(IRubyObject value, String paramName) {
            if (!(value instanceof RubyFixnum)) {
                throw getRuntime().newArgumentError(paramName + " must be integer");
            }
            return checkedInt(((RubyFixnum) value).getLongValue(), paramName);
        }

        /** Converts a Fixnum argument that must be {@code >= 0} to int; raises ArgumentError otherwise. */
        private int rubyFixnumToNonNegativeInt(IRubyObject value, String paramName) {
            if (!(value instanceof RubyFixnum) || ((RubyFixnum) value).getLongValue() < 0) {
                throw getRuntime().newArgumentError(paramName + " must be a non-negative integer");
            }
            return checkedInt(((RubyFixnum) value).getLongValue(), paramName);
        }

        /** Converts a Fixnum argument that must be {@code > 0} to int; raises ArgumentError otherwise. */
        private int rubyFixnumToPositiveInt(IRubyObject value, String paramName) {
            if (!(value instanceof RubyFixnum) || ((RubyFixnum) value).getLongValue() <= 0) {
                throw getRuntime().newArgumentError(paramName + " must be an integer greater than zero");
            }
            return checkedInt(((RubyFixnum) value).getLongValue(), paramName);
        }

        /**
         * Converts a positive numeric number of seconds into nanoseconds.
         *
         * @throws org.jruby.exceptions.RaiseException ArgumentError when the value is
         *         not numeric or is not greater than zero
         */
        private long rubyNumericToNanos(IRubyObject value, String paramName) {
            if (value instanceof RubyNumeric) {
                double seconds = ((RubyNumeric) value).getDoubleValue();
                if (seconds > 0) {
                    // A double-to-long cast saturates at Long.MAX_VALUE, so very large
                    // timeouts become an effectively unbounded wait rather than wrapping.
                    return (long) (seconds * NANOS_PER_SECOND);
                }
            }
            throw getRuntime().newArgumentError(paramName + " must be a float greater than zero");
        }

        /**
         * Semaphore subclass whose only purpose is to make the protected
         * {@link Semaphore#reducePermits(int)} callable from the wrapper.
         * Static because it does not use the enclosing instance.
         */
        static class JRubySemaphore extends Semaphore {

            public JRubySemaphore(int permits) {
                super(permits);
            }

            public JRubySemaphore(int permits, boolean value) {
                super(permits, value);
            }

            /** Public bridge to the protected {@code reducePermits}. */
            public void publicReducePermits(int i) {
                reducePermits(i);
            }
        }
    }
}