jnidzwetzki/bboxdb

View on GitHub
bboxdb-tools/src/main/java/org/bboxdb/tools/gui/views/query/ContinuousQueryRunable.java

Summary

Maintainability
B
5 hrs
Test Coverage
/*******************************************************************************
 *
 *    Copyright (C) 2015-2022 the BBoxDB project
 *  
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License. 
 *    
 *******************************************************************************/
package org.bboxdb.tools.gui.views.query;

import java.awt.Color;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

import org.bboxdb.misc.BBoxDBException;
import org.bboxdb.network.client.BBoxDBCluster;
import org.bboxdb.network.client.ContinuousQueryState;
import org.bboxdb.network.client.future.client.JoinedTupleListFuture;
import org.bboxdb.query.ContinuousQueryPlan;
import org.bboxdb.storage.entity.IdleQueryStateRemovedTuple;
import org.bboxdb.storage.entity.InvalidationTuple;
import org.bboxdb.storage.entity.MultiTuple;
import org.bboxdb.storage.entity.Tuple;
import org.bboxdb.storage.entity.TupleStoreName;
import org.bboxdb.storage.entity.WatermarkTuple;
import org.bboxdb.storage.sstable.SSTableConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContinuousQueryRunable extends AbstractContinuousQueryRunable {
    
    /**
     * The color of the results
     */
    private final List<Color> colors;
    
    /**
     * The query result
     */
    private JoinedTupleListFuture queryResult;
    
    /**
     * The seen watermarks
     */
    private final Set<Long> seenWatermarks;
    
    /**
     * The waiting queue if watermarks are active
     */
    private final TransferQueue<MultiTuple> watermarkQueue;
    
    /**
     * The logger
     */
    private final static Logger logger = LoggerFactory.getLogger(ContinuousQueryRunable.class);

    public ContinuousQueryRunable(final List<Color> colors, final ContinuousQueryPlan qp, 
            final BBoxDBCluster connection, final ElementOverlayPainter painter) {
        
        super(qp, connection, painter);
        this.colors = colors;
        this.seenWatermarks = new HashSet<>();
        this.watermarkQueue = new LinkedTransferQueue<>();
    }
    
    @Override
    protected void runThread() throws Exception {
    
        logger.info("New worker thread for a continuous query has started");
        queryResult = connection.queryContinuous(qp);
    
        try {
            queryResult.waitForCompletion();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    
        for(final MultiTuple joinedTuple : queryResult) {
            if(queryResult.isFailed()) {
                logger.error("Got an error" + queryResult.getAllMessages());
                return;
            }
    
            if(Thread.currentThread().isInterrupted()) {
                return;
            }
            
            final Tuple firstTuple = joinedTuple.getTuple(0);
            
            if(firstTuple instanceof WatermarkTuple) {
                handleWatermark(firstTuple);
            } else {
                // Update stale tuples only if we don't receive watermarks
                // otherwise, the stale tuples are removed when all watermarks are present
                if(! qp.isReceiveWatermarks()) {
                    
                    if (firstTuple instanceof InvalidationTuple || firstTuple instanceof IdleQueryStateRemovedTuple) {
                        removeTupleFromView(joinedTuple);
                    } else {
                        updateTupleOnGui(joinedTuple, colors, true);
                        removeStaleTupleIfNeeded();
                    }
                    
                } else {
                    // When watermarks active, queue elements
                    final boolean queueResult = watermarkQueue.offer(joinedTuple);
                    
                    if(! queueResult) {
                        logger.warn("Unable to queue tuple {}", joinedTuple);
                    }
                }
            }
        }
    }
    
    /**
     * Handle the watermark tuple
     * @param firstTuple
     */
    private void handleWatermark(final Tuple tuple) {
        
        logger.info("Handle watermark {}", tuple);
        
        final String tupleStoreString = tuple.getKey().replace(SSTableConst.WATERMARK_KEY + "_", "");
        final TupleStoreName tupleStore = new TupleStoreName(tupleStoreString);
        final long regionId = tupleStore.getRegionId().getAsLong();
        seenWatermarks.add(regionId);
        
        final String queryUUID = qp.getQueryUUID();
        final Optional<ContinuousQueryState> queryStateOptional = connection.getContinousQueryState(queryUUID);

        if(! queryStateOptional.isPresent()) {
            logger.error("Query state is not present, unable to handle watermark");
            return;
        }
        
        final ContinuousQueryState queryState = queryStateOptional.get();
        
        // All watermarks are present
        if(seenWatermarks.size() == queryState.getRegisteredRegions().size()) {
            handleWatermarkDone();
        }
    }

    /**
     * Handle the completion of the watermark
     */
    private void handleWatermarkDone() {
        logger.debug("Watermark complete: {}", seenWatermarks);
        
        // Paint pending elements
        final List<MultiTuple> queue = new LinkedList<>();
        watermarkQueue.drainTo(queue);
        logger.info("Processing {} waiting elements", queue.size());
        
        for(final MultiTuple multiTuple : queue) {
            
            final Tuple firstTuple = multiTuple.getTuple(0);

            if (firstTuple instanceof InvalidationTuple) {
                removeTupleFromView(multiTuple);
            } else {
                updateTupleOnGui(multiTuple, colors, false);
            }
        }
        
        seenWatermarks.clear();
        removeStaleTupleIfNeeded();
        painter.repaintAll();
    }

    @Override
    protected void endHook() { 
        
        if(queryResult != null) {
            logger.info("Canceling continuous query");
            
            // Clear interrupted flag, to be able to cancel the query on server
            boolean interruptedFlag = Thread.interrupted();
            
            try {
                logger.info("Canceling continous query {}", qp.getQueryUUID());
                connection.cancelContinousQuery(qp.getQueryUUID());
            } catch (BBoxDBException e) {
                logger.error("Got exception", e);
            } catch (InterruptedException e) {
                logger.info("Got interrupted exception");
                interruptedFlag = true;
            } finally {
                // Restore interrupted flag
                if(interruptedFlag) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        
        logger.info("Worker for continuous query exited");
    }
}