lib/MiddlewareServer.js

Summary

Maintainability
A
50 mins
Test Coverage
const check = require("check-types");
const debug = require("debug")("dbow:MiddlewareServer");
const log = require("debug")("bow:MiddlewareServer");
const deepEqual = require("deep-equal");
const deepFreeze = require("deep-freeze");
const uuid = require("uuid/v4");

const assert = require("./utils/assert");

const SECONDS_TO_MILLIS = 1000;
const MINUTES_TO_SECONDS = 60;
const DEFAULT_LOG_INTERVAL_IN_MINUTES = 5;

const DISCONNECT_MESSAGE_NAME = "__disconnect";

const cleanCriteria = (criteria) => Object
  .keys(criteria)
  .reduce((accumulated, criterionKey) => {
    const criterionValue = criteria[criterionKey];
    return {
      ...accumulated,
      [criterionKey]: check.array(criterionValue)
        ? [...new Set(criterionValue)]
        : criterionValue };
  }, {});

function scatterListener(listenerId) {
  const criteria = this.listenersById.get(listenerId).criteria;
  Object.keys(criteria).forEach((criterionKey) => {
    const criterionValue = criteria[criterionKey];
    const criterionValues = check.array(criterionValue) ? criterionValue : [criterionValue];
    if (!this.listenerIdsByCriterion.has(criterionKey)) {
      this.listenerIdsByCriterion.set(criterionKey, new Map());
    }
    const listenerIdsByCriterionValue = this.listenerIdsByCriterion.get(criterionKey);
    criterionValues.forEach((value) => {
      if (!listenerIdsByCriterionValue.has(value)) {
        listenerIdsByCriterionValue.set(value, new Set());
      }
      listenerIdsByCriterionValue.get(value).add(listenerId);
    });
  });
}

function clearScatteredListener(listenerId) {
  const criteria = this.listenersById.get(listenerId).criteria;
  Object.keys(criteria).forEach((criterionKey) => {
    const criterionValue = criteria[criterionKey];
    const criterionValues = check.array(criterionValue) ? criterionValue : [criterionValue];
    const listenerIdsByCriterionValue = this.listenerIdsByCriterion.get(criterionKey);
    criterionValues.forEach((value) => {
      const listenerIds = listenerIdsByCriterionValue.get(value);
      listenerIds.delete(listenerId);
      if (0 === listenerIds.size) {
        listenerIdsByCriterionValue.delete(value);
      }
    });
    if (0 === listenerIdsByCriterionValue.size) {
      this.listenerIdsByCriterion.delete(criterionKey);
    }
  });
}

function findListenerIdsByPredicate(predicateKey, predicateValue) {
  let listenerIds = new Set();
  if (this.listenerIdsByCriterion.has(predicateKey)) {
    const listenerIdsByCriterionValue = this.listenerIdsByCriterion.get(predicateKey);
    if (listenerIdsByCriterionValue.has(predicateValue)) {
      listenerIds = listenerIdsByCriterionValue.get(predicateValue);
    }
  }
  return listenerIds;
}

const createConjunctionForPredicate = (query) => function (queryListenerIds, predicateKey, i) {
  let listenerIds = queryListenerIds;
  if (0 === i || 0 < queryListenerIds.size) {
    const predicateListenerIds = findListenerIdsByPredicate.call(this, predicateKey, query[predicateKey]);
    listenerIds = 0 === i
      ? predicateListenerIds
      : new Set([...queryListenerIds].filter((listenerId) => predicateListenerIds.has(listenerId)));
  }
  return listenerIds;
};

function createDisjunctionForQuery(audienceListenerIds, query) {
  return new Set([
    ...audienceListenerIds,
    ...Object.keys(query).reduce(createConjunctionForPredicate(query).bind(this), new Set())
  ]);
}

function refreshListener(id, criteria) {
  const listenerExists = this.listenersById.has(id);
  if (listenerExists) {
    const existingListener = this.listenersById.get(id);
    if (!deepEqual(existingListener.criteria, criteria)) {
      clearScatteredListener.call(this, id);
      existingListener.criteria = criteria;
      scatterListener.call(this, id);
    }
  }
  return listenerExists;
}

async function connectToPubSub() {
  if (this.pubSubBuilder.isOperational()) {
    log("Connecting Middleware v%s/%s to PubSub...", this.version, this.uuid);
    this.pubSub = await this.pubSubBuilder.build(`BOW_LISTENER_${this.version}`);
    this.pubSub.onMessage(({ senderUuid, id, criteria }) => {
      if (senderUuid !== this.uuid) {
        refreshListener.call(this, id, criteria);
      }
    });
    this.shareListenerCriteria = (id, criteria) =>
      this.pubSub.pushMessage({ senderUuid: this.uuid, id, criteria });
  }
}

async function disconnectFromPubSub() {
  if (check.assigned(this.pubSub)) {
    log("Disconnecting Middleware v%s/%s from PubSub...", this.version, this.uuid);
    await this.pubSub.destroy();
  }
}

const emit = (name, payload) => (socket) => {
  if (DISCONNECT_MESSAGE_NAME === name) {
    socket.disconnect();
  } else {
    socket.emit(name, payload);
  }
};

class Middleware {

  constructor(middleware, config, pubSubBuilder) {
    this.uuid = uuid();
    this.version = middleware.version;
    this.createCriteriaFromListenerDetails = middleware.createCriteriaFromListenerDetails;
    this.config = config;
    this.pubSubBuilder = pubSubBuilder;
    this.pubSub = undefined;
    this.logInterval = undefined;
    this.listenersById = new Map();
    this.listenerIdsByCriterion = new Map();
    this.shareListenerCriteria = () => {}; // eslint-disable-line no-empty-function
    Object.seal(this);
  }

  async start() {
    log("Starting Middleware v%s/%s...", this.version, this.uuid);
    await connectToPubSub.call(this);
    if (check.positive(this.config.logInterval)) {
      this.logInterval = setInterval(() => {
        log("Middleware v%s/%s currently manages %s listener(s)", this.version, this.uuid, this.listenersById.size);
      }, this.config.logInterval * SECONDS_TO_MILLIS);
    }
  }

  async register(socket) {
    const criteria = await this.createCriteriaFromListenerDetails(socket.listenerDetails);
    assert.criteria(criteria);
    const identifiedCriteria = deepFreeze(cleanCriteria({ ...criteria, __id: socket.listenerDetails.id }));
    debug("Criteria created: %s", JSON.stringify(identifiedCriteria));
    if (socket.connected) {
      const listenerId = socket.listenerDetails.id;
      const listenerExists = refreshListener.call(this, listenerId, identifiedCriteria);
      if (!listenerExists) {
        this.listenersById.set(listenerId, { criteria: identifiedCriteria, sockets: new Set() });
        scatterListener.call(this, listenerId);
      }
      this.listenersById.get(listenerId).sockets.add(socket);
      this.shareListenerCriteria(listenerId, identifiedCriteria);
    }
  }

  remove(socket) {
    if (check.object(socket.listenerDetails)) {
      const listenerId = socket.listenerDetails.id;
      if (this.listenersById.has(listenerId)) {
        const listener = this.listenersById.get(listenerId);
        listener.sockets.delete(socket);
        if (0 === listener.sockets.size) {
          clearScatteredListener.call(this, listenerId);
          this.listenersById.delete(listenerId);
        }
      }
    }
  }

  forward(name, payload, audience) {
    [...audience.reduce(createDisjunctionForQuery.bind(this), new Set())]
      .map((listenerId) => this.listenersById.get(listenerId))
      .reduce((sockets, listener) => new Set([...sockets, ...listener.sockets]), new Set())
      .forEach(emit(name, payload));
  }

  async stop() {
    log("Stopping Middleware v%s/%s...", this.version, this.uuid);
    const listenerCount = this.listenersById.size;
    await disconnectFromPubSub.call(this);
    this.listenersById.clear();
    this.listenerIdsByCriterion.clear();
    if (check.assigned(this.logInterval)) {
      clearInterval(this.logInterval);
      this.logInterval = undefined;
    }
    return listenerCount;
  }

}

const cleanConfig = (config) => ({
  logInterval: check.assigned(config.logInterval)
    ? config.logInterval
    : DEFAULT_LOG_INTERVAL_IN_MINUTES * MINUTES_TO_SECONDS
});

function forEachMiddleware(action) {
  return Promise.all([...this.middlewaresByVersion.values()]
    .map((middleware) => middleware[action]()));
}

module.exports = class MiddlewareServer {

  constructor(middlewares, config, pubSubBuilder) {
    const cleanedConfig = cleanConfig(config);
    this.middlewaresByVersion = new Map();
    middlewares.forEach((middleware) =>
      this.middlewaresByVersion.set(middleware.version,
        new Middleware(middleware, cleanedConfig, pubSubBuilder)));
    Object.seal(this);
  }

  async start() {
    log("Starting MiddlewareServer...");
    await forEachMiddleware.call(this, "start");
  }

  async register(socket) {
    await this.middlewaresByVersion
      .get(socket.outbound.middlewareVersion)
      .register(socket);
    debug("Listener registered: %s", socket.listenerDetails.id.toString());
  }

  remove(socket) {
    this.middlewaresByVersion
      .get(socket.outbound.middlewareVersion)
      .remove(socket);
    debug("Listener removed: %s", socket.listenerDetails.id.toString());
  }

  forward(version, name, payload, audience) {
    this.middlewaresByVersion
      .get(version)
      .forward(name, payload, audience);
    debug("Message forwarded: %s", JSON.stringify({ name, payload, audience }));
  }

  async stop() {
    log("Stopping MiddlewareServer...");
    const listenerCounts = await forEachMiddleware.call(this, "stop");
    this.middlewaresByVersion.clear();
    return listenerCounts.reduce((total, listenerCount) => total + listenerCount, 0);
  }

};