de.bund.bfr.knime.fsklab.nodes/src/de/bund/bfr/knime/fsklab/nodes/RScriptHandler.java
package de.bund.bfr.knime.fsklab.nodes;
import de.bund.bfr.knime.fsklab.nodes.plot.BasePlotter;
import de.bund.bfr.knime.fsklab.nodes.plot.Ggplot2Plotter;
import de.bund.bfr.knime.fsklab.preferences.PreferenceInitializer;
import de.bund.bfr.knime.fsklab.r.client.IRController.RException;
import de.bund.bfr.knime.fsklab.r.client.LibRegistry;
import de.bund.bfr.knime.fsklab.r.client.LibRegistry.NoInternetException;
import de.bund.bfr.knime.fsklab.r.client.RController;
import de.bund.bfr.knime.fsklab.r.client.RprofileManager;
import de.bund.bfr.knime.fsklab.r.client.ScriptExecutor;
import de.bund.bfr.knime.fsklab.v2_0.FskPortObject;
import de.bund.bfr.knime.fsklab.v2_0.FskSimulation;
import de.bund.bfr.knime.fsklab.v2_0.runner.RunnerNodeInternalSettings;
import de.bund.bfr.knime.fsklab.v2_0.runner.RunnerNodeSettings;
import de.bund.bfr.metadata.swagger.Parameter;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import metadata.SwaggerUtil;
import org.knime.core.node.BufferedDataTable;
import org.knime.core.node.CanceledExecutionException;
import org.knime.core.node.ExecutionContext;
import org.knime.core.node.NodeLogger;
import org.knime.core.util.FileUtil;
import org.rosuda.REngine.REXP;
import org.rosuda.REngine.REXPMismatchException;
public class RScriptHandler extends ScriptHandler {
ScriptExecutor executor;
RController controller;
public RScriptHandler() throws RException, IOException {
this(new ArrayList<String>(0));
}
public RScriptHandler(List<String> packages) throws RException, IOException {
RprofileManager.subscribe();
// initialize LibRegistry before Controller to avoid errors on switching R in preferences
LibRegistry.instance();
this.controller = new RController();
this.executor = new ScriptExecutor(controller);
if (packages.contains("ggplot2") || packages.contains("tidyverse")) {
plotter = new Ggplot2Plotter(controller);
} else {
plotter = new BasePlotter(controller);
}
}
@Override
public void setWorkingDirectory(Path workingDirectory, ExecutionContext exec) throws Exception {
controller.setWorkingDirectory(workingDirectory);
}
@Override
public String[] runScript(String script, ExecutionContext exec, Boolean showErrors)
throws RException, CanceledExecutionException, InterruptedException, REXPMismatchException {
if (showErrors) {
REXP c = executor.execute(script, exec);
String[] execResult = c.asStrings();
return execResult;
} else {
executor.executeIgnoreResult(script, exec);
}
return null;
}
@Override
public void convertToKnimeDataTable(FskPortObject fskObj, ExecutionContext exec)
throws Exception {
LibRegistry.instance().install(Arrays.asList("data.table"));
String DT = "library(data.table)\n";
// "install.packages(\"data.table\")\n"
// +"knime.out <-list(x,Value)\n"
// +"knime.out <- setDT(lapply(knime.out,\"length<-\",max(length(knime.out))))[]";
FskSimulation fskSimulation = fskObj.simulations.get(fskObj.selectedSimulationIndex);
String input_lst = "";
for (Map.Entry<String, String> entry : fskSimulation.getParameters().entrySet()) {
String parameterName = entry.getKey();
// String parameterValue = entry.getValue();
input_lst += parameterName + "= " + parameterName + ",";
}
String output_lst = "";
List<Parameter> paras = SwaggerUtil.getParameter(fskObj.modelMetadata);
for (Parameter p : paras) {
if (p.getClassification() == Parameter.ClassificationEnum.OUTPUT) {
output_lst += p.getName() + "= " + p.getName() + ",";
}
}
String lst = input_lst + output_lst;
lst = lst.substring(0, lst.lastIndexOf(","));
DT += "knime.out <- list(" + lst + ")\n";
DT += "knime.out <- setDT(lapply(knime.out,\"length<-\",max(length(knime.out))))[]";
// lst[lst.lastIndexOf(",")] = ")";
controller.eval(DT, false);
BufferedDataTable out = controller.importBufferedDataTable("knime.out", false, exec);
// ----------------CONVERT KNIME TABLE BACK TO R
// controller.exportFlowVariables(inFlowVariables, name, exec);
// controller.importDataFromPorts(inData, exec, batchSize, rType, sendRowNames);
controller.monitoredAssign("knime.in", out, exec, 100, "String", true);
// monitoredAssign("knime.in", out, exec.createSubProgress(0.5),batchSize, rType, true);
}
@Override
public void installLibs(final FskPortObject fskObj, ExecutionContext exec, NodeLogger LOGGER)
throws Exception {
// Install needed libraries
if (!fskObj.packages.isEmpty()) {
// surround with try catch,
// in case LibRegistry controller is somehow closed, refresh on fail
// (this happens, if a library is not successfully installed by FSK-Lab but then
// installed by user themselves (or Rserve instance is closed by other means)
try {
LibRegistry.instance().install(fskObj.packages);
} catch ( RException| REXPMismatchException | NoInternetException e) {
PreferenceInitializer.refresh = true;
LibRegistry.instance().install(fskObj.packages);
}
}
exec.setProgress(0.2, "Add paths to libraries");
controller.addPackagePath(LibRegistry.instance().getInstallationPath());
}
@Override
public String buildParameterScript(final FskSimulation simulation) {
return NodeUtils.buildParameterScript(simulation);
}
@Override
public void plotToImageFile(final RunnerNodeInternalSettings internalSettings,
RunnerNodeSettings nodeSettings, final FskPortObject fskObj, ExecutionContext exec)
throws Exception {
plotter.plotPng(internalSettings.imageFile, fskObj.getViz());
}
@Override
public void saveWorkspace(final FskPortObject fskObj, ExecutionContext exec) throws Exception {
if (fskObj.getWorkspace() == null) {
fskObj.setWorkspace(FileUtil.createTempFile("workspace", ".RData").toPath());
}
controller.saveWorkspace(fskObj.getWorkspace(), exec);
}
@Override
public void restoreDefaultLibrary() throws Exception {
controller.restorePackagePath();
}
@Override
public String getStdOut() {
return executor.getStdOut();
}
@Override
public String getStdErr() {
return executor.getStdErr();
}
@Override
public void cleanup(ExecutionContext exec) throws Exception {
executor.cleanup(exec);
}
@Override
public void setupOutputCapturing(ExecutionContext exec)
throws RException, CanceledExecutionException, InterruptedException {
executor.setupOutputCapturing(exec);
}
@Override
public void finishOutputCapturing(ExecutionContext exec)
throws RException, CanceledExecutionException, InterruptedException {
executor.finishOutputCapturing(exec);
}
@Override
public String getPackageVersionCommand(String pkg_name) {
String command = "packageDescription(\"" + pkg_name + "\")$Version";
return command;
}
@Override
public String getPackageVersionCommand(List<String> pkg_names) {
String command =
"available.packages(contriburl = contrib.url(c(\"https://cloud.r-project.org/\"), \"both\"))[c('"
+ pkg_names.stream().collect(Collectors.joining("','")) + "'),]";
return command;
}
@Override
public String getFileExtension() {
return "r";
}
@Override
public void close() throws Exception {
RprofileManager.unSubscribe();
controller.close();
}
@Override
protected String createVectorQuery(List<String> variableNames) {
return "c(" + String.join(", ", variableNames) + ")";
}
}