
View on GitHub


1 day
Test Coverage
 *    Copyright (C) 2015-2022 the BBoxDB project
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *      http://www.apache.org/licenses/LICENSE-2.0
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
package org.bboxdb.network.client.future.client;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.bboxdb.network.client.BBoxDBClient;
import org.bboxdb.network.client.connection.BBoxDBConnection;
import org.bboxdb.network.client.future.network.NetworkOperationFuture;
import org.bboxdb.network.packets.NetworkRequestPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OperationFutureImpl<T> implements OperationFuture, FutureErrorCallback {

     * The tuple send delayer
    private final static ScheduledExecutorService scheduler;

    static {
        scheduler = Executors.newScheduledThreadPool(1);

     * The futures
    protected List<NetworkOperationFuture> futures;

     * The default retry policy
    private FutureRetryPolicy retryPolicy;

     * The ready latch
    protected final CountDownLatch readyLatch = new CountDownLatch(1);

     * The retry counter
     * @param future
    private int globalRetryCounter = 0;

     * The future supplier
    private Supplier<List<NetworkOperationFuture>> futureSupplier;
     * The success callbacks
    protected final List<Consumer<OperationFuture>> successCallbacks;
     * Are the success callbacks already run?
    protected boolean successCallbacksRun = false;

     * The failure callbacks
    protected final List<Consumer<OperationFuture>> failureCallbacks;
     * Are the failure callbacks already run?
    protected boolean failureCallbacksRun = false;
     * The shutdown callbacks
    protected final List<Consumer<OperationFuture>> shutdownCallbacks = new ArrayList<>();
     * The error history of the future
    private final StringBuilder errorHistory = new StringBuilder();
     * The Logger
    private final static Logger logger = LoggerFactory.getLogger(OperationFutureImpl.class);

    public OperationFutureImpl(final Supplier<List<NetworkOperationFuture>> futures) {
        this(futures, FutureRetryPolicy.RETRY_POLICY_ALL_FUTURES);

    public OperationFutureImpl(final Supplier<List<NetworkOperationFuture>> futureSupplier,
            final FutureRetryPolicy retryPolicy) {

        this(futureSupplier, retryPolicy, new ArrayList<>(), new ArrayList<>());
    public OperationFutureImpl(final Supplier<List<NetworkOperationFuture>> futureSupplier,
            final FutureRetryPolicy retryPolicy, List<Consumer<OperationFuture>> successCallbacks,
            List<Consumer<OperationFuture>> failureCallbacks) {

        this.futureSupplier = futureSupplier;
        this.retryPolicy = retryPolicy;
        this.successCallbacks = successCallbacks;
        this.failureCallbacks = failureCallbacks;


     * Execute the network operation futures
    public void execute() {

        // Get futures from supplier
        futures = futureSupplier.get();

        // Set callbacks
        futures.forEach(f -> f.setErrorCallback(this));
        futures.forEach(f -> f.setDoneCallback((c) -> handleNetworkFutureDone(c)));

        // Execute
        futures.forEach(f -> f.execute());

        // Maybe we have a empty future list, so no callback is executed. Let's
        // check if we are already done

     * Handle a future done
     * @param c 
     * @param future
    private void handleNetworkFutureDone(final NetworkOperationFuture networkOperationFuture) {
        final boolean allDone = futures.stream().allMatch(f -> f.isDone());

        if(logger.isDebugEnabled()) {
            if(networkOperationFuture != null) {
                logger.debug("Got callback from {}", networkOperationFuture.getConnection().getConnectionName());
            final long doneFutures = futures.stream().filter(f -> f.isDone()).count();
            logger.debug("Handle success, all futures done {} (done={} of={})", allDone, 
                    doneFutures, futures.size());
        // If some futures are failed, cancel the successful ones
        if (isFailed()) {

        if (allDone) {
            if(networkOperationFuture != null && ! networkOperationFuture.isFailed()) {

     * Run the complete callbacks
    public void runSuccessCallbacks() {
        successCallbacksRun = true;
        successCallbacks.forEach(c -> c.accept(this));
     * Run the failure callbacks
    public void runFailureCallbacks() {
        failureCallbacksRun = true;
        failureCallbacks.forEach(c -> c.accept(this));
     * Run the complete callbacks
    public void runShutdownCallbacks() {
        shutdownCallbacks.forEach(c -> c.accept(this));
     * Add the consumer to the list of success callbacks
     * @param consumer
    public void addSuccessCallbackConsumer(final Consumer<OperationFuture> consumer) {
        // Run late callback immediately
        if(successCallbacksRun) {
     * Add the consumer to the list of failure callbacks
     * @param consumer
    public void addFailureCallbackConsumer(final Consumer<OperationFuture> consumer) {
        // Run late callback immediately
        if(failureCallbacksRun) {
     * Add the future to the lust of shutdown futures
     * @param consumer
    public void addShutdownCallbackConsumer(final Consumer<OperationFuture> consumer) {

     * (non-Javadoc)
     * @see org.bboxdb.network.client.future.OperationFuture#getRequestId(int)
    public short getRequestId(final int resultId) {

        return futures.get(resultId).getRequestId();

     * (non-Javadoc)
     * @see org.bboxdb.network.client.future.OperationFuture#getMessage(int)
    public String getMessage(final int resultId) {

        return futures.get(resultId).getMessage();

    public BBoxDBConnection getConnection(final int resultId) {

        return futures.get(resultId).getConnection();

    public boolean isCompleteResult(int resultId) {

        return futures.get(resultId).isCompleteResult();

     * (non-Javadoc)
     * @see org.bboxdb.network.client.future.OperationFuture#getAllMessages()
    public String getAllMessages() {
        return futures.stream().filter(f -> f.getMessage() != null).map(f -> f.getMessageWithConnectionName())
                .collect(Collectors.joining(",", "[", "]"));

     * Throw an exception when the result id is unknown
     * @param resultId
    protected void checkFutureSize(final int resultId) {
        if (resultId > futures.size()) {
            throw new IllegalArgumentException(
                    "Unable to access future with id: " + resultId + "(total " + futures.size() + ")");

     * (non-Javadoc)
     * @see org.bboxdb.network.client.future.OperationFuture#isFailed()
    public boolean isFailed() {
        return futures.stream().anyMatch(f -> f.isFailed());

     * (non-Javadoc)
     * @see org.bboxdb.network.client.future.OperationFuture#isDone()
    public boolean isDone() {
        return readyLatch.getCount() == 0;

     * (non-Javadoc)
     * @see
     * org.bboxdb.network.client.future.OperationFuture#getNumberOfResultObjets()
    public int getNumberOfResultObjects() {
        return futures.size();

     * Get the result of the future
     * @return
    public T get(final int resultId) throws InterruptedException {

        return (T) futures.get(resultId).get(true);

     * (non-Javadoc)
     * @see org.bboxdb.network.client.future.OperationFuture#waitForAll()
    public void waitForCompletion() throws InterruptedException {

     * (non-Javadoc)
     * @see org.bboxdb.network.client.future.OperationFuture#waitForAll()
    public void waitForCompletion(final long timeout, final TimeUnit unit)
            throws InterruptedException, TimeoutException {

        readyLatch.await(timeout, unit);

    public long getCompletionTime(final TimeUnit timeUnit) {
        return futures.stream()
                .filter(f -> (f.isDone() || f.isFailed()))
                .mapToLong(f -> f.getCompletionTime(timeUnit))

     * Set the retry policy
     * @param retry
    public void setRetryPolicy(final FutureRetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;

     * Cancel all old queries
    public void cancelAllFutures() {
        futures.forEach(f -> cancelOldFuture(f));

     * Cancel the old future
     * @param future
    private void cancelOldFuture(final NetworkOperationFuture future) {
        final NetworkRequestPacket transmittedPackage = future.getTransmittedPackage();

        logger.debug("Canceling future [seq={}, connection={}, state={}]", future.getRequestId(),
                future.getConnection().getConnectionName(), future.getConnection().getConnectionState());

        if (transmittedPackage == null) {

        if (!transmittedPackage.needsToBeCanceled()) {

        // Only successful futures needs to be canceled
        if (future.isFailed()) {

        final BBoxDBConnection connection = future.getConnection();
        final BBoxDBClient bboxDBClient = connection.getBboxDBClient();

     * Handle the error callback
     * Method is synchronized to prevent mixed callback handling from multiple
     * connections
    public synchronized boolean handleError(final NetworkOperationFuture future) {

        if (logger.isDebugEnabled()) {
            logger.debug("Got failed future back [policy={}, seq={}, connection={}, state={}]", retryPolicy,
                    future.getRequestId(), future.getConnection().getConnectionName(),

        if (!futures.contains(future)) {
            logger.debug("Future is unknown, all network futures might be re-executed. Ignoring callback");
            return true;

        if (!future.getConnection().isConnected()) {
            logger.debug("Unable to retry future, because connection failed [connection={}, state={}]",
                    future.getConnection().getConnectionName(), future.getConnection().getConnectionState());
            return false;

        if (retryPolicy == FutureRetryPolicy.RETRY_POLICY_NONE) {
            return false;

        if (retryPolicy == FutureRetryPolicy.RETRY_POLICY_ONE_FUTURE) {
            return handleOneFutureRetry(future);

        if (retryPolicy == FutureRetryPolicy.RETRY_POLICY_ALL_FUTURES) {
            return handleAllFutureRetry(future);

        throw new RuntimeException("Unknown retry policy: " + retryPolicy);

     * Handle all futures retry
     * @param future
     * @return
    private boolean handleOneFutureRetry(final NetworkOperationFuture future) {
        if (future.getExecutions() > future.getTotalRetries()) {
            return false;

        final Runnable futureTask = () -> {
            // Already finished
            if(readyLatch.getCount() == 0) {

        final int delay = (int) (100 * future.getExecutions());
        errorHistory.append("Schedule in " + delay);
        scheduler.schedule(futureTask, delay, TimeUnit.MILLISECONDS);
        logger.debug("Reschedule event for type [delay={}, type={}] in handleOneFutureRetry", delay, future.getClass().toString());

        return true;

     * Handle one future retry
     * @return
    private boolean handleAllFutureRetry(final NetworkOperationFuture future) {
        if (globalRetryCounter >= futures.get(0).getTotalRetries()) {
            return false;

        if (futureSupplier == null) {
            throw new RuntimeException("Error policy is RETRY_ALL_FUTURES and supplier is null");


        final Runnable futureTask = () -> {
            // Already finished
            if(readyLatch.getCount() == 0) {

        final int delay = (int) (100 * globalRetryCounter);
        errorHistory.append("Schedule in " + delay);
        scheduler.schedule(futureTask, delay, TimeUnit.MILLISECONDS);

        logger.debug("Reschedule event for type [delay={}, type={}] in handleAllFutureRetry", delay, future.getClass().toString());

        return true;

     * Get the number of needed executions
     * @return
    public int getNeededExecutions() {
        return globalRetryCounter + 1;

    public Set<Long> getAffectedRegionIDs() {
        final Set<Long> regions = new HashSet<>();
        for(final NetworkOperationFuture future : futures) {
        return regions;

    public String getErrorLog() {
        return errorHistory.toString();