src/main/java/rx/plugins/RxJavaPlugins.java
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
/**
* Registry for plugin implementations that allows global override and handles the retrieval of correct
* implementation based on order of precedence:
* <ol>
* <li>plugin registered globally via {@code register} methods in this class</li>
* <li>plugin registered and retrieved using {@link java.lang.System#getProperty(String)} (see get methods for
* property names)</li>
* <li>default implementation</li>
* </ol>
* <p>In addition to the {@code rxjava.plugin.[simple class name].implementation} system properties,
* you can define two system property:<br>
* <pre><code>
* rxjava.plugin.[index].class}
* rxjava.plugin.[index].impl}
* </code></pre>
*
* Where the {@code .class} property contains the simple class name from above and the {@code .impl}
* contains the fully qualified name of the implementation class. The {@code [index]} can be
* any short string or number of your choosing. For example, you can now define a custom
* {@code RxJavaErrorHandler} via two system property:
* <pre><code>
* rxjava.plugin.1.class=RxJavaErrorHandler
* rxjava.plugin.1.impl=some.package.MyRxJavaErrorHandler
* </code></pre>
*
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">RxJava Wiki: Plugins</a>
*
* Use the {@link RxJavaHooks} features instead which let's you change individual
* handlers at runtime.
*/
public class RxJavaPlugins {
private final static RxJavaPlugins INSTANCE = new RxJavaPlugins();
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
private final AtomicReference<RxJavaSingleExecutionHook> singleExecutionHook = new AtomicReference<RxJavaSingleExecutionHook>();
private final AtomicReference<RxJavaCompletableExecutionHook> completableExecutionHook = new AtomicReference<RxJavaCompletableExecutionHook>();
private final AtomicReference<RxJavaSchedulersHook> schedulersHook = new AtomicReference<RxJavaSchedulersHook>();
static final RxJavaErrorHandler DEFAULT_ERROR_HANDLER = new RxJavaErrorHandler() {
};
/**
* Retrieves the single {@code RxJavaPlugins} instance.
*
* @return the single {@code RxJavaPlugins} instance
*
* @deprecated use the static methods of {@link RxJavaHooks}.
*/
@Deprecated
public static RxJavaPlugins getInstance() {
return INSTANCE;
}
/* package accessible for unit tests */RxJavaPlugins() {
// nothing to do
}
/**
* Reset {@code RxJavaPlugins} instance
* <p>
* This API is experimental. Resetting the plugins is dangerous
* during application runtime and also bad code could invoke it in
* the middle of an application life-cycle and really break applications
* if not used cautiously. For more detailed discussions:
* @see <a href="https://github.com/ReactiveX/RxJava/issues/2297">Make RxJavaPlugins.reset() public</a>
* @since 1.3
*/
public void reset() {
INSTANCE.errorHandler.set(null);
INSTANCE.observableExecutionHook.set(null);
INSTANCE.singleExecutionHook.set(null);
INSTANCE.completableExecutionHook.set(null);
INSTANCE.schedulersHook.set(null);
}
/**
* Retrieves an instance of {@link RxJavaErrorHandler} to use based on order of precedence as defined in
* {@link RxJavaPlugins} class header.
* <p>
* Override the default by calling {@link #registerErrorHandler(RxJavaErrorHandler)} or by setting the
* property {@code rxjava.plugin.RxJavaErrorHandler.implementation} with the full class name to load.
* @return {@link RxJavaErrorHandler} implementation to use
*/
public RxJavaErrorHandler getErrorHandler() {
if (errorHandler.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaErrorHandler.class, getSystemPropertiesSafe());
if (impl == null) {
// nothing set via properties so initialize with default
errorHandler.compareAndSet(null, DEFAULT_ERROR_HANDLER);
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
errorHandler.compareAndSet(null, (RxJavaErrorHandler) impl);
}
}
return errorHandler.get();
}
/**
* Registers an {@link RxJavaErrorHandler} implementation as a global override of any injected or default
* implementations.
*
* @param impl
* {@link RxJavaErrorHandler} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying
* to register)
*/
public void registerErrorHandler(RxJavaErrorHandler impl) {
if (!errorHandler.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + errorHandler.get());
}
}
/**
* Retrieves the instance of {@link RxJavaObservableExecutionHook} to use based on order of precedence as
* defined in {@link RxJavaPlugins} class header.
* <p>
* Override the default by calling {@link #registerObservableExecutionHook(RxJavaObservableExecutionHook)}
* or by setting the property {@code rxjava.plugin.RxJavaObservableExecutionHook.implementation} with the
* full class name to load.
*
* @return {@link RxJavaObservableExecutionHook} implementation to use
*/
public RxJavaObservableExecutionHook getObservableExecutionHook() {
if (observableExecutionHook.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, getSystemPropertiesSafe());
if (impl == null) {
// nothing set via properties so initialize with default
observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
}
}
return observableExecutionHook.get();
}
/**
* Register an {@link RxJavaObservableExecutionHook} implementation as a global override of any injected or
* default implementations.
*
* @param impl
* {@link RxJavaObservableExecutionHook} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying
* to register)
*/
public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl) {
if (!observableExecutionHook.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook.get());
}
}
/**
* Retrieves the instance of {@link RxJavaSingleExecutionHook} to use based on order of precedence as
* defined in {@link RxJavaPlugins} class header.
* <p>
* Override the default by calling {@link #registerSingleExecutionHook(RxJavaSingleExecutionHook)}
* or by setting the property {@code rxjava.plugin.RxJavaSingleExecutionHook.implementation} with the
* full class name to load.
*
* @return {@link RxJavaSingleExecutionHook} implementation to use
*/
public RxJavaSingleExecutionHook getSingleExecutionHook() {
if (singleExecutionHook.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaSingleExecutionHook.class, getSystemPropertiesSafe());
if (impl == null) {
// nothing set via properties so initialize with default
singleExecutionHook.compareAndSet(null, RxJavaSingleExecutionHookDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
singleExecutionHook.compareAndSet(null, (RxJavaSingleExecutionHook) impl);
}
}
return singleExecutionHook.get();
}
/**
* Register an {@link RxJavaSingleExecutionHook} implementation as a global override of any injected or
* default implementations.
*
* @param impl
* {@link RxJavaSingleExecutionHook} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying
* to register)
*/
public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) {
if (!singleExecutionHook.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());
}
}
/**
* Retrieves the instance of {@link RxJavaCompletableExecutionHook} to use based on order of precedence as
* defined in {@link RxJavaPlugins} class header.
* <p>
* Override the default by calling {@link #registerCompletableExecutionHook(RxJavaCompletableExecutionHook)}
* or by setting the property {@code rxjava.plugin.RxJavaCompletableExecutionHook.implementation} with the
* full class name to load.
*
* @return {@link RxJavaCompletableExecutionHook} implementation to use
* @since 1.3
*/
public RxJavaCompletableExecutionHook getCompletableExecutionHook() {
if (completableExecutionHook.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaCompletableExecutionHook.class, getSystemPropertiesSafe());
if (impl == null) {
// nothing set via properties so initialize with default
completableExecutionHook.compareAndSet(null, new RxJavaCompletableExecutionHook() { });
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
completableExecutionHook.compareAndSet(null, (RxJavaCompletableExecutionHook) impl);
}
}
return completableExecutionHook.get();
}
/**
* Register an {@link RxJavaCompletableExecutionHook} implementation as a global override of any injected or
* default implementations.
*
* @param impl
* {@link RxJavaCompletableExecutionHook} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying
* to register)
* @since 1.3
*/
public void registerCompletableExecutionHook(RxJavaCompletableExecutionHook impl) {
if (!completableExecutionHook.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());
}
}
/**
* A security manager may prevent accessing the System properties entirely,
* therefore, the SecurityException is turned into an empty properties.
* @return the Properties to use for looking up settings
*/
/* test */ static Properties getSystemPropertiesSafe() {
try {
return System.getProperties();
} catch (SecurityException ex) {
return new Properties();
}
}
/* test */ static Object getPluginImplementationViaProperty(Class<?> pluginClass, Properties propsIn) {
// Make a defensive clone because traversal may fail with ConcurrentModificationException
// if the properties get changed by something outside RxJava.
Properties props = (Properties)propsIn.clone();
final String classSimpleName = pluginClass.getSimpleName();
/*
* Check system properties for plugin class.
* <p>
* This will only happen during system startup thus it's okay to use the synchronized
* System.getProperties as it will never get called in normal operations.
*/
String pluginPrefix = "rxjava.plugin.";
String defaultKey = pluginPrefix + classSimpleName + ".implementation";
String implementingClass = props.getProperty(defaultKey);
if (implementingClass == null) {
String classSuffix = ".class";
String implSuffix = ".impl";
try {
for (Map.Entry<Object, Object> e : props.entrySet()) {
String key = e.getKey().toString();
if (key.startsWith(pluginPrefix) && key.endsWith(classSuffix)) {
String value = e.getValue().toString();
if (classSimpleName.equals(value)) {
String index = key.substring(0, key.length() - classSuffix.length()).substring(pluginPrefix.length());
String implKey = pluginPrefix + index + implSuffix;
implementingClass = props.getProperty(implKey);
if (implementingClass == null) {
throw new IllegalStateException("Implementing class declaration for " + classSimpleName + " missing: " + implKey);
}
break;
}
}
}
} catch (SecurityException ex) {
// https://github.com/ReactiveX/RxJava/issues/5819
// We don't seem to have access to all properties.
// At least print the exception to the console.
ex.printStackTrace();
}
}
if (implementingClass != null) {
try {
Class<?> cls = Class.forName(implementingClass);
// narrow the scope (cast) to the type we're expecting
cls = cls.asSubclass(pluginClass);
return cls.newInstance();
} catch (ClassCastException e) {
throw new IllegalStateException(classSimpleName + " implementation is not an instance of " + classSimpleName + ": " + implementingClass, e);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(classSimpleName + " implementation class not found: " + implementingClass, e);
} catch (InstantiationException e) {
throw new IllegalStateException(classSimpleName + " implementation not able to be instantiated: " + implementingClass, e);
} catch (IllegalAccessException e) {
throw new IllegalStateException(classSimpleName + " implementation not able to be accessed: " + implementingClass, e);
}
}
return null;
}
/**
* Retrieves the instance of {@link RxJavaSchedulersHook} to use based on order of precedence as defined
* in the {@link RxJavaPlugins} class header.
* <p>
* Override the default by calling {@link #registerSchedulersHook(RxJavaSchedulersHook)} or by setting
* the property {@code rxjava.plugin.RxJavaSchedulersHook.implementation} with the full class name to
* load.
*
* @return the {@link RxJavaSchedulersHook} implementation in use
*/
public RxJavaSchedulersHook getSchedulersHook() {
if (schedulersHook.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaSchedulersHook.class, getSystemPropertiesSafe());
if (impl == null) {
// nothing set via properties so initialize with default
schedulersHook.compareAndSet(null, RxJavaSchedulersHook.getDefaultInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
schedulersHook.compareAndSet(null, (RxJavaSchedulersHook) impl);
}
}
return schedulersHook.get();
}
/**
* Registers an {@link RxJavaSchedulersHook} implementation as a global override of any injected or
* default implementations.
*
* @param impl
* {@link RxJavaSchedulersHook} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying
* to register)
*/
public void registerSchedulersHook(RxJavaSchedulersHook impl) {
if (!schedulersHook.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + schedulersHook.get());
}
}
}