SiLeBAT/FSK-Lab

View on GitHub
de.bund.bfr.knime.fsklab.r/src/de/bund/bfr/knime/fsklab/r/client/RController.java

Summary

Maintainability
F
1 wk
Test Coverage
/*
 * ------------------------------------------------------------------ Copyright by KNIME GmbH,
 * Konstanz, Germany Website: http://www.knime.org; Email: contact@knime.org
 *
 * This program is free software; you can redistribute it and/or modify it under the terms of the
 * GNU General Public License, Version 3, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with this program; if
 * not, see <http://www.gnu.org/licenses>.
 *
 * Additional permission under GNU GPL version 3 section 7:
 *
 * KNIME interoperates with ECLIPSE solely via ECLIPSE's plug-in APIs. Hence, KNIME and ECLIPSE are
 * both independent programs and are not derived from each other. Should, however, the
 * interpretation of the GNU GPL Version 3 ("License") under any applicable laws result in KNIME and
 * ECLIPSE being a combined program, KNIME GMBH herewith grants you the additional permission to use
 * and propagate KNIME together with ECLIPSE with only the license terms in place for ECLIPSE
 * applying to ECLIPSE and the GNU GPL Version 3 applying for KNIME, provided the license terms of
 * ECLIPSE themselves allow for the respective use and propagation of ECLIPSE together with KNIME.
 *
 * Additional permission relating to nodes for KNIME that extend the Node Extension (and in
 * particular that are based on subclasses of NodeModel, NodeDialog, and NodeView) and that only
 * interoperate with KNIME through standard APIs ("Nodes"): Nodes are deemed to be separate and
 * independent programs and to not be covered works. Notwithstanding anything to the contrary in the
 * License, the License does not apply to Nodes, you are not required to license Nodes under the
 * License, and you are granted a license to prepare and propagate Nodes, in each case even if such
 * Nodes are propagated with or for interoperation with KNIME. The owner of a Node may freely choose
 * the license terms applicable to such Node, including when such Node is propagated with or for
 * interoperation with KNIME. ---------------------------------------------------------------------
 *
 * History 17.09.2007 (thiel): created
 */
package de.bund.bfr.knime.fsklab.r.client;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.knime.core.data.BooleanValue;
import org.knime.core.data.DataCell;
import org.knime.core.data.DataColumnSpec;
import org.knime.core.data.DataColumnSpecCreator;
import org.knime.core.data.DataRow;
import org.knime.core.data.DataTableSpec;
import org.knime.core.data.DataType;
import org.knime.core.data.DataValue;
import org.knime.core.data.DoubleValue;
import org.knime.core.data.IntValue;
import org.knime.core.data.MissingCell;
import org.knime.core.data.RowKey;
import org.knime.core.data.StringValue;
import org.knime.core.data.collection.CollectionCellFactory;
import org.knime.core.data.collection.CollectionDataValue;
import org.knime.core.data.collection.ListCell;
import org.knime.core.data.def.BooleanCell;
import org.knime.core.data.def.DefaultRow;
import org.knime.core.data.def.DoubleCell;
import org.knime.core.data.def.IntCell;
import org.knime.core.data.def.StringCell;
import org.knime.core.node.BufferedDataContainer;
import org.knime.core.node.BufferedDataTable;
import org.knime.core.node.BufferedDataTable.KnowsRowCountTable;
import org.knime.core.node.CanceledExecutionException;
import org.knime.core.node.ExecutionContext;
import org.knime.core.node.ExecutionMonitor;
import org.knime.core.node.NodeLogger;
import org.knime.core.node.port.PortObject;
import org.knime.core.node.util.CheckUtils;
import org.knime.core.node.workflow.FlowVariable;
import org.knime.core.util.ThreadPool;
import org.knime.core.util.ThreadUtils;
import org.knime.ext.r.node.local.port.RPortObject;
import org.rosuda.REngine.REXP;
import org.rosuda.REngine.REXPDouble;
import org.rosuda.REngine.REXPFactor;
import org.rosuda.REngine.REXPGenericVector;
import org.rosuda.REngine.REXPInteger;
import org.rosuda.REngine.REXPList;
import org.rosuda.REngine.REXPLogical;
import org.rosuda.REngine.REXPMismatchException;
import org.rosuda.REngine.REXPString;
import org.rosuda.REngine.REXPVector;
import org.rosuda.REngine.REngineException;
import org.rosuda.REngine.RFactor;
import org.rosuda.REngine.RList;
import org.rosuda.REngine.Rserve.RConnection;
import org.rosuda.REngine.Rserve.RserveException;
import com.sun.jna.Platform;

import de.bund.bfr.knime.fsklab.preferences.RBinUtil;
import de.bund.bfr.knime.fsklab.preferences.RBinUtil.InvalidRHomeException;
import de.bund.bfr.knime.fsklab.r.server.RConnectionFactory;
import de.bund.bfr.knime.fsklab.r.server.RConnectionFactory.RConnectionResource;
import de.bund.bfr.knime.fsklab.preferences.PreferenceInitializer;

/**
 * RController.
 * 
 * This class manages some way of communicating with R, executing R code and moving data back and
 * forth.
 * 
 * Currently, this class is a singleton and enforces mutual exclusion.
 * 
 * @author Heiko Hofer
 * @author Jonathan Hale
 */
public class RController implements IRController {

  private final NodeLogger LOGGER = NodeLogger.getLogger(getClass());

  private static final String TEMP_VARIABLE_NAME = "knimertemp836481";

  /** Name of the R variable which stores the names of loaded R libraries */
  public static final String R_LOADED_LIBRARIES_VARIABLE = "knime.loaded.libraries";

  private RConnectionResource m_connection;

  private Properties m_rProps;

  private boolean m_initialized = false;
  private boolean m_useNodeContext = false;

  private static boolean quartzFound;

  private static boolean cairoFound;
  

  /**
   * Constructor. Calls {@link #initialize()}. To avoid initialization, use
   * {@link #RController(boolean)}.
   * 
   * @throws RException
   */
  public RController() throws RException {
    this(false);
    initialize();
  }

  /**
   * Constructor
   * 
   * @param useNodeContext Whether to use the NodeContext for threads
   */
  public RController(final boolean useNodeContext) {
    setUseNodeContext(useNodeContext);
  }

  // --- NodeContext handling ---
  @Override
  public void setUseNodeContext(boolean useNodeContext) {
    m_useNodeContext = useNodeContext;
  }

  @Override
  public boolean isUsingNodeContext() {
    return m_useNodeContext;
  }

  // --- Initialization & RConnection lifecycle ---
  @Override
  public void initialize() throws RException {
    initR();
  }
  
  /**
   * Check if the RController is initialized and throws {@link RControllerNotInitializedException}
   * if not.
   */
  private void checkInitialized() {
    if (!m_initialized || m_connection == null) {
      throw new RControllerNotInitializedException();
    }
    if (!m_connection.isRInstanceAlive()) {
      throw new RuntimeException("RServe process terminated unexpectedly");
    }
    if (m_connection.isAvailable()) {
      // resource should never be available, if held by this RController
      // Available means available to acquire for other RControllers.
      throw new RuntimeException("Invalid resource state: lost ownership of connection resource.");
    }
  }

  @Override
  public void close() throws RException {
    if (m_connection != null) {
      m_connection.release();
      m_connection = null;
    }

    m_initialized = false;
  }

  /**
   * Terminate and relaunch the R process this controller is connected to. This is currently the
   * only way to interrupt command execution.
   */
  private synchronized void terminateAndRelaunch() {
    LOGGER.debug("Terminate R process");

    terminateRProcess();

    try {
      m_connection = initRConnection();
      m_initialized = m_connection != null && m_connection.get().isConnected();
      LOGGER.debug("Recovered with a new R process");
    } catch (Exception e) {
      throw new RuntimeException("Initializing R with Rserve failed", e);
    }
  }

  /**
   * Terminate the R process started for this RController.
   */
  private synchronized void terminateRProcess() {
    if (m_connection != null) {
      m_connection.destroy(true);
      m_connection = null;
    }

    m_initialized = false;
  }

  /**
   * Check if the connection is still valid and recover if not.
   */
  private synchronized void checkConnectionAndRecover() {
    if (m_connection != null && m_connection.get().isConnected()
        && m_connection.isRInstanceAlive()) {
      // connection is fine
      return;
    }

    // all of the session data has been lost. We cannot recover from that.
    terminateAndRelaunch();
  }

  /**
   * Create and initialize a R connection
   *
   * @return the new RConnection
   */
  private static RConnectionResource initRConnection() throws RserveException, IOException {
    final RConnectionResource resource = RConnectionFactory.createConnection();

    if (!resource.get().isConnected()) {
      throw new IOException("Could not initialize RController: Resource was not connected");
    }

    return resource;
  }

  /**
   * Initialize the underlying REngine with a backend.
   * 
   * @throws IOException
   */
  private void initR() throws RException {
    try {
      String rHome = PreferenceInitializer.getR3Provider().getRHome();

      // FIXME: Workaround for Linux server in BfR. If R home is not configure then
      // defaults to /usr/lib/R/
      if (StringUtils.isEmpty(rHome) && Platform.isLinux()) {
        rHome = "/usr/lib/R/";
      }

      RBinUtil.checkRHome(rHome);

      m_rProps = RBinUtil.retrieveRProperties();

      if (!m_rProps.containsKey("major")) {
        throw new RException("Cannot determine major version of R. "
            + "Please check the R installation defined in the KNIME preferences.", null);
      }

      // Check rserveProp is not null or empty
      final String rserveProp = m_rProps.getProperty("Rserve.path");
      if (StringUtils.isEmpty(rserveProp)) {
        throw new RException("Missing required package: Rserve", null);
      }

      // Check miniCranProp is not null or empty
      final String miniCranProp = m_rProps.getProperty("miniCRAN.path");
      if (StringUtils.isEmpty(miniCranProp)) {
        throw new RException("Missing required package: miniCRAN", null);
      }

      m_connection = initRConnection();

    } catch (final InvalidRHomeException ex) {
      throw new RException("R Home is invalid", ex);
    } catch (final RserveException | IOException e) {
      throw new RException("Exception occured during R initialization.", e);
    }

    m_initialized = (m_connection != null && m_connection.get().isConnected());

    if (Platform.isWindows()) {
      try {
        final String rMemoryLimit = m_rProps.get("memory.limit").toString().trim();
        // set memory to the one of the used R
        eval("memory.limit(" + rMemoryLimit + ");", false);
      } catch (Exception e) {
        LOGGER.error("R initialisation failed. " + e.getMessage());
        throw new RuntimeException(e);
      }
    } else if (Platform.isMac()) {
      checkCairoOnMac();
    }
  }

  private void checkCairoOnMac() throws RException {

    if (cairoFound && quartzFound) {
      return;
    }

    // produce a warning message if 'Cairo' package is not installed
    try {
      final REXP ret = eval("find.package('Cairo')", true);
      final String cairoPath = ret.asString();

      if (StringUtils.isNotEmpty(cairoPath)) {
        // under Mac we need Cairo package to use png()/bmp() etc devices.
        cairoFound = true;
      }
    } catch (final RException | REXPMismatchException exception) {
      LOGGER.debug("Error while querying Cairo package version: " + exception.getMessage(),
          exception);
    }

    if (!cairoFound) {
      LOGGER.warn("The package 'Cairo' needs to be installed in your R installation for bitmap "
          + "graphics devices to work properly. Please install it in R using "
          + "\"install.packages('Cairo')\".");
      return;
    }

    // Cairo requires XQuartz to be installed. We make sure it is, since
    // loading the Cairo library will crash Rserve otherwise.
    final ProcessBuilder builder = new ProcessBuilder("mdls", "-name", "kMDItemVersion",
        "/Applications/Utilities/XQuartz.app");

    try {
      final Process process = builder.start();

      // check if output of process was a valid version
      final BufferedReader stdout =
          new BufferedReader(new InputStreamReader(process.getInputStream()));
      String line;
      while ((line = stdout.readLine()) != null) {
        if (line.matches("kMDItemVersion = \"2(?:\\.[0-9]+)+\"")) {
          quartzFound = true;
        }
      }

      try {
        process.waitFor();
      } catch (final InterruptedException exception) {
        // happens when user cancels node at this point for example
        LOGGER.debug("Interrupted while waiting for mdls process to terminate.", exception);
      }
    } catch (final IOException exception) {
      // should never happen, just in case, here is something for
      // users to report if they accidentally deleted their mdls
      LOGGER.error("Could not run mdls to check for XQuartz version: " + exception.getMessage(),
          exception);
    }

    if (!quartzFound) {
      throw new RException("XQuartz is required for the Cairo library on MacOS. Please download "
          + "and install XQuartz from http://www.xquartz.org/.", null);
    }
  }

  // --- Simple Getters ---
  @Override
  public RConnection getREngine() {
    checkInitialized();

    return m_connection.get();
  }

  @Override
  public boolean isInitialized() {
    return m_initialized;
  }

  // --- R evaluation ---
  @Override
  public REXP eval(final String expr, final boolean resolve) throws RException {

    try {
      synchronized (getREngine()) {
        // sadly, eval(String, RExpr, boolean) has a bug and just completely ignores the
        // "resolve" parameter. Imitating its behaviour here.
        if (resolve) {
          REXP x = getREngine().eval(expr);
          return x;
        } else {
          getREngine().voidEval(expr);
          return null;
        }
      }
    } catch (REngineException e) {
      throw new RException(RException.MSG_EVAL_FAILED + ": \"" + expr + "\"", e);
    }
  }

  @Override
  public REXP monitoredEval(final String expr, final ExecutionMonitor exec, final boolean resolve)
      throws RException, CanceledExecutionException, InterruptedException {
    checkInitialized();
    try {
      return new MonitoredEval(exec).run(expr, resolve);
    } catch (RException | REngineException | REXPMismatchException e) {
      throw new RException(RException.MSG_EVAL_FAILED + ": \"" + expr + "\"", e);
    }
  }

  @Override
  public void assign(final String symbol, final int value) throws RException {
    checkInitialized();
    try {
      synchronized (getREngine()) {
        getREngine().assign(symbol, new int[] {value});
      }
    } catch (REngineException exception) {
      throw new RException(RException.MSG_EVAL_FAILED + ": \"" + symbol + "\"", exception);
    }
  }

  @Override
  public void assign(final String symbol, final double value) throws RException {
    checkInitialized();
    try {
      synchronized (getREngine()) {
        getREngine().assign(symbol, new double[] {value});
      }
    } catch (REngineException exception) {
      throw new RException(RException.MSG_EVAL_FAILED + ": \"" + symbol + "\"", exception);
    }
  }

  @Override
  public void assign(final String expr, final String value) throws RException {
    checkInitialized();
    try {
      synchronized (getREngine()) {
        getREngine().assign(expr, value);
      }
    } catch (REngineException exception) {
      throw new RException(RException.MSG_EVAL_FAILED + ": \"" + expr + "\"", exception);
    }
  }

  @Override
  public void assign(final String expr, final REXP value) throws RException {
    checkInitialized();
    try {
      synchronized (getREngine()) {
        getREngine().assign(expr, value);
      }
    } catch (REngineException exception) {
      throw new RException(RException.MSG_EVAL_FAILED + ": \"" + expr + "\"", exception);
    }
  }

  @Override
  public void monitoredAssign(final String symbol, final REXP value, final ExecutionMonitor exec)
      throws RException, CanceledExecutionException {
    checkInitialized();
    try {
      new MonitoredEval(exec).assign(symbol, value);
    } catch (Exception exception) {
      throw new RException(String.format("Assigning value to %s failed.", symbol), exception);
    }
  }

  @Override
  public void exportFlowVariables(final Collection<FlowVariable> inFlowVariables, final String name,
      final ExecutionMonitor exec) throws RException, CanceledExecutionException {
    final REXP[] content = new REXP[inFlowVariables.size()];
    final String[] names = new String[inFlowVariables.size()];

    int i = 0;
    for (final FlowVariable flowVar : inFlowVariables) {
      exec.checkCanceled();
      names[i] = flowVar.getName();

      switch (flowVar.getType()) {
        case INTEGER:
          content[i] = new REXPInteger(flowVar.getIntValue());
          break;
        case DOUBLE:
          content[i] = new REXPDouble(flowVar.getDoubleValue());
          break;
        case STRING:
          content[i] = new REXPString(flowVar.getStringValue());
          break;
        case CREDENTIALS:
          break; // does nothing
      }
      i++;
    }

    monitoredAssign(TEMP_VARIABLE_NAME, new REXPList(new RList(content, names)), exec);
    eval(name + "<-" + TEMP_VARIABLE_NAME + ";rm(" + TEMP_VARIABLE_NAME + ")", false);
  }

  // Taken from KNIME RController. Should be updated to support FskPortObjects
  @Override
  public void importDataFromPorts(final PortObject[] inData, final ExecutionMonitor exec,
      final int batchSize, final String rType, final boolean sendRowNames)
      throws RException, CanceledExecutionException {
    // load workspaces from the input ports into the current R session
    for (final PortObject port : inData) {
      if (port instanceof RPortObject) {
        exec.setMessage("Loading workspace from R input port");
        final RPortObject rPortObject = (RPortObject) port;
        final File portFile = rPortObject.getFile();

        String unixPath = FilenameUtils.separatorsToUnix(portFile.getAbsolutePath());
        eval(
            "load(\"" + unixPath + "\")\n"
                + RController.createLoadLibraryFunctionCall(rPortObject.getLibraries(), false),
            false);
      } else if (port instanceof BufferedDataTable) {
        exec.setMessage("Exporting data to R");
        // write all input data to the R session
        monitoredAssign("knime.in", (BufferedDataTable) port, exec.createSubProgress(0.5),
            batchSize, rType, sendRowNames);
      }
    }

    exec.setProgress(1.0);
  }
 
  public void loadWorkspace(File workspaceFile) throws RException, CanceledExecutionException {
    // load workspaces from the file into the current R session
    String unixPath = FilenameUtils.separatorsToUnix(workspaceFile.getAbsolutePath());
    eval("load(\"" + unixPath + "\")\n", false);
  }

  @Override
  public Collection<FlowVariable> importFlowVariables(String variableName) throws RException {

    checkInitialized();
    final List<FlowVariable> flowVars = new ArrayList<>();
    try {
      // this used to be:
      // getREngine().get(variableName, null, true);
      // but that caused crashes on Mac (see AP-5646) - Jonathan hasn't gotten to the
      // bottom of it but apparently it's the above commented line that kills RServe.
      // Be more paranoid here and check existence first
      final REXP exists = getREngine().eval("exists(\"" + variableName + "\")");

      if (exists.asBytes()[0] == REXPLogical.FALSE) {
        return Collections.emptyList();
      }
      final REXP value = getREngine().eval("try(" + variableName + ")");

      if (value == null) {
        // A variable with this name does not exist
        return Collections.emptyList();
      }
      final RList rList = value.asList();

      for (int c = 0; c < rList.size(); c++) {
        final REXP rexp = rList.at(c);
        if (rexp.isInteger()) {
          flowVars.add(new FlowVariable((String) rList.names.get(c), rexp.asInteger()));
        } else if (rexp.isNumeric()) {
          flowVars.add(new FlowVariable((String) rList.names.get(c), rexp.asDouble()));
        } else if (rexp.isString()) {
          flowVars.add(new FlowVariable((String) rList.names.get(c), rexp.asString()));
        }
      }
    } catch (final REXPMismatchException exception) {
      throw new RException("Error importing flow variables from \"" + variableName + "\"",
          exception);
    } catch (final REngineException exception) {
      // the variable name was not found
    }

    return flowVars;
  }

  /**
   * Create an REXPLogical for a BooleanValue or {@link REXPLogical#isNA()}.
   */
  private static byte exportBooleanValue(final DataCell cell) {
    if (cell.isMissing()) {
      return REXPLogical.NA;
    }
    return ((BooleanValue) cell).getBooleanValue() ? REXPLogical.TRUE : REXPLogical.FALSE;
  }

  /**
   * Create an int for a IntValue or create a {@link REXPInteger#NA}.
   */
  private static int exportIntValue(final DataCell cell) {
    if (cell.isMissing()) {
      return REXPInteger.NA;
    }
    return ((IntValue) cell).getIntValue();
  }

  /**
   * Create a double for a DoubleValue or {@link REXPDouble#NA}.
   */
  private static double exportDoubleValue(final DataCell cell) {
    if (cell.isMissing()) {
      return REXPDouble.NA;
    }
    return ((DoubleValue) cell).getDoubleValue();
  }

  /**
   * Create a String for a StringValue or null.
   */
  private static String exportStringValue(final DataCell cell) {
    if (!cell.isMissing()) {
      return cell.toString();
    } else {
      return null;
    }
  }

  @Override
  public BufferedDataTable importBufferedDataTable(String varName, boolean nonNumbersAsMissing,
      ExecutionContext exec) throws RException, CanceledExecutionException {

    final REXP typeRexp = eval("class(" + varName + ")", true);
    if (typeRexp.isNull()) {
      // a variable with this name does not exist
      final BufferedDataContainer cont = exec.createDataContainer(new DataTableSpec());
      cont.close();
      return cont.getTable();
    }

    boolean isDataTable = false;
    try {
      final String type = typeRexp.asString();
      if (type.equals("data.table")) {
        isDataTable = true;
        LOGGER.debug("Using experimental support for receiving data as \"data.table\".");
      } else if (type.equals("matrix") || type.equals("list")) {
        eval(varName + "<-data.frame(" + varName + ")", false);
      } else if (!type.equals("data.frame")) {
        throw new RException(
            "CODING PROBLEM\timportBufferedDataTable(): Supporting only 'data.frame', "
                + "'data.table', 'matrix' and 'list' for type of \"" + varName + "\" (was '" + type
                + "').",
            null);
      }
    } catch (REXPMismatchException e) {
      throw new RException("Type of " + varName + " could not be parsed as string.", e);
    }

    final ThreadPool threadPool = ThreadPool.currentPool();
    try {
      // Get column names
      final String[] columnNames = eval("colnames(" + varName + ")", true).asStrings();
      final int numColumns = columnNames.length;

      // Get row count (and row names if not automatic compact 1:n storage)
      final REXP numRowsRexp = getREngine().eval(".row_names_info(" + varName + ")");
      final boolean compactRowNames = numRowsRexp.asInteger() < 0;
      if (!compactRowNames) {
        eval("knime.out.row.names<-attr(" + varName + ",\"row.names\")", false);
      }
      final int numRows = Math.abs(numRowsRexp.asInteger());
      int transferredRows = 0;

      final List<DataColumnSpec> colSpecs = new ArrayList<DataColumnSpec>();
      DataTableSpec outSpec = null;
      BufferedDataContainer cont = null;

      final DataCell[][] columns = new DataCell[numColumns][];
      @SuppressWarnings("unchecked")
      final Future<Void>[] futures = new Future[numColumns];
      Future<Void> addRowsFuture = null;

      while (transferredRows < numRows) {
        // this is NOT the chunk size value as per config dialog as receiving data happens
        // in chunks _and_ on columns
        int rowsThisBatch = Math.min(50000, numRows - transferredRows);

        if (numRows - transferredRows - rowsThisBatch < 10000) {
          // avoid final chunk being smaller than 10k rows
          rowsThisBatch = numRows - transferredRows;
        }

        // Expression for range of rows to transfer e.g.: 1:10000
        final String rowRangeExpr = (transferredRows + 1) + ":" + (transferredRows + rowsThisBatch);

        for (int i = 0; i < numColumns; ++i) {
          exec.checkCanceled();
          exec.setProgress((transferredRows / (double) numRows)
              + (rowsThisBatch / (double) numRows) * (i / (double) numColumns));

          final int rIndex = i + 1; // R starts indices at 1
          final String expr = varName + ((isDataTable) ? "[[" + rIndex + "]][" + rowRangeExpr + "]"
              : "[" + rowRangeExpr + "," + rIndex + "]");
          final REXP column = getREngine().eval(expr);

          if (outSpec == null) {
            // Create column spec for this column
            DataType colType;
            if (column.isNull()) { // column is missing (in R), edge case
              colType = StringCell.TYPE;
            } else if (column.isList()) {
              colType = DataType.getType(ListCell.class, DataType.getType(DataCell.class));
            } else {
              colType = importDataType(column);
            }
            colSpecs.add(new DataColumnSpecCreator(columnNames[i], colType).createSpec());
          }

          if (addRowsFuture != null) {
            try {
              addRowsFuture.get();
            } catch (InterruptedException | ExecutionException e) {
              new RuntimeException("Error while adding rows to table.", e);
            }
          }

          // Convert values
          if (columns[i] == null || columns[i].length < rowsThisBatch) {
            // Only reallocate the DataCell buffer if insufficient size.
            columns[i] = new DataCell[rowsThisBatch];
          }

          if (column.isNull()) {
            Arrays.fill(columns[i], DataType.getMissingCell());
          } else {
            if (column.isList()) {
              final DataCell[] list = columns[i];
              int row = 0;
              for (final Object o : column.asList()) {
                final REXP rexp = (REXP) o;
                if (rexp.isNull()) {
                  list[row] = DataType.getMissingCell();
                } else {
                  if (rexp.isVector()) {
                    final REXPVector colValue = (REXPVector) rexp;
                    final DataCell[] listCells = new DataCell[colValue.length()];
                    importCells(colValue, listCells, nonNumbersAsMissing);
                    list[row] = CollectionCellFactory.createListCell(Arrays.asList(listCells));
                  } else {
                    LOGGER.warn(
                        "Expected Vector type for list cell. Inserting missing cell instead.");
                    list[row] = DataType.getMissingCell();
                  }
                }
                ++row;
              }
            } else {
              final DataCell[] columnCells = columns[i];
              Callable<Void> callable = ThreadUtils.callableWithContext(() -> {
                importCells(column, columnCells, nonNumbersAsMissing);
                return null;
              });
              futures[i] = threadPool != null ? threadPool.enqueue(callable)
                  : R_THREAD_POOL.submit(callable);
            }
          }
        }

        if (outSpec == null || cont == null) {
          // create container and outspec for the first batch of rows
          outSpec = new DataTableSpec(colSpecs.toArray(new DataColumnSpec[colSpecs.size()]));
          cont = exec.createDataContainer(outSpec);
        }

        Stream.of(futures).filter(o -> o != null).forEach(f -> {
          try {
            f.get();
          } catch (final Throwable e) {
            throw new RuntimeException("Error during conversion of R values to KNIME types.", e);
          }
        });

        final REXP rRowIds = (compactRowNames) ? null
            : getREngine().eval("knime.out.row.names[" + rowRangeExpr + "]");

        final DataCell[] curRow = new DataCell[numColumns];

        final BufferedDataContainer finalCont = cont;
        final int finalTransferredRows = transferredRows;
        final int finalRowsThisBatch = rowsThisBatch;
        // Should never happen, only happens if Rserve returns less bytes than expected. Maybe a
        // version issue?
        CheckUtils.checkState(compactRowNames || rRowIds != null,
            "Received an invalid packet from Rserve.");
        Callable<Void> addRowsCallable = ThreadUtils.callableWithContext(() -> {
          @SuppressWarnings("null")
          final String[] rowIds = compactRowNames ? null : rRowIds.asStrings();
          for (int i = 0; i < finalRowsThisBatch; ++i) {
            @SuppressWarnings("null")
            RowKey rowKey =
                compactRowNames ? new RowKey(Long.toString(1 + i + finalTransferredRows))
                    : new RowKey(rowIds[i]);
            for (int col = 0; col < columns.length; ++col) {
              curRow[col] = columns[col][i];
            }
            finalCont.addRowToTable(new DefaultRow(rowKey, curRow));
          }
          return null;
        });
        addRowsFuture = threadPool != null ? threadPool.enqueue(addRowsCallable)
            : R_THREAD_POOL.submit(addRowsCallable);

        transferredRows += rowsThisBatch;
      }

      if (addRowsFuture != null) {
        try {
          addRowsFuture.get();
        } catch (InterruptedException | ExecutionException e) {
          new RuntimeException("Error while adding rows to table.", e);
        }
      }

      if (outSpec == null || cont == null) {
        // create container and outspec for the first batch of rows
        outSpec = new DataTableSpec(colSpecs.toArray(new DataColumnSpec[colSpecs.size()]));
        cont = exec.createDataContainer(outSpec);
      }
      cont.close();

      return cont.getTable();
    } catch (final REXPMismatchException e) {
      throw new RException("Could not parse REXP.", e);
    } catch (final REngineException e) {
      throw new RException("Could not get value of " + varName + " from workspace.", e);
    }
  }

  /**
   * Import all cells from a R expression and put them into <code>column</code>.
   *
   * @param rexp Source of values
   * @param column ArrayList to store the created DataCells into.
   * @param nonNumbersAsMissing Convert NaN and Infinity to {@link MissingCell}.
   * @throws REXPMismatchException
   */
  private static final void importCells(final REXP rexp, final DataCell[] column,
      final boolean nonNumbersAsMissing) throws REXPMismatchException {
    if (rexp.isLogical()) {
      final byte[] bytes = rexp.asBytes();
      for (int i = 0; i < bytes.length; ++i) {
        final byte val = bytes[i];
        if (val == REXPLogical.TRUE) {
          column[i] = BooleanCell.TRUE;
        } else if (val == REXPLogical.FALSE) {
          column[i] = BooleanCell.FALSE;
        } else {
          column[i] = DataType.getMissingCell();
        }
      }
    } else if (rexp.isFactor()) {
      final RFactor strings = rexp.asFactor();
      for (int r = 0; r < strings.size(); ++r) {
        final String colValue = strings.at(r);
        column[r] = (colValue == null) ? DataType.getMissingCell() : new StringCell(colValue);
      }
    } else if (rexp.isInteger()) {
      final int[] ints = rexp.asIntegers();
      for (int i = 0; i < ints.length; ++i) {
        final int val = ints[i];
        column[i] = (val == REXPInteger.NA) ? DataType.getMissingCell() : new IntCell(val);
      }
    } else if (rexp.isNumeric()) {
      double[] doubles = rexp.asDoubles();
      for (int i = 0; i < doubles.length; ++i) {
        final double val = doubles[i];
        if (!REXPDouble.isNA(val)
            && !(nonNumbersAsMissing && (Double.isNaN(val) || Double.isInfinite(val)))) {
          /*
           * If R value is not NA (not available), missing cell will be exported instead. Also, if
           * nonNumbers should be exported as missing cells, NaN and Infinite will be exported as
           * missing cells instead, aswell.
           */
          column[i] = new DoubleCell(val);
        } else {
          column[i] = DataType.getMissingCell();
        }
      }
    } else {
      final String[] strings = rexp.asStrings();
      for (int i = 0; i < strings.length; ++i) {
        final String val = strings[i];
        column[i] = (val == null) ? DataType.getMissingCell() : new StringCell(val);
      }
    }
  }

  /**
   * Get cell type as which a REXP would be imported.
   */
  private static DataType importDataType(final REXP column) {
    if (column.isNull()) {
      return StringCell.TYPE;
    } else if (column.isLogical()) {
      return BooleanCell.TYPE;
    } else if (column.isFactor()) {
      return StringCell.TYPE;
    } else if (column.isInteger()) {
      return IntCell.TYPE;
    } else if (column.isNumeric()) {
      return DoubleCell.TYPE;
    } else {
      return StringCell.TYPE;
    }
  }

  // --- Workspace management ---

  @Override
  public void clearWorkspace(final ExecutionMonitor exec)
      throws RException, CanceledExecutionException {
    exec.setProgress(0.0, "Clearing previous workspace");
    final StringBuilder b = new StringBuilder();
    b.append("unloader <- function() {\n");
    b.append("  defaults = getOption(\"defaultPackages\")\n");
    b.append("  installed = (.packages())\n");
    b.append("  for (pkg in installed){\n");
    b.append("      if (!(as.character(pkg) %in% defaults)) {\n");
    b.append("          if(!(pkg == \"base\")){\n");
    b.append("              package_name = paste(\"package:\", as.character(pkg), sep=\"\")\n");
    b.append("              detach(package_name, character.only = TRUE)\n");
    b.append("          }\n");
    b.append("      }\n");
    b.append("  }\n");
    b.append("}\n");
    b.append("unloader();\n");
    b.append("rm(list = ls());"); // also includes the unloader function
    try {
      monitoredEval(b.toString(), exec, false);
    } catch (InterruptedException e) {
      throw new RException("Interrupted while loading R workspace.", e);
    }
    exec.setProgress(1.0, "Clearing previous workspace");
  }

  @Override
  public List<String> clearAndReadWorkspace(final File workspaceFile, final ExecutionMonitor exec)
      throws RException, CanceledExecutionException {
    exec.setProgress(0.0, "Clearing previous workspace");
    clearWorkspace(exec.createSubProgress(0.3));
    exec.setMessage("Loading workspace");
    try {
      String unixPath = FilenameUtils.separatorsToUnix(workspaceFile.getAbsolutePath());
      monitoredEval("load(\"" + unixPath + "\");", exec.createSubProgress(0.7), false);
    } catch (InterruptedException e) {
      throw new RException("Interrupted while loading R workspace.", e);
    }
    return importListOfLibrariesAndDelete();
  }

  @Override
  public List<String> importListOfLibrariesAndDelete() throws RException {
    try {
      final REXP listAsREXP = eval(R_LOADED_LIBRARIES_VARIABLE, true);
      eval("rm(" + R_LOADED_LIBRARIES_VARIABLE + ")", false);
      if (!listAsREXP.isVector()) {
        return Collections.emptyList();
      } else {
        return Arrays.asList(listAsREXP.asStrings());
      }
    } catch (REXPMismatchException e) {
      LOGGER.error("Rengine error: " + e.getMessage());
    }
    return Collections.emptyList();
  }

  /** Map of type to a R expression which creates a column vector for that type */
  private static final Map<Class<? extends DataValue>, String> DATA_TYPE_TO_R_CONSTRUCTOR;

  private static final AtomicInteger R_THREAD_POOL_INDEX = new AtomicInteger();

  private static final ExecutorService R_THREAD_POOL =
      new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(),
          r -> new Thread(r, "R-DataExchange-" + R_THREAD_POOL_INDEX.getAndIncrement()));

  static {
    Map<Class<? extends DataValue>, String> tmp = new HashMap<>();

    tmp.put(IntValue.class, "integer(rowCount)");
    tmp.put(BooleanValue.class, "logical(rowCount)");
    tmp.put(DoubleValue.class, "double(rowCount)");
    /* For character default value is "", which would create a "" level. */
    tmp.put(StringValue.class, "factor(as.character(rep(NA, rowCount)))");
    tmp.put(CollectionDataValue.class, "vector(mode='list', length=rowCount)");

    DATA_TYPE_TO_R_CONSTRUCTOR = Collections.unmodifiableMap(tmp);
  }

  /* This class ties together all variables concerning a single batch. */
  private static final class Batch {
    final int m_size;
    /** Size of this batch */
    int m_index = 0;
    /** Current row index for writing to */

    final RList m_rBatch; // List containing rColumns
    final REXPString m_rRowNames;
    final REXPGenericVector m_rVector;

    final Map<Integer, LinkedHashMap<String, Integer>> m_factorLevels = new HashMap<>();
    final Map<Integer, int[]> m_factorIndices = new HashMap<>();

    /**
     * @param numRows Number of rows for this batch.
     * @param columnCount Number of columns.
     */
    public Batch(final int numRows, final int columnCount) {
      m_size = numRows;

      m_rRowNames = new REXPString(new String[numRows]);
      m_rBatch = new RList(columnCount + 1, false);
      m_rVector = new REXPGenericVector(m_rBatch);
    }

    /**
     * Create REXPFactor instance from levels (String[]) and indices (int[]) for every factor
     * column.
     */
    public void postProcessFactorColumns() {
      for (Map.Entry<Integer, LinkedHashMap<String, Integer>> entry : m_factorLevels.entrySet()) {
        final int columnIndex = entry.getKey();
        final LinkedHashMap<String, Integer> factorLevels = entry.getValue();

        final String[] levels = factorLevels.keySet().stream().toArray(n -> new String[n]);
        final int[] indices = m_factorIndices.get(columnIndex);

        /* Create a REXP factor for this batch without copying the indices and levels */
        final REXPFactor factor = new REXPFactor(new RFactor(indices, levels, false, 1));
        m_rBatch.set(columnIndex, factor);
      }
    }
  }

  @Override
  public void monitoredAssign(final String name, final BufferedDataTable table,
      final ExecutionMonitor exec, final int batchSize, final String rType,
      final boolean sendRowNames) throws RException, CanceledExecutionException {

    final int rowCount = KnowsRowCountTable.checkRowCount(table.size());
    final int columnCount = table.getDataTableSpec().getNumColumns();

    if (columnCount == 0) {
      // Special case of empty table input. R doesn't seem to have "only row names" case, so we
      // just handle this as in 2.12, where this resulted in a empty data.frame without rows or
      // columns.
      eval(name + "<-data.frame()", false);
      exec.setProgress(1.0);
      return;
    }

    assign("rowCount", new REXPInteger(rowCount));
    assign("colCount", new REXPInteger(columnCount));

    @SuppressWarnings("unchecked")
    final Class<? extends DataValue>[] dataValueClasses = new Class[columnCount]; // type of each
                                                                                  // column

    /*
     * Allocate the memory for the columns on R side. They will be concatenated to a data.frame or
     * data.table after filled with input data (without copying the memory)
     */
    exec.setMessage("Allocating memory for R columns.");

    // Create cols variable (array of column vectors), will be coerced to data.frame later.
    eval("cols<-list(length=colCount)", false);

    // Script for removing temporary variables
    final StringBuilder cleanupScript =
        new StringBuilder("rm(knime.row.names,knime.col.names,bt,i,rowCount,colCount,cols");

    // script for combining the individual columns into a data.frame (or data.table)
    final boolean useDataTable = "data.table".equals(rType);
    if (useDataTable) {
      final REXP ret = eval("require('data.table')", true);
      try {
        if (!Boolean.parseBoolean(ret.asString())) {
          throw new RuntimeException(
              "Selected data.table as type for \"" + name + "\", but package could not be found.");
        }
      } catch (REXPMismatchException e) {
        throw new IllegalStateException("\"find.package\" doesn't return string anymore.", e);
      }
      LOGGER.debug("Using experimental support for sending data as \"data.table\".");
    }

    // Variables concerning a single batch
    final Batch batch = new Batch(Math.min(batchSize, rowCount), columnCount);

    // Get type of each column and generate R code which creates an appropriate column
    int columnIndex = 0;
    for (final DataColumnSpec columnSpec : table.getDataTableSpec()) {
      final String columnVar = "cols[[" + (columnIndex + 1) + "]]";

      final DataType type = columnSpec.getType();
      Class<? extends DataValue> dataValueClass;
      if (type.isCollectionType()) {
        dataValueClass = CollectionDataValue.class;
      } else {
        if (type.isCompatible(BooleanValue.class)) {
          dataValueClass = BooleanValue.class;
        } else if (type.isCompatible(IntValue.class)) {
          dataValueClass = IntValue.class;
        } else if (type.isCompatible(DoubleValue.class)) {
          dataValueClass = DoubleValue.class;
        } else {
          dataValueClass = StringValue.class;
        }
      }
      dataValueClasses[columnIndex] = dataValueClass;
      final String constructor = DATA_TYPE_TO_R_CONSTRUCTOR.get(dataValueClass);
      if (dataValueClass == CollectionDataValue.class) {
        eval(columnVar + "<-I(" + constructor + ")", false); // Allocate vector for column, e.g.:
                                                             // c10 <- double(12345)
      } else {
        eval(columnVar + "<-" + constructor, false); // Allocate vector for column, e.g.: c10 <-
                                                     // double(12345)
      }

      // Prepare KNIME side batch for this column
      if (dataValueClass == CollectionDataValue.class) {
        final RList col = new RList(batch.m_size, false);
        IntStream.range(0, batch.m_size).forEach(i -> col.add(null));
        batch.m_rBatch.add(new REXPGenericVector(col));
      } else if (dataValueClass == BooleanValue.class) {
        batch.m_rBatch.add(new REXPLogical(new byte[batch.m_size]));
      } else if (dataValueClass == IntValue.class) {
        batch.m_rBatch.add(new REXPInteger(new int[batch.m_size]));
      } else if (dataValueClass == DoubleValue.class) {
        batch.m_rBatch.add(new REXPDouble(new double[batch.m_size]));
      } else if (dataValueClass == StringValue.class) {
        /* Create index array and level map for factor columns */
        final int[] indices = new int[batch.m_size]; /* Will be reused every batch */
        final String[] levels = new String[0]; /* No levels yet and not reused */

        batch.m_factorIndices.put(columnIndex, indices);
        batch.m_factorLevels.put(columnIndex, new LinkedHashMap<String, Integer>());

        batch.m_rBatch.add(new REXPFactor(new RFactor(indices, levels, false, 1)));
      } else {
        batch.m_rBatch.add(new REXPString(new String[batch.m_size]));
      }

      columnIndex++;

      exec.checkCanceled(); // Useful for many many rows
    }

    if (sendRowNames) {
      // Allocate vector for row names on R side
      eval("knime.row.names<-character(rowCount)", false);
      cleanupScript.append(",knime.row.names");

      // And on KNIME side
      batch.m_rBatch.add(batch.m_rRowNames);
    }

    exec.setMessage("Sending column names.");
    // transfer column names to Rserve
    monitoredAssign("knime.col.names", new REXPString(table.getDataTableSpec().getColumnNames()),
        exec);

    if (useDataTable) {
      // Create data.table now, we will use set(table, column, row, newData) to set values in the
      // table directly
      try {
        monitoredEval("library(data.table);" + name + "<-as.data.table(cols, check.names=F);names("
            + name + ")<-knime.col.names", exec, false);
      } catch (InterruptedException e) {
        throw new RException("Interrupted while creating data.table and assigning column names.",
            e);
      }
    }

    /*
     * Send rows to R in batches
     */
    exec.setMessage("Sending rows to R.");

    final double numRows = table.size(); // for progress reporting only
    long rowIndex = 0;

    // variables for "Rows per second" debug output
    long timeSinceUpdate = System.currentTimeMillis();
    int rowsSinceUpdate = 0;

    for (final DataRow row : table) {

      // The following block prints the amount of rows sent every second for a rough estimate while
      // benchmarking
      if (System.currentTimeMillis() - timeSinceUpdate > 1000) {
        LOGGER.debugWithFormat("Rows per second: %d", rowsSinceUpdate);
        rowsSinceUpdate = 0;
        timeSinceUpdate = System.currentTimeMillis();
      } else {
        ++rowsSinceUpdate;
      }

      if (sendRowNames) {
        batch.m_rRowNames.asStrings()[batch.m_index] = row.getKey().getString();
      }

      // Assign the values of the current row to the batch
      int c = 0; // columnIndex
      for (final DataCell cell : row) {
        final Class<? extends DataValue> type = dataValueClasses[c];

        try {
          if (type == CollectionDataValue.class) {
            REXP value;
            // try get value from collection cell
            if (cell.isMissing()) {
              value = null;
            } else {
              final CollectionDataValue collValue = (CollectionDataValue) cell;
              final DataType elementType = cell.getType().getCollectionElementType();
              if (elementType.isCompatible(BooleanValue.class)) {
                final byte[] elementValue = new byte[collValue.size()];
                int i = 0;
                for (final DataCell e : collValue) {
                  elementValue[i] = exportBooleanValue(e);
                  ++i;
                }
                value = new REXPLogical(elementValue);

              } else if (elementType.isCompatible(IntValue.class)) {
                final int[] elementValue =
                    collValue.stream().mapToInt(RController::exportIntValue).toArray();
                value = new REXPInteger(elementValue);
              } else if (elementType.isCompatible(DoubleValue.class)) {
                final double[] elementValue =
                    collValue.stream().mapToDouble(RController::exportDoubleValue).toArray();
                value = new REXPDouble(elementValue);
              } else {
                final String[] elementValue =
                    collValue.stream().map(RController::exportStringValue).toArray(String[]::new);
                value = new REXPString(elementValue);
              }
            }
            ((REXP) batch.m_rBatch.get(c)).asList().set(batch.m_index, value);
          } else {
            final REXP curREXP = (REXP) batch.m_rBatch.get(c);
            if (type.equals(BooleanValue.class)) {
              curREXP.asBytes()[batch.m_index] = exportBooleanValue(cell);
            } else if (type.equals(IntValue.class)) {
              curREXP.asIntegers()[batch.m_index] = exportIntValue(cell);
            } else if (type.equals(DoubleValue.class)) {
              curREXP.asDoubles()[batch.m_index] = exportDoubleValue(cell);
            } else if (type.equals(StringValue.class)) {
              final LinkedHashMap<String, Integer> factors = batch.m_factorLevels.get(c);
              int index = REXPInteger.NA;
              if (!cell.isMissing()) {
                /* Get factor index for the value */
                final String value = ((StringValue) cell).getStringValue();
                final Integer idx = factors.get(value);
                if (idx == null) {
                  /* First occurance of this string value, add it to map of factor levels */
                  index = factors.size() + 1; // R indices are base 1
                  factors.put(value, index);
                } else {
                  index = idx;
                }
              }
              batch.m_factorIndices.get(c)[batch.m_index] = index;
            } else {
              curREXP.asStrings()[batch.m_index] = exportStringValue(cell);
            }
          }
        } catch (REXPMismatchException e) {
          // Will never happen, the REXPs types are added according to column types.
          throw new IllegalStateException(e);
        }
        ++c;
      }

      ++batch.m_index;
      if (batch.m_index == batch.m_size || batch.m_index + rowIndex == rowCount) {
        // Batch full or end of table

        batch.postProcessFactorColumns(); /* Create REXPFactor from int[] and level hash map */

        assign("bt", batch.m_rVector);

        final long start = rowIndex + 1;
        final long end = rowIndex + batch.m_index;

        /*
         * Assign data from chunk/batch to final table column-wise. If it's a factor column, we need
         * to grow the level set, otherwise values will end up as NA
         */
        if (useDataTable) {
          eval(
              "for(i in 1:colCount){if(is.factor(bt[[i]])){levels(" + name
                  + "[[i]])<-levels(bt[[i]])};set(" + name + "," + start + ":" + end + ",i,bt[i])}",
              false);
        } else {
          eval(
              "for(i in 1:colCount){if(is.factor(bt[[i]])){levels(cols[[i]])<-levels(bt[[i]])};cols[[i]]["
                  + start + ":" + end + "]<-bt[[i]][1:" + batch.m_index + "]}",
              false);
        }

        if (sendRowNames) {
          eval("knime.row.names[" + start + ":" + end + "]<-bt[[colCount+1]][1:" + batch.m_index
              + "]", false);
        }

        // Not relevant if batch.index+rowIndex == rowCount
        rowIndex += batch.m_size;
        batch.m_index = 0;

        exec.checkCanceled();
        exec.setProgress(rowIndex / numRows);
      }
    }

    try {
      if (useDataTable) {
        // Assign row names if sent
        if (sendRowNames) {
          monitoredEval("row.names(" + name + ")<-knime.row.names", exec, false);
        }
      } else {
        // Coerce columns to data.frame (rather than constructing a new one which would copy the
        // entire data)
        if (sendRowNames) {
          monitoredEval(
              name + "<-as.data.frame(cols,row.names=knime.row.names,check.names=F);names(" + name
                  + ")<-knime.col.names",
              exec, false);
        } else {
          monitoredEval(
              name + "<-as.data.frame(cols,check.names=F);names(" + name + ")<-knime.col.names",
              exec, false);
        }
      }
    } catch (InterruptedException e) {
      throw new RException("Interrupted while setting row names or creating data.frame.", e);
    }

    /* Clean up */
    exec.setMessage("Cleaning up.");

    eval(cleanupScript.append(")").toString(), false);

    exec.setProgress(1.0);
  }

  @Override
  public void saveWorkspace(final Path workspace, final ExecutionMonitor exec)
      throws RException, CanceledExecutionException {
    // save workspace to file
    try {
      String unixPath = FilenameUtils.separatorsToUnix(workspace.toString());
      monitoredEval("save.image(\"" + unixPath + "\");", exec, false);
    } catch (InterruptedException e) {
      throw new RException("Interrupted while saving R workspace.", e);
    }
  }

  /**
   * A function call that loads all libraries in the argument but checking if they are not loaded
   * yet.
   *
   * @param listOfLibraries List of libraries from upstream node (e.g. randomForest, tree, ...)
   * @param suppressMessages if true the library call is wrapped so that no output is printed
   * @return The command string to be run in R (ends with newline)
   */
  public static String createLoadLibraryFunctionCall(final List<String> listOfLibraries,
      final boolean suppressMessages) {
    final StringBuilder functionBuilder = new StringBuilder();
    functionBuilder.append("function(packages_to_install) {\n");
    functionBuilder.append("  for (pkg in packages_to_install) {\n");
    functionBuilder.append("      if(!(pkg %in% (.packages()))) {\n");
    if (suppressMessages) {
      functionBuilder.append("          suppressMessages(library(pkg, character.only = TRUE))\n");
    } else {
      functionBuilder.append("          library(pkg, character.only = TRUE)\n");
    }
    functionBuilder.append("      }\n");
    functionBuilder.append("  }\n");
    functionBuilder.append("}\n");
    final StringBuilder packageVector = new StringBuilder("c(");
    for (int i = 0; i < listOfLibraries.size(); i++) {
      packageVector.append(i == 0 ? "\"" : ", \"").append(listOfLibraries.get(i)).append("\"");
    }
    packageVector.append(")");
    return "sapply(" + packageVector + ", " + functionBuilder.toString() + ")\n";
  }

  // --- Monitored Evaluation helpers ---

  /**
   * Evaluation of R code with a monitor in a separate thread to cancel the code execution in case
   * the execution of the node is cancelled.
   */
  private final class MonitoredEval {

    private final int m_interval = 200;
    private final ExecutionMonitor m_exec;

    /**
     * Constructor
     *
     * @param exec for tracking progress and checking cancelled state.
     */
    public MonitoredEval(final ExecutionMonitor exec) {
      m_exec = exec;
    }

    /*
     * Run the Callable in a thread and make sure to cancel it, in case execution is cancelled.
     */
    private REXP monitor(final Callable<REXP> task)
        throws InterruptedException, RException, CanceledExecutionException {
      final FutureTask<REXP> runningTask = new FutureTask<>(task);
      final Thread t =
          (m_useNodeContext) ? ThreadUtils.threadWithContext(runningTask, "R-Evaluation")
              : new Thread(runningTask, "R-Evaluation");
      t.start();

      try {
        while (!runningTask.isDone()) {
          Thread.sleep(m_interval);
          m_exec.checkCanceled();
        }

        return runningTask.get();
      } catch (ExecutionException exception) {
        if (exception.getCause() instanceof RException) {
          throw (RException) exception.getCause();
        }
        throw new RException("Exception during R evaluation", exception);
      } finally {
        try {
          if (t.isAlive()) {
            t.interrupt();

            // The eval() call blocks somewhere in RTalk class,
            // where it waits for a socket. If we close that, we
            // should be able to force the interruption of our
            // evaluation thread.
            terminateAndRelaunch();
            // FIXME: Causes a "Socket closed" stack trace to be
            // printed. Should be cought instead, but needs to be
            // fixed in REngine first see
            // https://github.com/s-u/REngine/issues/6
          }
        } catch (final Exception e1) {
          LOGGER.warn("Could not terminate R correctly.");
        }
      }
    }

    /**
     * Run R code
     *
     * @param cmd The R command to run
     * @param resolve Whether to resolve the resulting reference
     * @return Result of the code (if resolve is true) or a reference to the result (if resolve is
     *         false)
     * @throws REngineException
     * @throws RException
     * @throws REXPMismatchException
     * @throws CanceledExecutionException when execution was cancelled
     * @throws InterruptedException
     */
    public REXP run(final String cmd, final boolean resolve) throws REngineException,
        REXPMismatchException, RException, CanceledExecutionException, InterruptedException {

      try {
        // wait for evaluation to complete
        return monitor(() -> {
          return eval(cmd, resolve);
        });
      } finally {
        // Make sure to recover in case user terminated or crashed our server
        checkConnectionAndRecover();
      }
    }

    /**
     * Monitored assignment of <code>value</code> to <code>symbol</code>.
     *
     * @param symbol
     * @param value
     * @throws REngineException
     * @throws REXPMismatchException
     * @throws CanceledExecutionException
     * @throws Exception
     */
    public void assign(final String symbol, final REXP value)
        throws REngineException, REXPMismatchException, CanceledExecutionException, Exception {
      try {
        // wait for evaluation to complete
        monitor(() -> {
          synchronized (getREngine()) {
            getREngine().assign(symbol, value);
          }
          return null;
        });
      } finally {
        // Make sure to recover in case user terminated or crashed our
        // server
        checkConnectionAndRecover();
      }
    }
  }

  @Override
  public Path getWorkingDirectory() throws RException, REXPMismatchException {

    // getwd returns NULL if the working directory is not avaible
    REXP rexp = eval("getwd", true); // may throw RException

    // throws REXPMismatchException if rexp value is NULL.
    String stringPath = rexp.asString();

    return Paths.get(stringPath);
  }

  @Override
  public void setWorkingDirectory(final Path workingDirectory) throws RException {

    String stringPath = FilenameUtils.separatorsToUnix(workingDirectory.toString());
    String cmd = "setwd('" + stringPath + "')";
    eval(cmd, false);
  }

  @Override
  public void addPackagePath(Path path) throws RException {
    String unixPath = FilenameUtils.separatorsToUnix(path.toString());
    String cmd = String.format(".libPaths(c('%s', .libPaths()))", unixPath);
    eval(cmd, false);
  }

  @Override
  public void restorePackagePath() throws RException, REXPMismatchException {
    // Remove custom path which is in the first position
    eval(".libPaths(.libPaths()[-1])", false);
  }
}