arlac77/node-symatem

View on GitHub
src/api.mjs

Summary

Maintainability
Test Coverage
const net = require('net'),
  msgpack = require('msgpack'),
  path = require('path'),
  fs = require('fs'),
  child_process = require('child_process');

export const queryMode = ['M', 'V', 'I'];

export const symbolByName = {
  Void: 0,
  PosX: 13,
  PosY: 14,
  BlobType: 15,
  Natural: 16,
  Integer: 17,
  Float: 18,
  UTF8: 19,
  BinaryOntologyCodec: 22
};

const symbolToName = [
  'Void',
  undefined,
  undefined,
  undefined,
  undefined,
  undefined,
  undefined,
  undefined,
  undefined,
  undefined,
  undefined,
  undefined,
  undefined,
  'PosX',
  'PosY',
  'BlobType',
  'Natural',
  'Integer',
  'Float',
  'UTF8',
  undefined,
  undefined,
  'BinaryOntologyCodec'
];

export const queryMask = {
  /*
  MMM: 0,
  VMM: 1,
  IMM: 2,
  MVM: 3,
  VVM: 4,
  IVM: 5,
  MIM: 6,
  VIM: 7,
  IIM: 8,
  MMV: 9,
  VMV: 10,
  IMV: 11,
  MVV: 12,
  VVV: 13,
  IVV: 14,
  MIV: 15,
  VIV: 16,
  IIV: 17,
  MMI: 18,
  VMI: 19,
  IMI: 20,
  MVI: 21,
  VVI: 22,
  IVI: 23,
  MII: 24,
  VII: 25,
  III: 26
  */
};

for (let i = 0; i < 27; ++i) {
  const key =
    queryMode[i % 3] +
    queryMode[Math.floor(i / 3) % 3] +
    queryMode[Math.floor(i / 9) % 3];
  queryMask[key] = i;
}

export function open(options = {}) {
  const port = options.port || 1337;
  const host = options.host || '::1';

  return new Promise((fullfill, reject) => {
    const socket = new net.Socket(),
      msgpackStream = new msgpack.Stream(socket),
      promiseQueue = [];

    const symbolToNameCache = {};

    const symbolCache = new Map();

    const connection = {
      request: (...args) => {
        const packet = msgpack.pack(args);
        return new Promise((resolve, reject) => {
          promiseQueue.push({
            resolve,
            reject
          });
          socket.write(packet);
          //console.log('sending', packet);
        });
      },

      nameToSymbol: name => connection.deserializeBlob(`"${name}"`),

      symbolToName: symbol => {
        const name = symbolToName[symbol] || symbolToNameCache[symbol];
        if (name !== undefined) {
          return name;
        }

        return connection
          .query(false, queryMask.MMV, symbol, symbolByName.BlobType, 0)
          .then(result =>
            connection.readBlob(symbol).then(result => {
              const name = result.toString();
              //console.log(`${symbol} -> ${name}`);
              symbolToNameCache[symbol] = name;
              return name;
            })
          );
      },

      decodeSymbolWithCache: symbol => {
        const d = symbolCache.get(symbol);
        if (d !== undefined) {
          //console.log(`already found: ${symbol}`);
          d.access += 1;
          return Promise.resolve(d.object);
        }

        const p = connection.decodeSymbol(symbol).then(decoded => {
          //console.log(`add ${symbol} -> ${decoded.name}`);
          symbolCache.set(symbol, {
            access: symbolCache.get(symbol).access,
            object: decoded
          });
          return decoded;
        });

        symbolCache.set(symbol, {
          access: 0,
          object: p
        });

        return p;
      },

      decodeSymbol: async symbol => {
        return connection
          .query(false, queryMask.MVV, symbol, 0, 0)
          .then(avs => {
            const valuesAndTypes = [];

            //console.log(`avs: ${avs}`);
            for (let i = 0; i < avs.length; i += 2) {
              const value = avs[i + 1] + 0;
              valuesAndTypes.push(connection.readBlob(value));
              valuesAndTypes.push(
                connection.query(
                  false,
                  queryMask.MMV,
                  value,
                  symbolByName.BlobType,
                  0
                )
              );
            }

            return Promise.all(valuesAndTypes).then(valuesAndTypes => {
              const object = {};
              const promises = [];

              for (let i = 0; i < valuesAndTypes.length; i += 2) {
                let v = valuesAndTypes[i];

                const type = valuesAndTypes[i + 1][0];

                switch (type) {
                  case symbolByName.UTF8:
                    v = v.toString();
                    break;
                  case symbolByName.Natural:
                    v = v.readUInt32LE(0);
                    break;
                  case symbolByName.Integer:
                    v = v.readInt32LE(0);
                    break;
                  default:
                    if (v.length === 0) {
                      v = undefined;
                    } else {
                      console.log(
                        `unknown type '${type}' ${typeof type} ${JSON.stringify(
                          type
                        )} for ${v} : ${avs[i]}`
                      );
                    }
                }
                //console.log(`${avs[i]} ${v} (${type})`);

                if (v !== undefined) {
                  const propertyName = connection.symbolToName(avs[i]);
                  if (propertyName.then) {
                    promises.push(
                      propertyName.then(name => (object[name] = v))
                    );
                  } else {
                    object[propertyName] = v;
                  }
                }
              }
              return Promise.all(promises).then(() => object);
            });
          });
      },

      upload: async (text, packageSymbol = symbolByName.Void) => {
        const buffer = Buffer.from(text);
        const textSymbol = await connection.createSymbol();

        connection.setBlobSize(textSymbol, buffer.length * 8).then(() =>
          connection
            .writeBlob(textSymbol, 0, buffer.length * 8, buffer)
            .then(() =>
              connection
                .link(textSymbol, symbolByName.BlobType, symbolByName.UTF8)
                .then(() =>
                  connection
                    .deserializeBlob(textSymbol, packageSymbol)
                    .then(data => {
                      connection.releaseSymbol(textSymbol);
                      return Array.isArray(data)
                        ? data
                        : connection
                            .decodeSymbol(data)
                            .then(r => Promise.reject(r));
                    })
                )
            )
        );
      }
    };

    [
      'createSymbol',
      'releaseSymbol',
      'getBlobSize',
      'setBlobSize',
      'decreaseBlobSize',
      'increaseBlobSize',
      'readBlob',
      'writeBlob',
      'deserializeBlob',
      'query',
      'link',
      'unlink'
    ].forEach(name => {
      connection[name] = (...args) => connection.request(name, ...args);
    });

    //socket.on('data', data => console.log('received', data));
    msgpackStream.addListener('msg', data =>
      promiseQueue.shift().resolve(data)
    );

    socket.on('error', error => {
      socket.destroy();
      reject(error);
    });

    socket.on('close', () => {
      promiseQueue.forEach(p => p.reject());
      promiseQueue.length = 0;
      socket.destroy();
    });

    if (options.store === undefined) {
      socket.connect(port, host, error => {
        if (error) {
          reject(error);
        } else {
          fullfill(connection);
        }
      });
    } else {
      const executable = path.join(__dirname, '..', 'SymatemMP');
      const symatem = child_process.spawn(
        executable,
        ['--port', port, '--file', options.store],
        {
          shell: false,
          detached: false,
          stdio: ['ignore', 'pipe', process.stderr]
        }
      );

      /*
            symatem.stdout.on('data', data => {
              console.log(`stdout: ${data}`);
              if (data.match(/Listening/)) {
                socket.connect(port, host, error => {
                  if (error) {
                    reject(error);
                  } else {
                    fullfill(connection);
                  }
                });
              }
            });
      */

      //process.stderr.on('data', data => console.error(data));
      symatem.on('error', err =>
        reject(`Failed to start ${executable}: ${err}`)
      );

      connection.close = () => {
        symatem.kill();
        return Promise.resolve();
      };

      setTimeout(
        () =>
          socket.connect(port, host, error => {
            if (error) {
              reject(error);
            } else {
              fullfill(connection);
            }
          }),
        200
      );
    }
  });
}