jnidzwetzki/bboxdb

View on GitHub
bboxdb-server/src/main/java/org/bboxdb/network/client/future/client/helper/AbstractTheadedListFutureIterator.java

Summary

Maintainability
B
4 hrs
Test Coverage
/*******************************************************************************
 *
 *    Copyright (C) 2015-2022 the BBoxDB project
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 *
 *******************************************************************************/
package org.bboxdb.network.client.future.client.helper;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.bboxdb.network.client.BBoxDBClient;
import org.bboxdb.network.client.connection.BBoxDBConnection;
import org.bboxdb.network.client.future.client.AbstractListFuture;
import org.bboxdb.storage.entity.PagedTransferableEntity;
import org.bboxdb.storage.util.CloseableIterator;
import org.bboxdb.storage.util.TimeBasedEntityDuplicateTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTheadedListFutureIterator<T extends PagedTransferableEntity> implements CloseableIterator<T>{

    /**
     * The size of the transfer queue
     */
    protected final static int QUEUE_SIZE = 25;

    /**
     * The transfer queue
     */
    protected final BlockingQueue<T> tupleQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);

    /**
     * The amount of seen terminals, the iterator is exhausted,
     * when all producers are terminated and the queue is empty
     */
    protected int seenTerminals = 0;

    /**
     * The terminal (or poison) element
     */
    protected final T QUEUE_TERMINAL = buildQueueTerminal();

    /**
     * The next tuple for the next operation
     */
    protected T nextTuple = null;

    /**
     * The number of results, that needs to be queried
     */
    protected final int futuresToQuery;

    /**
     * The instance to iterate
     *
     * @param abstractListFuture
     */
    protected final AbstractListFuture<T> abstractListFuture;

    /**
     * The executor pool
     */
    protected final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * The tuple duplicate remover
     */
    protected final TimeBasedEntityDuplicateTracker tupleDuplicateRemover = new TimeBasedEntityDuplicateTracker();

    /**
     * The Logger
     */
    private final static Logger logger = LoggerFactory.getLogger(ThreadedTupleListFutureIterator.class);


    public AbstractTheadedListFutureIterator(final AbstractListFuture<T> abstractListFuture) {
        this.abstractListFuture = abstractListFuture;
        this.futuresToQuery = abstractListFuture.getNumberOfResultObjects();
        
        logger.debug("Started new AbstractTheadedListFutureIterator for {} result futures", futuresToQuery);

        for(int i = 0; i < abstractListFuture.getNumberOfResultObjects(); i++) {
            setupProducer(i);
        }
    }

    /**
     * Build the queue terminal
     * @return
     */
    protected abstract T buildQueueTerminal();

    /**
     * Setup the worker that fetches the data from the futures
     */
    public void setupProducer(final int resultId) {

        logger.debug("Starting producer for {}", resultId);

        final Runnable producer = new Runnable() {

            @Override
            public void run() {

                try {
                    final List<T> tupleList = abstractListFuture.get(resultId);

                    addTupleListToQueue(tupleList);

                    if(! abstractListFuture.isCompleteResult(resultId)) {
                        handleAdditionalPages();
                    }

                } catch (ExecutionException e) {
                    logger.warn("Got exception while writing data to queue", e);
                } catch (InterruptedException e) {
                    logger.warn("Got exception while writing data to queue", e);
                    Thread.currentThread().interrupt();
                } finally {
                    addTerminalNE();
                    logger.debug("Producer {} is done", resultId);
                }
            }

            /**
             * Request and add the additional pages to the queue
             * @throws ExecutionException
             * @throws InterruptedException
             */
            @SuppressWarnings("unchecked")
            protected void handleAdditionalPages() throws InterruptedException, ExecutionException {

                final BBoxDBConnection bboxdbConnection = abstractListFuture.getConnection(resultId);

                if(bboxdbConnection == null) {
                    logger.error("Unable to get connection for paging: {}", resultId);
                    return;
                }

                final short queryRequestId = abstractListFuture.getRequestId(resultId);
                final BBoxDBClient bbBoxDBClient = bboxdbConnection.getBboxDBClient();

                AbstractListFuture<T> nextPage = null;
                do {
                     if(logger.isDebugEnabled()) {
                         logger.debug("Requesting next page for {}", queryRequestId);
                     }
                     
                     nextPage = (AbstractListFuture<T>) bbBoxDBClient.getNextPage(queryRequestId);

                     nextPage.waitForCompletion();

                     if(nextPage.isFailed()) {
                         logger.error("Requesting next page failed! Query result is incomplete: {}", nextPage.getAllMessages());
                         return;
                     }

                     // Query is send to one server, so the number of
                     // result objects should be 1
                     if(nextPage.getNumberOfResultObjects() != 1) {
                         logger.error("Got a non expected number of result objects {}", nextPage.getNumberOfResultObjects());
                     }

                     final List<T> tuples = nextPage.get(0);
                     
                     if(logger.isDebugEnabled()) {
                         logger.debug("Next page for {} contains {} tuples", queryRequestId, tuples.size());
                     }
                     
                     addTupleListToQueue(tuples);

                } while(! nextPage.isCompleteResult(0));
            }

            /**
             * Add the tuple list into the queue
             * @param tupleList
             * @throws InterruptedException
             */
            protected void addTupleListToQueue(final List<T> tupleList) throws InterruptedException {
                
                logger.debug("Added {} result tuples to queue", tupleList.size());
                
                for(final T element : tupleList) {
                    tupleQueue.put(element);
                }
            }

            /**
             * Add the terminal to the queue
             */
            protected void addTerminalNE() {
                try {
                    tupleQueue.put(QUEUE_TERMINAL);
                } catch (InterruptedException e) {
                    // Got the interrupted exception while addint the
                    // terminal, ignoring
                }
                
                abstractListFuture.runShutdownCallbacks();
            }
        };

        executor.submit(producer);
    }

    @Override
    public boolean hasNext() {

        if(nextTuple != null) {
            logger.warn("Last tuple was not requested, did you call next before?");
            nextTuple = null;
        }

        while(nextTuple == null) {

            // All worker are done
            if(seenTerminals == futuresToQuery) {
                return (nextTuple != null);
            }

            try {
                // Wait until element is available
                nextTuple = tupleQueue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }

            if(nextTuple == QUEUE_TERMINAL) {
                seenTerminals++;
                nextTuple = null;
                continue;
            }

            // Tuple was received from another instance
            if(tupleDuplicateRemover.isElementAlreadySeen(nextTuple)) {
                nextTuple = null;
            }
        }

        return true;
    }

    @Override
    public T next() {
        if(nextTuple == null) {
            throw new IllegalStateException("Tuple is null, did you called hasNext before?");
        }

        final T resultTuple = nextTuple;
        nextTuple = null;
        return resultTuple;
    }

    @Override
    public void close() throws Exception {
        logger.trace("Close called on iterator");
        executor.shutdown();
    }

    @Override
    protected void finalize() throws Throwable {
        close();
        super.finalize();
    }
}