SiLeBAT/FSK-Lab

View on GitHub
de.bund.bfr.knime.fsklab.service/src/de/bund/bfr/knime/fsklab/service/FskService.java

Summary

Maintainability
D
2 days
Test Coverage
package de.bund.bfr.knime.fsklab.service;

import static spark.Spark.awaitInitialization;
import static spark.Spark.before;
import static spark.Spark.get;
import static spark.Spark.options;
import static spark.Spark.port;
import static spark.Spark.post;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Enumeration;
import java.util.Properties;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.eclipse.core.runtime.Platform;
import org.h2.tools.DeleteDbFiles;
import org.knime.core.node.NodeLogger;
import org.osgi.framework.Bundle;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.threetenbp.ThreeTenModule;

import de.bund.bfr.metadata.swagger.Model;
import de.bund.bfr.rakip.vocabularies.data.AccreditationProcedureRepository;
import de.bund.bfr.rakip.vocabularies.data.AvailabilityRepository;
import de.bund.bfr.rakip.vocabularies.data.BasicProcessRepository;
import de.bund.bfr.rakip.vocabularies.data.BasicRepository;
import de.bund.bfr.rakip.vocabularies.data.CollectionToolRepository;
import de.bund.bfr.rakip.vocabularies.data.CountryRepository;
import de.bund.bfr.rakip.vocabularies.data.FishAreaRepository;
import de.bund.bfr.rakip.vocabularies.data.FormatRepository;
import de.bund.bfr.rakip.vocabularies.data.HazardRepository;
import de.bund.bfr.rakip.vocabularies.data.HazardTypeRepository;
import de.bund.bfr.rakip.vocabularies.data.IndSumRepository;
import de.bund.bfr.rakip.vocabularies.data.LaboratoryAccreditationRepository;
import de.bund.bfr.rakip.vocabularies.data.LanguageRepository;
import de.bund.bfr.rakip.vocabularies.data.LanguageWrittenInRepository;
import de.bund.bfr.rakip.vocabularies.data.ModelClassRepository;
import de.bund.bfr.rakip.vocabularies.data.ModelEquationClassRepository;
import de.bund.bfr.rakip.vocabularies.data.ModelSubclassRepository;
import de.bund.bfr.rakip.vocabularies.data.PackagingRepository;
import de.bund.bfr.rakip.vocabularies.data.ParameterClassificationRepository;
import de.bund.bfr.rakip.vocabularies.data.ParameterDatatypeRepository;
import de.bund.bfr.rakip.vocabularies.data.ParameterDistributionRepository;
import de.bund.bfr.rakip.vocabularies.data.ParameterSourceRepository;
import de.bund.bfr.rakip.vocabularies.data.ParameterSubjectRepository;
import de.bund.bfr.rakip.vocabularies.data.PopulationRepository;
import de.bund.bfr.rakip.vocabularies.data.ProductMatrixRepository;
import de.bund.bfr.rakip.vocabularies.data.ProductTreatmentRepository;
import de.bund.bfr.rakip.vocabularies.data.ProductionMethodRepository;
import de.bund.bfr.rakip.vocabularies.data.PublicationStatusRepository;
import de.bund.bfr.rakip.vocabularies.data.PublicationTypeRepository;
import de.bund.bfr.rakip.vocabularies.data.RegionRepository;
import de.bund.bfr.rakip.vocabularies.data.RightRepository;
import de.bund.bfr.rakip.vocabularies.data.SamplingMethodRepository;
import de.bund.bfr.rakip.vocabularies.data.SamplingPointRepository;
import de.bund.bfr.rakip.vocabularies.data.SamplingProgramRepository;
import de.bund.bfr.rakip.vocabularies.data.SamplingStrategyRepository;
import de.bund.bfr.rakip.vocabularies.data.SoftwareRepository;
import de.bund.bfr.rakip.vocabularies.data.SourceRepository;
import de.bund.bfr.rakip.vocabularies.data.StatusRepository;
import de.bund.bfr.rakip.vocabularies.data.TechnologyTypeRepository;
import de.bund.bfr.rakip.vocabularies.data.UnitCategoryRepository;
import de.bund.bfr.rakip.vocabularies.data.UnitRepository;
import metadata.ConversionUtils;
import metadata.ConversionUtils.ModelClass;
import spark.ResponseTransformer;

public class FskService implements Runnable {

    private static final String MIME_JSON = "application/json";

    private static final NodeLogger LOGGER = NodeLogger.getLogger(FskService.class);

    private static final JsonTransformer jsonTransformer = new JsonTransformer();

    private final ConversionUtils utils = new ConversionUtils();

    private int port;
    
    private static FskService service = new FskService();

    private FskService() {
    }

    public static FskService instance() {
        return service;
    }

    public int getPort() {
        return port;
    }

    @Override
    public void run() {

        try {
            initDatabase();
        } catch (ClassNotFoundException err) {
            LOGGER.error("Initializing DB", err);
            return;
        } catch (SQLException | IOException err) {
            // These two exceptions are already logged in initDatabase
            return;
        }

        port(0);

        // Enable CORS
        options("/*", (request, response) -> {

            String accessControlRequestHeaders = request.headers("Access-Control-Request-Headers");
            if (accessControlRequestHeaders != null) {
                response.header("Access-Control-Allow-Headers", accessControlRequestHeaders);
            }

            String accessControlRequestMethod = request.headers("Access-Control-Request-Method");
            if (accessControlRequestMethod != null) {
                response.header("Access-Control-Allow-Methods", accessControlRequestMethod);
            }

            return "OK";
        });

        before((request, response) -> response.header("Access-Control-Allow-Origin", "*"));

        get("getById/:vocabulary/:id", (req, res) -> {
            try (Connection connection = DriverManager.getConnection("jdbc:h2:~/.fsk/vocabularies")) {
                res.type(MIME_JSON);
                BasicRepository<?> repository = getRepository(req.params(":vocabulary"), connection);
                int id = Integer.parseInt(req.params(":id"));
                return repository.getById(id);
            }
        }, jsonTransformer);

        get("/getAll/:vocabulary", (req, res) -> {
            try (Connection connection = DriverManager.getConnection("jdbc:h2:~/.fsk/vocabularies")) {
                res.type(MIME_JSON);
                BasicRepository<?> repository = getRepository(req.params(":vocabulary"), connection);
                return repository.getAll();
            }
        }, jsonTransformer);

        get("/getAllNames/:vocabulary", (req, res) -> {
            try (Connection connection = DriverManager.getConnection("jdbc:h2:~/.fsk/vocabularies")) {
                res.type(MIME_JSON);
                BasicRepository<?> repository = getRepository(req.params(":vocabulary"), connection);
                return repository.getAllNames();
            }
        }, jsonTransformer);

        // input metadata as body parameter
        post("convertMetadata/:targetModelClass", (req, res) -> {
            res.type("application/json");

            try {
                JsonNode inputMetadata = jsonTransformer.MAPPER.readTree(req.body());
                ModelClass targetClass = ModelClass.valueOf(req.params("targetModelClass"));
                Model convertedMetadata = utils.convertModel(inputMetadata, targetClass);
                res.status(200);
                return convertedMetadata;
            } catch (Exception err) {
                res.status(400);
                return err;
            }
        }, jsonTransformer);

        post("joinMetadata", (req, res) -> {
            // The body keeps two JSON models in an array.
            res.type("application/json");

            try {
                JsonNode models = jsonTransformer.MAPPER.readTree(req.body());
                JsonNode firstModel = models.get(0);
                JsonNode secondModel = models.get(1);
                Model joinedModel = utils.joinModels(firstModel, secondModel, ModelClass.genericModel);
                res.status(200);
                return joinedModel;
            } catch (Exception err) {
                res.status(500);
                return err;
            }
        }, jsonTransformer);

        // After initializing the service, get the randomly picked port by Spark.
        awaitInitialization();
        port = port();
    }

    private static class JsonTransformer implements ResponseTransformer {

        ObjectMapper MAPPER = new ObjectMapper().registerModule(new ThreeTenModule())
                .setSerializationInclusion(JsonInclude.Include.NON_NULL);

        @Override
        public String render(Object model) {
            try {
                return MAPPER.writeValueAsString(model);
            } catch (JsonProcessingException err) {
                return "";
            }
        }
    }

    private BasicRepository<?> getRepository(String vocabulary, Connection connection) {
        switch (vocabulary) {
        case "accreditation_procedure":
            return new AccreditationProcedureRepository(connection);
        case "availability":
            return new AvailabilityRepository(connection);
        case "basic_process":
            return new BasicProcessRepository(connection);
        case "collection_tool":
            return new CollectionToolRepository(connection);
        case "country":
            return new CountryRepository(connection);
        case "fish_area":
            return new FishAreaRepository(connection);
        case "format":
            return new FormatRepository(connection);
        case "hazard":
            return new HazardRepository(connection);
        case "hazard_type":
            return new HazardTypeRepository(connection);
        case "ind_sum":
            return new IndSumRepository(connection);
        case "laboratory_accreditation":
            return new LaboratoryAccreditationRepository(connection);
        case "language":
            return new LanguageRepository(connection);
        case "language_written_in":
            return new LanguageWrittenInRepository(connection);
        case "model_class":
            return new ModelClassRepository(connection);
        case "model_equation_class":
            return new ModelEquationClassRepository(connection);
        case "model_subclass":
            return new ModelSubclassRepository(connection);
        case "packaging":
            return new PackagingRepository(connection);
        case "parameter_classification":
            return new ParameterClassificationRepository(connection);
        case "parameter_datatype":
            return new ParameterDatatypeRepository(connection);
        case "parameter_distribution":
            return new ParameterDistributionRepository(connection);
        case "parameter_source":
            return new ParameterSourceRepository(connection);
        case "parameter_subject":
            return new ParameterSubjectRepository(connection);
        case "population":
            return new PopulationRepository(connection);
        case "product_matrix":
            return new ProductMatrixRepository(connection);
        case "product_treatment":
            return new ProductTreatmentRepository(connection);
        case "production_method":
            return new ProductionMethodRepository(connection);
        case "publication_status":
            return new PublicationStatusRepository(connection);
        case "publication_type":
            return new PublicationTypeRepository(connection);
        case "region":
            return new RegionRepository(connection);
        case "right":
            return new RightRepository(connection);
        case "sampling_method":
            return new SamplingMethodRepository(connection);
        case "sampling_point":
            return new SamplingPointRepository(connection);
        case "sampling_program":
            return new SamplingProgramRepository(connection);
        case "sampling_strategy":
            return new SamplingStrategyRepository(connection);
        case "software":
            return new SoftwareRepository(connection);
        case "source":
            return new SourceRepository(connection);
        case "status":
            return new StatusRepository(connection);
        case "unit":
            return new UnitRepository(connection);
        case "unit_category":
            return new UnitCategoryRepository(connection);
        case "technology_type":
            return new TechnologyTypeRepository(connection);
        default:
            break;
        }

        return null;
    }

    /**
     * Import database from SQL files in bundle to local DB in ~/.fsk/vocabularies.
     * <p>
     * Import stops if the database could not be created. Individual tables can fail
     * and the import will continue with the rest of tables.
     * 
     * @throws SQLException If connection to local DB could not be opened.
     * @throws ClassNotFoundException If H2 DB driver is missing.
     * @throws IOException If the database or its structure could not be created.
     */
    private void initDatabase() throws SQLException, ClassNotFoundException, IOException {

        final Properties fastImportProperties = new Properties();
        fastImportProperties.put("LOG", 0);
        fastImportProperties.put("CACHE_SIZE", 65536);
        fastImportProperties.put("LOCK_MODE", 0);
        fastImportProperties.put("UNDO_LOG", 0);

        Class.forName("org.h2.Driver");
        DeleteDbFiles.execute("~/.fsk", "vocabularies", true); // Delete DB if it exists
        try (Connection connection = DriverManager.getConnection("jdbc:h2:~/.fsk/vocabularies",
                fastImportProperties)) {
            
            // Load tables
            Bundle bundle = Platform.getBundle("de.bund.bfr.knime.fsklab.service");
            
            File temporaryFile = File.createTempFile("vocabularies", ".jar");
            URL bundleUrl = bundle.getEntry("/vocabularies-2.0.2.jar");
            
            try {
                // Extract vocabularies jar from bundle to a temporary file
                try (InputStream inputStream = bundleUrl.openStream()) {
                    FileUtils.copyToFile(inputStream, temporaryFile);
                }
                
                try (JarFile jarFile = new JarFile(temporaryFile)) {
                                        
                    // Read tables.sql
                    String tableSqlScript;
                    JarEntry tablesEntry = jarFile.getJarEntry("data/tables.sql");
                    try (InputStream inputStream = jarFile.getInputStream(tablesEntry)) {
                        tableSqlScript = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
                    } catch (IOException err) {
                        LOGGER.error("RAKIP vocabularies could not be imported", err);
                        throw err;
                    }
                    
                    // Import tables.sql
                    try (Statement statement = connection.createStatement()) {
                        statement.execute(tableSqlScript);
                    } catch (SQLException err) {
                        LOGGER.error("RAKIP vocabularies could not be imported", err);
                        throw err;
                    }
                    
                    // Get and import SQL data files
                    Enumeration<JarEntry> entries = jarFile.entries();
                    while (entries.hasMoreElements()) {
                        JarEntry nextEntry = entries.nextElement();
                        
                        if (nextEntry.getName().startsWith("data/initialdata/") && nextEntry.getName().endsWith(".sql")) {
                            try (InputStream is = jarFile.getInputStream(nextEntry);
                                    LineIterator lineIterator = IOUtils.lineIterator(is, StandardCharsets.UTF_8)) {
                                while (lineIterator.hasNext()) {
                                    try (Statement statement = connection.createStatement()) {
                                        statement.execute(lineIterator.nextLine());
                                    }
                                }
                            } catch (IOException | SQLException err) {
                                // Log error and continue with other vocabularies
                                LOGGER.warn("SQL file could not be imported: " + nextEntry.getName(), err);
                            }
                        }
                    }
                }
                
            } finally {
                if (temporaryFile.exists()) {
                    temporaryFile.delete();
                }
            }
        }
    }
}