GillesRasigade/pattern

View on GitHub
doc/examples/cqrs/index.js

Summary

Maintainability
A
1 hr
Test Coverage
'use strict';

const co = require('co');

const pattern = require('../../..');

let mapper, commandEmitter, commandHandler, stateEmitter, stateHandler;

// User class definition:
class User extends pattern.Entity {
  *setData(data = {}) {
    this.data = data;
    return this;
  }
  *setFirstname(firstname) {
    this.firstname = firstname;
    return this;
  }
  *setAge(age) {
    this.age = age;
    return this;
  }
}
User.SCHEMA = {
  type: 'object',
  properties: {
    firstname: {
      type: 'string'
    },
    age: {
      type: 'number'
    }
  }
};

// UserCommands definition:
const userCommands = new pattern.Commands({
  *getUser(_uuid, _version = null) {
    const query = { _uuid };
    if (_version !== null) {
      query._version = _version;
    }
    const data = yield mapper
      .connection
      .collection('event')
      .findOne(query, { snapshot: 1 }, {
      sort: { _id: -1 }
    });
    console.log('getUser data:', _uuid, data);
    const user = new User(data.snapshot);
    console.log(21, user._version);
    return user;
  },
  *createUser(data) {
    const user = new User();
    yield user.commands.execute('setData', [data]);
    return user;
  },

  // All these function does nothing... remove them:
  *setFirstname(uuid, firstname) {
    console.log('setFirstname data:', uuid, firstname);
    const user = yield this.getUser(uuid);
    yield user.commands.execute('setFirstname', [firstname]);
    return user;
  },
  *setAge(uuid, age) {
    const user = yield this.getUser(uuid);
    yield user.commands.execute('setAge', [age]);
    // user.data.age = age;
    return user;
  }
})

function* main() {
  // let { mapper, commandEmitter, commandHandler, stateEmitter, stateHandler } =
  yield before();

  const user = {};

  // 0a. Listen command message
  // 0b. Listen consolidation message
  commandHandler.on('command', (command, args, message) => {
    return co(function* () {
      commandHandler.ack(message);
      console.log('onCommand', command, args);

      const user = yield userCommands.execute(command, args);
      const patch = user.getPatch({});
      console.log('After command', user.data, user._snapshot, { patch });

      console.log(33, user.data, user._snapshot);
      const result = yield mapper.connection.collection('event').save(user.buildSnapshot());
      // const result = yield mapper.connection.collection('event').save({
      //   _version: user._version,
      //   _uuid: user.data._uuid,
      //   snapshot: user._snapshot,
      //   patch
      // });

      if (command === 'createUser') {
        yield commandEmitter.emit('command', 'setFirstname', [ user._uuid, 'July']);
      } else if (command === 'setFirstname') {
        yield commandEmitter.emit('command', 'setAge', [ user._uuid, 21]);
      } else {
        const userV1 = yield userCommands.execute('getUser', [user._uuid, 1]);
        const userV2 = yield userCommands.execute('getUser', [user._uuid, 2]);
        const userV3 = yield userCommands.execute('getUser', [user._uuid, 3]);
        console.log(88, userV1.toJSON(), userV2.toJSON(), userV3.toJSON());

        const events = yield mapper.connection.collection('event').find({
          _uuid: user._uuid
        }, {
          sort: { _id: 1 }
        }).toArray();
        console.log(events);

        const bernard = new User();
        for (var event of events) {
          console.log(112, event.events);
          bernard._sourcedEvents = bernard._sourcedEvents.concat(event.events);
        }
        console.log(114, bernard._sourcedEvents, bernard.toJSON());

        yield bernard.replay();

        console.log(119, bernard.toJSON());
      }

      // console.log('After storing events', result);
    }).then(console.log, console.error);
  });
  stateHandler.on('stateChanged', (args, message) => {
    stateHandler.ack(message);
    console.log('onStateChanged', event);
  });

  // 1. Send command message
  commandEmitter.emit('command', 'createUser', [{ firstname: 'Alice' }]);

  // 2. Command handler processing

  // 3. Store event sourcing

  // 4. Consolidate event sourcing

  // 5. Query

  yield cb => setTimeout(cb, 500);

  yield after();

  return;
}





function* before() {
  commandEmitter = new pattern.QueueAmqp({ tx: 'amqp://localhost', name: 'command' });
  yield commandEmitter.connect();

  commandHandler = new pattern.QueueAmqp({ rx: 'amqp://localhost', name: 'command' });
  yield commandHandler.connect();

  stateEmitter = new pattern.QueueAmqp({ tx: 'amqp://localhost', name: 'state' });
  yield stateEmitter.connect();

  stateHandler = new pattern.QueueAmqp({ rx: 'amqp://localhost', name: 'state' });
  yield stateHandler.connect();

  mapper = new pattern.MapperMongoDb();
  yield mapper.connect({
    url: 'mongodb://localhost:27017/test'
  });
  yield mapper.connection.dropDatabase();

  return { mapper, commandEmitter, commandHandler, stateEmitter, stateHandler };
}
function* after() {
  commandEmitter.close();
  commandHandler.close();
  stateEmitter.close();
  stateHandler.close();
  yield mapper.close();
}

return co(main.apply(null, process.argv.slice(2)))
.then(res => {
  process.exit(0);
}, console.error);