meteor/meteor

View on GitHub
packages/inter-process-messaging/inter-process-messaging.js

Summary

Maintainability
C
1 day
Test Coverage
const uuid = require("uuid");

const { encode, decode } = require("arson");

const {
  MESSAGE,
  RESPONSE,
  PING,
  PONG,
} = require("./types.js");

const hasOwn = Object.prototype.hasOwnProperty;

Object.assign(exports, {
  // Adds onMessage(topic, callback) and sendMessage(topic, payload)
  // methods to otherProcess. These methods are an improvement over the
  // native Node interfaces otherProcess.on("message", callback) and
  // otherProcess.send(message) because they take a topic string as their
  // first argument, which allows restricting the delivery of messages by
  // topic; and they permit the receiving process to respond by returning
  // a value (possibly a Promise) from the onMessage callback.
  enable(otherProcess) {
    if (typeof otherProcess.onMessage === "function" &&
        typeof otherProcess.sendMessage === "function") {
      // Calling enable more than once should be safe/idempotent.
      return otherProcess;
    }

    const callbacksByTopic = new Map;

    // To receive messages *from* otherProcess, this process should call
    // otherMessage.onMessage(topic, callback). The callback will receive
    // the provided payload as its first (and only) parameter. Callbacks
    // may return a Promise, in which case the response will be delayed
    // until all results returned by callbacks registered for this topic
    // have been resolved.
    otherProcess.onMessage = function onMessage(topic, callback) {
      if (! callbacksByTopic.has(topic)) {
        callbacksByTopic.set(topic, new Set);
      }
      callbacksByTopic.get(topic).add(callback);
    };

    const readyResolvers = new Map;
    const pendingMessages = new Map;
    const promisesByTopic = new Map;
    const handlersByType = Object.create(null);

    function gracefulErrorHandler(error) {
      // EPIPE occurs when sending fails because the other process has
      // exited already, which can be safely ignored in most cases.
      if (error && error.code !== "EPIPE") {
        console.error("Error sending message:", error);
      }
    }

    handlersByType[PING] = function ({ id }) {
      otherProcess.send({ type: PONG, id }, gracefulErrorHandler);
    };

    handlersByType[PONG] = function ({ id }) {
      const resolve = readyResolvers.get(id);
      if (typeof resolve === "function") {
        readyResolvers.delete(id);
        // This resolves the child.readyForMessages Promise created above.
        resolve();
      }
    };

    handlersByType[MESSAGE] = function ({
      responseId,
      topic,
      encodedPayload,
    }) {
      const newPromise = (
        promisesByTopic.get(topic) || Promise.resolve()
      ).then(() => {
        const results = [];
        const callbacks = callbacksByTopic.get(topic);
        if (callbacks && callbacks.size > 0) {
          // Re-decode the payload for each callback to prevent one
          // callback from modifying the payload seen by later callbacks.
          callbacks.forEach(cb => results.push(cb(decode(encodedPayload))));
          return Promise.all(results);
        }
        // Since there were no callbacks, this will be an empty array.
        return results;
      }).then(results => {
        if (responseId) {
          otherProcess.send({
            type: RESPONSE,
            responseId,
            encodedResults: encode(results),
          }, gracefulErrorHandler);
        }
      }, error => {
        const serializable = {};

        // Use Reflect.ownKeys to catch non-enumerable properties, since
        // every Error property (including "message") seems to be
        // non-enumerable by default.
        Reflect.ownKeys(error).forEach(key => {
          serializable[key] = error[key];
        });

        otherProcess.send({
          type: RESPONSE,
          responseId,
          encodedError: encode(serializable),
        }, gracefulErrorHandler);
      });

      // Immediately update the latest promise for this topic to the
      // newPromise that we just created, before any listeners run. This
      // strategy has the effect of chaining promises by topic and thus
      // keeping messages and their responses strictly ordered, one after
      // the last. Because we always register a non-throwing error handler
      // at the end of newPromise, this queue of promises should never get
      // stalled by an earlier rejection.
      promisesByTopic.set(topic, newPromise);
    };

    handlersByType[RESPONSE] = function (message) {
      const entry = pendingMessages.get(message.responseId);
      if (entry) {
        if (hasOwn.call(message, "encodedError")) {
          entry.reject(decode(message.encodedError));
        } else {
          entry.resolve(decode(message.encodedResults));
        }
      }
    };

    otherProcess.on("message", message => {
      const handler = handlersByType[message.type];
      if (typeof handler === "function") {
        handler(message);
      }
    });

    // Call otherProcess.sendMessage(topic, payload) instead of the native
    // otherProcess.send(message) to deliver a message based on a specific
    // topic string, and to receive a reliable response when the other
    // process has finished handling that message.
    otherProcess.sendMessage = function sendMessage(topic, payload) {
      otherProcess.readyForMessages =
        otherProcess.readyForMessages || makeReadyPromise();

      return otherProcess.readyForMessages.then(() => {
        const responseId = uuid();

        return new Promise((resolve, reject) => {
          pendingMessages.set(responseId, { resolve, reject });

          otherProcess.send({
            type: MESSAGE,
            responseId,
            topic,
            encodedPayload: encode(payload),
          }, error => {
            if (error) {
              reject(error);
            }
          });

        }).then(response => {
          pendingMessages.delete(responseId);
          return response;

        }, error => {
          pendingMessages.delete(responseId);
          throw error;
        });
      });
    };

    function makeReadyPromise() {
      return new Promise((resolve, reject) => {
        const pingMessage = { type: PING, id: uuid() };
        const backoff_factor = 1.1;
        let delay_ms = 50;

        readyResolvers.set(pingMessage.id, resolve);

        function poll() {
          if (readyResolvers.has(pingMessage.id)) {
            otherProcess.send(pingMessage, error => {
              if (error) {
                reject(error);
              } else {
                setTimeout(poll, delay_ms);
                delay_ms *= backoff_factor;
              }
            });
          }
        }

        poll();
      });
    }

    otherProcess.on("exit", (code, signal) => {
      const error = new Error("process exited");
      Object.assign(error, { code, signal });

      // Terminate any pending messages.
      pendingMessages.forEach(entry => entry.reject(error));

      // Prevent future messages from being sent.
      otherProcess.readyForMessages = Promise.reject(error);

      // Silence UnhandledPromiseRejectionWarning
      otherProcess.readyForMessages.catch(() => {});
    });

    return otherProcess;
  },

  // Call this onMessage function to listen for messages *from the parent
  // process* (if the parent spawned this process with an IPC channel).
  onMessage(topic, callback) {
    // Do nothing by default unless exports.enable(process) is called
    // below, because this process will never receive any messages unless
    // we have an IPC channel open with the parent process, which is true
    // only if process.send is a function.
  }
});

if (typeof process.send === "function") {
  // The process.send method is defined only when the current process was
  // spawned with an IPC channel by the parent process. In other words,
  // given that process.send can be used to send messages to the parent
  // process, it makes sense to enable process.sendMessage(topic, payload)
  // in the child-to-parent direction, too.
  exports.enable(process);

  // Override the default no-op exports.onMessage defined above.
  exports.onMessage = process.onMessage;
}