solsort/direape

View on GitHub
direape.js

Summary

Maintainability
D
2 days
Test Coverage
// <img src=https://direape.solsort.com/icon.png width=96 height=96 align=right>
//
// [![website](https://img.shields.io/badge/website-direape.solsort.com-blue.svg)](https://direape.solsort.com/)
// [![github](https://img.shields.io/badge/github-solsort/direape-blue.svg)](https://github.com/solsort/direape)
// [![codeclimate](https://img.shields.io/codeclimate/github/solsort/direape.svg)](https://codeclimate.com/github/solsort/direape)
// [![travis](https://img.shields.io/travis/solsort/direape.svg)](https://travis-ci.org/solsort/direape)
// [![npm](https://img.shields.io/npm/v/direape.svg)](https://www.npmjs.com/package/direape)
//
// # DireApe - Distributed App Environment
//
// Direape supplies:
//
// - message passing, locally and across network
// - creation/destruction of worker threads
// - simple nodejs message relay server
// - unit testing library
// - utility library
//
// Read latest documentation on [AppEdit](https://appedit.solsort.com/?page=read&github=solsort/direape).
//
// It is fully self-contained and is the foundation for [REUN](https://appedit.solsort.com/?page=read&github=solsort/reun).

(function() {
  // `da` is the module object. `setupModule` is a hoisted function declaration
  // (defined under "Module Setup / Main") that assigns `da`, so it has to run
  // before any other statement in this file touches `da`.
  var da; setupModule();

  // Static metadata describing this package.
  da.info = {
    name: 'direape',
    keywords: ['message passing', 'unit testing'],
    platforms: ['nodejs', 'webworker', 'browser'],
    github: 'solsort/direape'
  };

  // ## Message Passing
  //
  // ### `pid`, `nid`
  //
  // `direape.pid` is id of the current process. `direape.nid` is the id of the main process on this computer/node.
  //
  // In worker processes, these are set by the parent thread.
  //
  // Some of the code below is in progress. 
  // Cryptographic ids will be used when it becomes more distributed.

  function initPid() {
    if(!da.pid) {
      if(!self.crypto && isNodeJs()) {
        da._publicKey = Math.random().toString(); // TODO
        da.pid = require('crypto')
          .createHash('sha256')
          .update(da._publicKey, 'latin1')
          .digest('base64');
      } else {
        return da.pid || Promise.resolve()
          .then(() => self.crypto.subtle ||
              (self.crypto.subtle = self.crypto.webkitSubtle))
          .then(() => self.crypto.subtle.generateKey(
                {name: 'ECDSA', namedCurve: 'P-521'},
                true, ['sign', 'verify']))
          .then(key => self.crypto.subtle.exportKey('spki', key.publicKey))
          .then(spki => da._publicKey = spki)
          .then(buf => self.crypto.subtle.digest('SHA-256', buf))
          .then(buf => btoa(da.buf2ascii(buf)))
          .then(base64 => da.pid = da.nid = base64)
          .catch(e => {
            da.pid = da.nid = 'local';
          });
      }
    }
  }

  // ### `handle(name, fn, opt)`
  //
  // Register a new message handler in the current thread, 
  // given the name of the mailbox. 
  // The handler can be called across the network if `opt` is `{"public": true}`.
  //
  // The handler function `fn`, gets called when a message arrives, 
  // with the parameters passed in the message.

  // Registry of message handlers: mailbox name -> `{fn, ...opt}`.
  da._handlers = da._handlers || new Map();

  // Register `fn` as handler for the mailbox `name`.
  // Passing a falsy `fn` unregisters the mailbox.
  // `opt` may contain `{public: true}` to allow calls from other nodes.
  da.handle = (name, fn, opt) => {
    if(!fn) {
      // Really remove the entry; otherwise every finished `call` would leave
      // a dead `callback...` entry behind in the map.
      da._handlers.delete(name);
      return;
    }
    // Copy `opt` instead of writing `fn` into the caller's object.
    da._handlers.set(name, Object.assign({}, opt, {fn: fn}));
  };

  // ### `call(pid, name, args...)`
  //
  // Call a (remote) function, given the process id, and the handler name. 
  // Returns a promise of the result (which is rejected with a timeout if no result arrives).
  //
  // Send a request to handler `name` in process `pid`; every argument after
  // `name` is passed on to the handler. The promise settles through a
  // temporary callback mailbox created by `makeCallbackHandler`.
  da.call = function(pid, name) {
    var params = da.slice(arguments, 2);
    return new Promise(function(resolve, reject) {
      var replyMailbox = makeCallbackHandler(resolve, reject);
      send({
        dstPid: pid,
        dstName: name,
        srcPid: da.pid,
        srcName: replyMailbox,
        data: params
      });
    });
  };

  // ### `emit(pid, name, args...)`
  //
  // Emit a message to a remote handler, - similar to `call`, 
  // except that it will not send a result back.

  // Fire-and-forget variant of `call`: the message carries no source
  // mailbox, so no reply is sent back.
  da.emit = function(pid, name) {
    var payload = da.slice(arguments, 2);
    var message = {dstPid: pid, dstName: name, data: payload};
    send(message);
  };

  // ### Implementation details
  //
  // Messages waiting to be dispatched by the next `postman` run.
  da._messageQueue = [];
  // True from the moment a `postman` run is scheduled until it starts.
  var postmanScheduled = false;
  // Milliseconds before a pending `call` is rejected with 'call timeout'.
  var callTimeout = 10000;

  // Register a one-shot, public reply mailbox and return its name.
  // The first reply (or the timeout) unregisters the mailbox and settles
  // the promise through `resolve` / `reject`.
  function makeCallbackHandler(resolve, reject) {
    var mailbox = 'callback' + Math.random().toString().slice(2);
    var timer;

    var onReply = function(result, error) {
      clearTimeout(timer);
      da.handle(mailbox, null);
      if(!error) {
        resolve(result);
        return;
      }
      reject(error);
    };

    timer = setTimeout(function() {
      onReply(null, 'call timeout');
    }, callTimeout);

    da.handle(mailbox, onReply, {public: true});
    return mailbox;
  }

  // Queue a message and make sure a delivery run is scheduled.
  function send(msg) {
    var queue = da._messageQueue;
    queue.push(msg);
    schedulePostman();
  }

  // Schedule one `postman` run on the next tick, unless one is already pending.
  function schedulePostman() {
    if(postmanScheduled) {
      return;
    }
    da.nextTick(postman);
    postmanScheduled = true;
  }

  // Deliver everything queued so far. The queue is swapped for a fresh
  // array first, so messages sent while delivering go to the next run.
  function postman() {
    postmanScheduled = false;
    var batch = da._messageQueue;
    da._messageQueue = [];
    batch.forEach(function(msg) {
      processMessage(msg);
    });
  }

  // Route a message: handle it here when it is addressed to this process,
  // otherwise pass it on.
  function processMessage(msg) {
    var isForUs = msg.dstPid === da.pid;
    return void (isForUs ? processLocalMessage(msg) : relay(msg));
  }

  // Invoke the local handler named by `msg.dstName` with `msg.data` as
  // arguments (`this` is the message itself), and reply with `[result]`
  // or `[null, error]`.
  //
  // Messages marked `external` may only reach handlers registered as public.
  function processLocalMessage(msg) {
    var handler = da._handlers.get(msg.dstName) || {};

    if(msg.external && ! handler.public) {
      return sendReply(msg, [null, 'no such public function']);
    }

    var fn = handler.fn;
    if(!fn) {
      console.log('no such function:', msg.dstName);
      return sendReply(msg, [null, 'no such function']);
    }

    // Calling `fn` inside the promise executor keeps the call synchronous,
    // but turns a synchronous throw (or a malformed `msg.data`) into an
    // error reply. Otherwise it would escape into `postman` and discard
    // the rest of the batch.
    new Promise(resolve => resolve(fn.apply(msg, msg.data)))
      .then(result => sendReply(msg, [result]))
      .catch(e => sendReply(msg, [null, e]));
  }

  // Answer `msg` with `data`, provided the sender supplied a return
  // address (`srcPid` and `srcName`).
  function sendReply(msg, data) {
    var hasReturnAddress = msg.srcPid !== undefined && msg.srcName !== undefined;
    if(!hasReturnAddress) {
      return;
    }
    var reply = {dstPid: msg.srcPid, dstName: msg.srcName, data: data};
    send(reply);
  }

  // Round trip through the local message queue.
  da.test('message passing', function() {
    da.handle('da:square', function(n) {
      return n * n;
    });
    var reply = da.call(da.pid, 'da:square', 9);
    return reply.then(function(squared) {
      return da.assertEquals(squared, 81);
    });
  });

  // ## Workers
  //
  // Only available in browser main thread.
  //
  // ### `spawn()`
  //
  // Create a new worker thread.
  //
  // Returns a promise of the process id of the new thread. 
  // Code can then be loaded using [reun](https://appedit.solsort.com/?page=read&github=reun),
  // which exposes `reun:require` and `reun:eval`.

  if(isBrowser()) {
    // Running workers, keyed by process id.
    da._children = da._children || new Map();

    // Start a worker thread running the DireApe (+ reun) source, with its
    // `pid`/`nid` preset. Resolves with the child pid once the worker has
    // sent its first (handshake) message.
    //
    // Rejects if the worker source cannot be loaded, if the worker cannot be
    // created, or if the worker reports an error before the handshake.
    da.spawn = () => loadWorkerSource().then(workerSource =>
      new Promise((resolve, reject) => {

        var childPid = da.pid + Math.random().toString(36).slice(2,12);
        workerSource = `
          self.direape = {
            pid: '${childPid}',
            nid: '${da.pid}'
          };
        ` + workerSource;

        var workerSourceUrl = URL.createObjectURL(
            new Blob([workerSource], {type:'application/javascript'}));

        var child = new self.Worker(workerSourceUrl);

        da._children.set(childPid, child);

        // Only guards the startup phase; removed again after the handshake.
        child.onerror = (e) => {
          child.onerror = null;
          child.terminate();
          da._children.delete(childPid);
          URL.revokeObjectURL(workerSourceUrl);
          reject(e);
        };

        child.onmessage = () => {
          // First message is the handshake; later ones are real messages.
          child.onerror = null;
          child.onmessage = (o) => send(o.data);
          URL.revokeObjectURL(workerSourceUrl);
          resolve(childPid);
        };
      }));
  }

  // ### `isWorker()`
  //
  // Check whether the current process is a worker thread.

  da.isWorker = isWorker;
  // True when the global `postMessage` exists and declares exactly one
  // parameter.
  function isWorker() {
    var post = self.postMessage;
    if(!post) {
      return false;
    }
    return post.length === 1;
  }

  // ### `children()`
  //
  // Get the list of running worker processes.

  if(isBrowser()) {
    // Returns an array (snapshot) of the pids of the running workers.
    // A `Map` iterator would not be a list, and could not be posted to a
    // worker as a message result.
    da.children = () => Array.from(da._children.keys());
  }

  // ### `kill(child-id)`
  //
  // Kill a worker process.

  if(isBrowser()) {
    // Terminate the worker with the given pid and forget about it.
    da.kill = (pid) => {
      var worker = da._children.get(pid);
      worker.terminate();
      da._children.delete(pid);
    };
  }

  // ### Implementation details
  //
  // Send the message to other processes. Only called if it should not be handled by the process itself.

  // Pass on a message that is not for this process:
  // workers hand it to their parent, the browser main thread hands it to
  // the matching child worker, and everything else goes to the network.
  function relay(msg) {
    if(isWorker()) {
      self.postMessage(msg);
      return;
    }

    var child = isBrowser() ? da._children.get(msg.dstPid) : undefined;
    if(child) {
      child.postMessage(msg);
      return;
    }
    relayNetwork(msg);
  }

  // In a worker: queue every incoming message, and post an empty message
  // to the parent.
  function initWorker() {
    if(!isWorker()) {
      return;
    }
    self.onmessage = function(event) {
      return send(event.data);
    };
    self.postMessage({});
  }

  // Fetch the source that workers run: DireApe followed by reun.
  // Returns a promise of the concatenated source text. A successful download
  // is cached in `da._workerSourcePromise`; a failed one is forgotten so a
  // later call retries.
  function loadWorkerSource() {
    if(!da._workerSourcePromise) {
      var reunRequest = da.GET('https://unpkg.com/reun@0.2');
      var direapeRequest =
        da.GET(isDev() ? './direape.js' : 'https://unpkg.com/direape@0.2');

      // `Promise.all` observes both requests, so a failure of either one
      // rejects the result and neither is left as an unhandled rejection.
      var pending = Promise.all([direapeRequest, reunRequest])
        .then(sources => sources[0] + sources[1]);

      da._workerSourcePromise = pending;
      pending.catch(() => {
        // Do not cache failures.
        if(da._workerSourcePromise === pending) {
          da._workerSourcePromise = undefined;
        }
      });
    }
    return da._workerSourcePromise;
  }

  // Development flag: the `DIREAPE_DEV` environment variable on NodeJS,
  // the global `DIREAPE_DEV` elsewhere.
  function isDev() {
    if(isNodeJs()) {
      return process.env.DIREAPE_DEV;
    }
    return self.DIREAPE_DEV;
  }

  if(isBrowser()) {
    // Spawn a worker and check that it reports its own pid.
    da.test('workers', function() {
      return da.spawn().then(function(childPid) {
        return da.call(childPid, 'da:status').then(function(status) {
          return da.assertEquals(status.pid, childPid);
        });
      });
    });
  }

  // ## Network
  //
  // Message passing between servers, - is implemented,
  // and should work, but not used for production yet.
  //
  // ### `online([boolean/url])`
  //
  // TODO: automatic reconnect

  if(isBrowser()) {

    // Connection to the relay server. Declared with `var`, so it is visible
    // throughout the enclosing function scope.
    var websocket;

    // `online()` reports whether the socket is open.
    // `online(falsy)` closes the connection.
    // `online(url)` (or any other truthy value for the default server)
    // reconnects and returns a promise that resolves with `true` once open.
    da.online = function(url) {
      if(arguments.length === 0) {
        return websocket && websocket.readyState === 1;
      }

      closeWebSocket();

      if(!url) {
        return;
      }
      var endpoint = typeof url === 'string' ? url : 'wss://direape.solsort.com';

      return new Promise(function(resolve, reject) {
        websocket = new WebSocket(endpoint);

        websocket.onopen = function() {
          var hello = {direapeConnect: da.buf2ascii(da._publicKey)};
          websocket.send(JSON.stringify(hello));
          resolve(true);
        };

        websocket.onerror = function(e) {
          da.emit(da.pid, 'da:socket-error', e);
          reject(e);
        };

        websocket.onmessage = function(o) {
          var msg = JSON.parse(o.data);
          // Anything arriving over the network is marked as external.
          msg.external = msg.external || true;
          send(msg);
        };
      });
    };
  }

  // ### Implementation details
  //
  // Send the message to other processes. Only called if it should not be handled by the process itself.

  // Connected websocket clients on the relay server, keyed by node id.
  // `var` is hoisted to the enclosing function, so `relayNetwork`,
  // `startServer` and the `da:list-clients` handler can all see it.
  if(isNodeJs()) {
    var wsClients = new Map();
  }

  // Close the current connection, if any, silencing its error handler first.
  function closeWebSocket() {
    if(!websocket) {
      return;
    }
    websocket.onerror = () => null;
    websocket.close();
  }

  // Send a message over the network as JSON.
  // On the server the receiving client is looked up by the first 44
  // characters of `dstPid`; elsewhere the message goes out on the open
  // websocket. Messages without a route are dropped.
  function relayNetwork(msg) {
    if(!isNodeJs()) {
      if(websocket) {
        websocket.send(JSON.stringify(msg));
      }
      return;
    }

    var nid = (msg.dstPid || '').slice(0,44);
    var client = wsClients.get(nid);
    console.log('relay', msg, wsClients.keys(), !!client);
    if(client) {
      client.send(JSON.stringify(msg));
    }
  }

  // ### Server: nodejs websocket server that relays messages

  if(isNodeJs()) {
    // Start an http + websocket server on port 8888. It serves the files in
    // this directory and relays DireApe messages between connected clients.
    //
    // A client first sends `{direapeConnect: <public key>}`; its node id is
    // the base64 SHA-256 of that key. Every later message is tagged with the
    // sender's node id in `external` and queued for delivery.
    da.startServer = () => {
      var express = require('express');
      var app = express();
      app.use(express.static(__dirname));

      var server = require('http').createServer(app);

      var wss = new (require('ws').Server)({
        perMessageDeflate: false,
        server: server
      });

      wss.on('connection', (ws) => {
        var nid;
        ws.on('message', (raw) => {
          // Frames come from untrusted clients: an exception thrown in this
          // listener would be uncaught and take the whole server down.
          var msg;
          try {
            msg = JSON.parse(raw);
          } catch(e) {
            console.log('ignoring malformed message:', e.message);
            return;
          }
          if(!msg || typeof msg !== 'object') {
            return;
          }

          if(msg.direapeConnect) {
            if(typeof msg.direapeConnect !== 'string') {
              return; // hashing a non-string key would throw
            }
            if(nid) {
              wsClients.delete(nid);
            }
            nid = require('crypto')
              .createHash('sha256')
              .update(msg.direapeConnect, 'latin1')
              .digest('base64');
            wsClients.set(nid, ws);
          } else {
            msg.external = nid;
            if(msg.dstPid === 'server') {
              msg.dstPid = da.pid;
            }
            send(msg);
          }
        });
        ws.on('close', () => {
          if(nid) {
            wsClients.delete(nid);
          }
        });
      });

      server.listen(8888, () => console.log('started server on port 8888'));
    };
  }

  // ## Built-in Handlers
  //
  // These are the built-in message handlers that are exposed.

  function initHandlers() {

    // ### `da:list-clients ()`
    //
    // List clients connected to the NodeJS message relay server.

    if(isNodeJs()) {
      da.handle('da:list-clients', () => Array.from(wsClients.keys()), {public: true});
    }

    // ### `da:get (url)`
    //
    // Get the content of a URL. This is needed by the module loader:
    // WebWorkers may not be able to load the modules due to the web security model,
    // but they are able to request them through the main thread.

    da.handle('da:GET', da.GET);

    // ### `da:status ()`
    //
    // Get a status with a timestamp. 
    // Can be used to ping for latency and checking whether a thread is alive.

    da.handle('da:status', () => ({pid: da.pid, time: Date.now()}), {public: true});

    // ### `da:children ()`
    //
    // `da.children` only exists in the browser main thread, so that is
    // the only place where the handler can be registered.

    if(isBrowser()) {
      da.handle('da:children', da.children);
    }
  }
  // ## Utilities
  //
  // Various utilities needed by DireApe, and made available 
  // as other libraries/code are also using them
  //
  // ### `ready(fn)`
  //
  // Called when direape is loaded, and has a valid `pid`.
  //
  // Callbacks waiting for initialisation; replaced by `false` once
  // initialisation is done.
  da._waiting = da._waiting || [];

  // Run `fn` when DireApe is initialised: queued while initialisation is
  // pending, scheduled on the next tick afterwards.
  da.ready = (fn) => {
    var pending = da._waiting;
    if(!pending) {
      da.nextTick(fn);
      return;
    }
    pending.push(fn);
  };

  // ### `isNodeJS()`, `isBrowser()`
  //
  // Determine which environment we are in, see also `isWorker`, under Workers above.
  //
  da.isNodeJs = isNodeJs;
  // True when `self.process.versions.node` is present and truthy.
  function isNodeJs() {
    var proc = self.process || {};
    var versions = proc.versions || {};
    return Boolean(versions.node);
  }

  da.isBrowser = isBrowser;
  // True when the global object has a truthy `window` property.
  function isBrowser() {
    var win = self.window;
    return Boolean(win);
  }

  // ### `da.log(...)` `da.trace(...)`
  //
  // Simple logging, used for debug.
  //
  // Print all arguments with `console.log` and return the second one,
  // so a call can be wrapped around a value inside an expression.
  da.log = function(msg, o) {
    console.log(...arguments);
    return o;
  };
  da.trace = da.log;
  //
  // ### `ajax` `GET(url)`
  //
  // We need to export http-get from the main thread,
  // to be able to fetch modules with webworkers.
  //
  // May as well make a generic `ajax` function,
  // which is not much extra code, and is needed by other code.
  //
  // (`fetch` is not universally supported yet )

  if(self.XMLHttpRequest) {

    // `ajax(url, opt)` returns a promise of the response text.
    //
    // Options (all optional):
    // - `method`: http method, defaults to POST when there is data, else GET.
    // - `data`: request body.
    // - `json`: when truthy, `data` is sent as `JSON.stringify(data)`.
    //
    // Rejects with the `XMLHttpRequest` object when the status is not 2xx.
    da.ajax = (url, opt) =>
      new Promise((resolve, reject) => {
        opt = opt || {};
        // Work on local values; the caller's `opt` object is left untouched.
        var data = opt.json ? JSON.stringify(opt.data) : opt.data;
        var method = opt.method || (data ? 'POST' : 'GET');
        var xhr = new XMLHttpRequest();
        xhr.open(method, url);
        xhr.onreadystatechange = function() {
          if(xhr.readyState === 4) {
            if((200 <= xhr.status && xhr.status < 300)
                && typeof xhr.responseText === 'string') {
              resolve(xhr.responseText);
            } else {
              reject(xhr);
            }
          }
        };
        xhr.send(data ? data : undefined);
      });
    da.GET = da.ajax;

  } else {

    // Without `XMLHttpRequest` (NodeJS): urls starting with `.` are read from
    // the file system, everything else is fetched with the `request` module.
    da.GET = (url) => new Promise((resolve, reject) => {
      if(url[0] === '.') {
        resolve(require('fs').readFileSync(url));
      } else {
        require('request')(url, (err, res, body) => {
          if(err) {
            reject(err);
          } else if(res.statusCode !== 200) {
            // Reject with a real error rather than with `undefined`.
            reject(new Error('GET ' + url + ' failed with status ' + res.statusCode));
          } else {
            resolve(body);
          }
        });
      }
    });
  }
  // Network tests: one url that exists, one that does not.
  da.test('GET ok', function() {
    return da.GET('https://unpkg.com/direape');
  });
  da.test('GET fail', function() {
    var outcome = da.GET('https://unpkg.com/direape/notfound')
      .catch(function() {
        return 'error';
      });
    return outcome.then(function(ok) {
      return da.assertEquals('error', ok);
    });
  });

  // ### `jsonify(obj)`
  //
  // Translate many general JavaScript objects into `JSON.stringify`-able objects. 
  // 
  // Used for preprocessing data that will be passed as messages, 
  // if they may contain data of classes that would mess up the serialisation.

  // Round-trip `o` through JSON, with `jsonReplacer` applied to every value.
  // The value is wrapped in an array so the replacer also sees `o` itself.
  da.jsonify = o => {
    var replacer = (key, value) => jsonReplacer(value);
    var text = JSON.stringify([o], replacer);
    return JSON.parse(text)[0];
  };

  // Errors, functions and `null` survive `jsonify` in a predictable form.
  da.test('jsonify', function() {
    var err = new Error('argh');
    err.stack = 'hello';
    var expectedError = {
      $_class: 'Error',
      name:'Error',
      message:'argh',
      stack: 'hello'
    };
    da.assertEquals(da.jsonify(err), expectedError);

    var expectedFunction = {
      $_class: 'Function',
      name: 'hello'
    };
    da.assertEquals(da.jsonify(function hello() { }), expectedFunction);

    da.assertEquals(da.jsonify(null), null);
  });

  // Turn a value into something `JSON.stringify` can represent.
  //
  // Primitives, `null`, arrays and plain objects are returned unchanged.
  // Any other object or function becomes a plain object with:
  // - its own enumerable properties,
  // - `$_class`: the constructor name (unless the object already has one),
  // - the whitelisted properties below, even when inherited or non-enumerable
  //   (this is what keeps `message` and `stack` of errors),
  // - `base64`: the content, for an `ArrayBuffer`.
  function jsonReplacer(o) {
    var jsonifyWhitelist = ['stack', 'name', 'message', 'id', 'class', 'value'];

    if((typeof o !== 'object' && typeof o !== 'function') ||
        o === null || Array.isArray(o) || o.constructor === Object) {
      return o;
    }

    // The former array copy for objects with a numeric `length` was always
    // overwritten by this assignment, so it has been removed. Output is unchanged.
    var result = Object.assign({}, o);
    if(o.constructor && o.constructor.name && result.$_class === undefined) {
      result.$_class = o.constructor.name;
    }

    if(o instanceof ArrayBuffer) {
      // Build the binary string byte by byte: `fromCharCode.apply` with one
      // argument per byte exceeds the engine's argument limit on large buffers.
      // TODO: typed arrays are not converted to base64.
      var bytes = new Uint8Array(o);
      var binary = '';
      for(var i = 0; i < bytes.length; ++i) {
        binary += String.fromCharCode(bytes[i]);
      }
      result.base64 = self.btoa(binary);
    }

    jsonifyWhitelist.forEach(function(key) {
      if(o[key] !== undefined) {
        result[key] = o[key];
      }
    });
    return result;
  }

  // ### `nextTick(fn)`

  da.nextTick = nextTick;
  // Run `f` asynchronously, as soon as a zero-delay timer fires.
  // TODO this is relatively slow, could implement with faster version.
  function nextTick(f) {
    var delayMs = 0;
    setTimeout(f, delayMs);
  }

  // ### `slice(arr, i, j)`

  // `Array.prototype.slice` for any array-like value (e.g. `arguments`).
  da.slice = (a, start, end) =>
    Array.prototype.slice.apply(a, [start, end]);

  // `slice` with zero, one and two bounds.
  da.test('slice', function() {
    var whole = da.slice([1,2,3]);
    var tail = da.slice([1,2,3], 1);
    var middle = da.slice([1,2,3], 1 , 2);
    da.assertEquals(whole.length, 3);
    da.assertEquals(tail[1], 3);
    da.assertEquals(middle.length, 1);
  });

  // ### `buf2ascii(buf)`, `ascii2buf(str)` 
  //
  // Convert buffers to/from strings using latin1 encoding with one byte per char.

  // One character per byte of the buffer, using the byte value as char code.
  da.buf2ascii = (buf) => {
    var bytes = new Uint8Array(buf);
    var text = '';
    for(var i = 0; i < bytes.length; ++i) {
      text += String.fromCharCode(bytes[i]);
    }
    return text;
  };

  // The bytes 104, 105 spell 'hi'.
  da.test('buffer conversion', function() {
    var buffer = Uint8Array.from([104,105]).buffer;
    da.assertEquals(da.buf2ascii(buffer), 'hi');
  });

  // ### `equals(a,b)`
  //
  // (deep equal for Object/Array, otherwise `.equals(..)` or direct comparison)
  //
  // TODO handle cyclic structures (via weak-map)
  // TODO handle iterables

  // Structural equality:
  // - identical values are equal,
  // - arrays are equal when they have the same length and equal elements,
  // - plain objects are equal when they have the same keys and equal values,
  // - other objects are compared with their own `equals` method, if any,
  // - everything else is different.
  da.equals = (a,b) => {
    if(a === b) {
      return true;
    }

    var isObject = (v) => typeof v === 'object' && v !== null;
    if(!(isObject(a) && isObject(b))) {
      return false;
    }

    if(Array.isArray(a)) {
      if(!Array.isArray(b) || a.length !== b.length) {
        return false;
      }
      var idx = 0;
      while(idx < a.length) {
        if(!da.equals(a[idx], b[idx])) {
          return false;
        }
        ++idx;
      }
      return true;
    }

    if(a.constructor === Object) {
      if(b.constructor !== Object) {
        return false;
      }
      var keysA = Object.keys(a).sort();
      var keysB = Object.keys(b).sort();
      if(!da.equals(keysA, keysB)) {
        return false;
      }
      for(var key in a) {
        if(!da.equals(a[key] ,b[key])) {
          return false;
        }
      }
      return true;
    }

    return typeof a.equals === 'function' ? a.equals(b) : false;
  };

  // Key order must not matter; value types must.
  da.test('equals', function() {
    var same = da.equals({a:[1,2],b:3},{b:3,a:[1,2]});
    var different = da.equals({a:['1',2],b:3},{b:3,a:[1,2]});
    da.assert(same);
    da.assert(!different);
    da.assertEquals({a:[1,2],b:3},{b:3,a:[1,2]});
  });

  // ## Testing

  // Milliseconds a single test may run before it fails with 'Timeout'.
  var testTimeout = 5000;

  // ### `assert(bool)`

  // Returns `ok` when it is truthy, otherwise throws an `AssertError`.
  da.assert = (ok) => {
    if(ok) {
      return ok;
    }
    return throwAssert({type: 'truthy', val: ok});
  };

  // ### `assertEquals(val1, val2)`

  // Returns the (truthy) result of `da.equals`, or throws an `AssertError`
  // carrying both values.
  da.assertEquals = (val1, val2) => {
    var same = da.equals(val1, val2);
    if(same) {
      return same;
    }
    return throwAssert({type: 'equals', vals: [val1, val2]});
  };

  // ### `testSuite(name)`
  //
  // Sets the current test suite name

  da.testSuite = testSuite;
  // Remember `str` as the suite name used by subsequent `test` calls.
  function testSuite(str) {
    var suiteName = str;
    da._currentTestSuite = suiteName;
  }

  // ### `test(name, fn)`
  //
  // Add a test case.
  // For asynchronous tests, let the function return a promise.
  // Notice that tests may run in parallel.

  da.test = test;
  // Register test function `f`. Its name is stored in `f.testName`,
  // prefixed with `<suite>:` when a test suite name is set.
  function test(name, f) {
    var suite = da._currentTestSuite;
    f.testName = suite ? suite + ':' + name : name;

    if(!da._tests) {
      da._tests = [];
    }
    da._tests.push(f);
  }

  // ### `runTests(testSuites)`
  //
  // Execute the tests given a list of test suites.

  // Run the registered tests in parallel. `modules` (a suite name or a list
  // of suite names) limits the run to tests named `<suite>:...`.
  // The promise rejects as soon as one test fails.
  da.runTests = (modules) => {
    var selected = da._tests;
    if(modules) {
      var suites = Array.isArray(modules) ? modules : [modules];
      selected = selected.filter(function(t) {
        return suites.some(function(m) {
          return t.testName.startsWith(m + ':');
        });
      });
    }

    var all = Promise.all(selected.map(runTest));
    return all.then(function() {
      var names = selected.map(o => JSON.stringify(o.testName)).join(', ');
      console.log('All tests ok:', names);
    });
  };

  // ### Implementation details

  // Run the test function `t`, with a time limit of `testTimeout` ms.
  //
  // Returns a promise that resolves with `undefined` when the test passes.
  // When the test throws, rejects or times out, the failure is logged and
  // the promise rejects with the original reason.
  function runTest(t) {
    var p, timer;

    try {
      p = Promise.resolve(t());
    } catch(e) {
      p = Promise.reject(e);
    }

    var timeout = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error('Timeout')), testTimeout);
    });

    // Two-argument `then`: a rejection is a failure whatever its reason,
    // including `undefined` or another falsy value.
    return Promise.race([p, timeout]).then(
      () => {
        // Stop the timer so it does not keep the process alive.
        clearTimeout(timer);
      },
      (err) => {
        clearTimeout(timer);
        logTestFailure(err);
        throw err;
      });

    // Print name, message, assert details and stack of a failed test.
    function logTestFailure(err) {
      var info = err || {};
      console.log('Test error in "' +
          (t.testName || '') +
          ': ' + info.message);
      if(info.assert) {
        try {
          console.log(JSON.stringify(info.assert));
        } catch(e) {
          console.log(info.assert);
        }
      }

      if(info.stack) {
        console.log(info.stack);
      }
    }
  }

  /*
     test('must error 1', () => da.assert(false));
     test('must error 2', () => new Promise(() => da.assert(false)));
     test('must error 3', () => new Promise((reject, resolve) => {true;}));
     */

  // To get the call stack correct, to be able to report assert position, we throw an `Error` (which includes the stack on many browsers), and enrich it with more information.

  // Throw an `Error` with message 'AssertError', carrying the details of
  // the failed assertion in its `assert` property.
  function throwAssert(o) {
    throw Object.assign(new Error('AssertError'), {assert: o});
  }

  // A failing `assertEquals` throws an error with message and stack.
  da.test('assert', function() {
    var caught;
    try {
      da.assertEquals(1,2);
    } catch(e) {
      caught = e;
    }
    if(caught !== undefined) {
      da.assert(caught.message === 'AssertError');
      da.assert(typeof caught.stack === 'string');
    }
  });

  // ## Module Setup / Main

  // Prepare the environment and create/export the module object `da`.
  function setupModule() {

    // Shims: `self` as global object, and `URL` from its webkit name.

    if(typeof self === 'undefined') {
      global.self = global;
    }

    if(!self.URL) {
      self.URL = self.webkitURL;
    }

    // Redirect to https in browsers (except for localhost and
    // 192.168.* addresses).

    var isLocalHost = (host) =>
      host === 'localhost' || host.startsWith('192.168.');

    if(isBrowser() &&
        self.location.protocol === 'http:' &&
        !isLocalHost(self.location.hostname)) {
      self.location.href = self.location.href.replace(/http/, 'https');
    }

    // Reuse an existing `self.direape` object if there is one, and export
    // through `module.exports` when available, else as global.

    da = self.direape || {};

    var hasCommonJs = typeof module !== 'undefined';
    if(hasCommonJs) {
      module.exports = da;
    } else {
      self.direape = da;
    }

    da.test = test;

    // Tests registered from here on belong to the 'direape' suite.

    testSuite('direape');
  }

  // Initialisation, of the different parts of direape

  // Boot sequence: obtain a pid (`initPid` may return a promise), register
  // the built-in handlers, set up the worker side, and finally run the
  // callbacks queued through `da.ready`.
  Promise
    .resolve(initPid())
    .then(initHandlers)
    .then(initWorker)
    .then(() => {
      // The length is re-read on every iteration, so callbacks that are
      // queued by a running callback are executed as well.
      for(var i = 0; i < da._waiting.length; ++i) {
        da._waiting[i]();
      }
      // From now on `da.ready(fn)` schedules `fn` instead of queueing it.
      da._waiting = false;
    });

  // Main entry

  // Tests registered after this point do not belong to the 'direape' suite.
  testSuite('');

  // Once initialised: run all tests when `DIREAPE_RUN_TESTS` is set.
  // When executed directly with NodeJS, the command line arguments `test`
  // and `server` select running the test suite and/or starting the server.
  da.ready(function main() {
    if(self.DIREAPE_RUN_TESTS) {
      da.runTests();
    }

    if(!isNodeJs() || require.main !== module) {
      return;
    }

    var hasArg = function(arg) {
      return process.argv.indexOf(arg) !== -1;
    };

    if(!hasArg('test')) {
      if(hasArg('server')) {
        da.startServer();
      }
      return;
    }

    da.runTests('direape')
      .then(function() {
        if(hasArg('server')) {
          da.startServer();
        } else {
          process.exit(0);
        }
      })
      .catch(function() {
        return process.exit(-1);
      });
  });
})();

// ## License
//
// This software is copyrighted solsort.com ApS, and available under GPLv3, as well as proprietary license upon request.
//
// Versions older than 10 years also fall into the public domain.
//