Kronos-Integration/kronos-step-aggregate

View on GitHub
src/step-aggregate.js

Summary

Maintainability
A
3 hrs
Test Coverage
import { SendEndpoint } from 'kronos-endpoint';
import { Step } from 'kronos-step';
import { createAttributes } from 'model-attributes';

/**
 * Aggregates messages from several endpoints.
 * The incoming request is dispatched to all out endpoints and the results
 * are combined into one result.
 */
export class AggregateStep extends Step {
  /**
   * @return {string} 'kronos-aggregate'
   **/
  static get name() {
    return 'kronos-aggregate';
  }

  static get description() {
    return 'aggregates requests from several endpoints';
  }

  static get configurationAttributes() {
    return createAttributes({
      aggregate: {
        type: 'string',
        description:
          'flat means all results will be collected as attributes in the resulting object',
        default: 'flat'
      }
    });
  }

  constructor(...args) {
    super(...args);

    const outEndpoints = this.outEndpoints.filter(e => !e.isDefault);

    this.inEndpoints.filter(e => !e.isDefault).forEach(
      ie =>
        (ie.receive = async request => {
          const responses = await Promise.all(
            outEndpoints.map(o => o.receive(request))
          );
          if (this.aggregate === 'flat') {
            return responses.reduce((a, c) => Object.assign(a, c), {});
          }

          const result = {};
          for (let i = 0; i < outEndpoints.length; i++) {
            result[outEndpoints[i].name] = responses[i];
          }
          return result;
        })
    );
  }

  endpointOptions(name, def) {
    const options = super.endpointOptions(name, def);
    const step = this;

    function outOpposite(f) {
      for (const en in step.endpoints) {
        const e = step.endpoints[en];
        if (e.isOut && e.opposite && !e.isDefault) {
          f(e.opposite);
        }
      }
    }

    if (def.opposite) {
      if (def.in) {
        options.opposite = new SendEndpoint(name, this, {
          hasBeenOpened() {
            step.info({
              endpoint: this.identifier,
              state: 'open'
            });

            if (this.aggregate === 'flat') {
              outOpposite(e => (e.receive = this.receive));
            } else {
              outOpposite(
                e =>
                  (e.receive = request =>
                    this.receive({
                      [e.name]: request
                    }))
              );
            }
          },
          willBeClosed() {
            step.info({
              endpoint: this.identifier,
              state: 'close'
            });

            outOpposite(e => (e.receive = undefined));
          }
        });
      } else {
        options.createOpposite = true;
      }
    }
    return options;
  }
}

export async function registerWithManager(manager) {
  return manager.registerStep(AggregateStep);
}