prasadtalasila/BITS-Darshini

View on GitHub
src/main/java/in/ac/bits/protocolanalyzer/persistence/repository/SaveRepository.java

Summary

Maintainability
A
2 hrs
Test Coverage
package in.ac.bits.protocolanalyzer.persistence.repository;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;

import lombok.extern.log4j.Log4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.stereotype.Component;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import in.ac.bits.protocolanalyzer.analyzer.event.BucketLimitEvent;
import in.ac.bits.protocolanalyzer.analyzer.event.EndAnalysisEvent;
import in.ac.bits.protocolanalyzer.analyzer.event.SaveRepoEndEvent;

@Component
@Scope("prototype")
@Log4j
public class SaveRepository implements Runnable {

    @Autowired
    private ElasticsearchTemplate template;

    private ConcurrentLinkedQueue<ArrayList<IndexQuery>> buckets;

    private boolean isRunning = false;

    private boolean analysisRunning = true;

    private EventBus eventBus;

    private int lowWaterMark;

    private boolean analysisOnly;

    public boolean isRunning() {
        return isRunning;
    }

    public void configure(EventBus eventBus) {
        buckets = new ConcurrentLinkedQueue<ArrayList<IndexQuery>>();
        this.eventBus = eventBus;
        this.eventBus.register(this);
        try {
            Context ctx = new InitialContext();
            Context env = (Context) ctx.lookup("java:comp/env");
            lowWaterMark = Integer.parseInt((String) env.lookup("lowWaterMark"));
            log.info("LOW WATER MARK READ FROM FILE IS: " + lowWaterMark);
        } catch (NamingException e) {
            log.info("EXCEPTION IN READING FROM CONFIG FILE");
            lowWaterMark = 3;
        }
        //Set the value of the analysisOnly
        try {
            Context ctx = new InitialContext();
            Context env = (Context) ctx.lookup("java:comp/env");
            if (((String) env.lookup("analysisOnly")).equals("true")) {
                analysisOnly = true;
            } else {
                analysisOnly = false;
            }
            log.info("Perform only analysis: " + analysisOnly);
        } catch (NamingException e) {
            log.info("EXCEPTION IN READING FROM CONFIG FILE FOR analysisOnly .. setting false by default");
            analysisOnly = false;
        }
    }

    public void setBucket(ArrayList<IndexQuery> bucket) {
        buckets.add(bucket);
    }

    public int getBucketSize() {
        return this.buckets.size();
    }

    private static final long MEGABYTE = 1024L * 1024L;

    public static long bytesToMegabytes(long bytes) {
        return bytes / MEGABYTE;
    }

    @Override
    public void run() {
        this.isRunning = true;
        while (!buckets.isEmpty()) {

            // Get the Java runtime
            Runtime runtime = Runtime.getRuntime();
            // Run the garbage collector
            //runtime.gc();
            // Calculate the used memory
            long memory = runtime.totalMemory() - runtime.freeMemory();
            log.info("Used memory is bytes: " + memory);
            log.info(System.currentTimeMillis() + " Used memory is megabytes: "+ bytesToMegabytes(memory));

            log.info(
                    "SaveRepository started at " + System.currentTimeMillis() + " with bucket size: " + buckets.size());

            if ( analysisOnly ) {
                log.info("Not saving ... but polling");
                buckets.poll();
            } else {
                template.bulkIndex(buckets.poll()); // blocking call
            }
            log.info("SaveRepository finished at " + System.currentTimeMillis());

            if ( buckets.size() == 0 && !analysisRunning ) {
                this.publishEndOfSave(System.currentTimeMillis());
            }

            if (buckets.size() <= lowWaterMark) {
                this.publishLow();
            }
        }
        isRunning = false;
    }

    /**
    *    Since AnalysisRepository is blocked when SaveRepository is running, this thread itself ensures that
    *    analysis will resume when low water-mark is reached.
    */
    private void publishLow() {
        log.info(System.currentTimeMillis());
        eventBus.post(new BucketLimitEvent("GO"));
    }

    @Subscribe
    public void end(EndAnalysisEvent event) {
        //log.info("Save repo received signal that analysis has ended");
        analysisRunning = false;
    }

    private void publishEndOfSave(long time) {
        //log.info("Publishing end of Save Repository");
        eventBus.post(new SaveRepoEndEvent(time));
    }
}