qiwi/protopipe

View on GitHub
src/main/ts/processor/Net.ts

Summary

Maintainability
A
2 hrs
Test Coverage
A
96%
/** @module protopipe */

import {
  IEdge,
  IGraph,
  IPointer,
  IVertex,
  Pathfinder,
} from '../graph'
import {
  IId,
  IReference,
  ISpace,
  ISpaceElement,
  RefOperator,
  Space,
} from '../space/'
import {
  Stack,
} from '../stack'
import {
  IAny,
  staticImplements,
  TPredicate,
} from '../types'
import {
  genId,
  getInsideOutPromise,
  promisify,
  TInsideOutPromise,
} from '../util'
import {
  IProcessorStaticOperator,
} from './types'

export type ISpaceReducer = (space: ISpace) => IAny

export type ISpaceReducerMap = {
  [key: string]: ISpaceReducer
}

export type IHandlerParamDeclaration = ISpaceReducer | {
  graph?: ISpaceReducer,
  vertexes?: ISpaceReducerMap,
  edges?: ISpaceReducerMap
}

export type INetProcessorParams = {
  graph: IGraph,
  handler: IHandlerParamDeclaration
}

export type ICxt = {
  queue: number,
  sync: boolean,
  override: boolean,
  iop: TInsideOutPromise,
  before: Function
  after: Function
}

export type IImpactTarget = IVertex | [IVertex, IAny]

export type INormalizedImpactTarget = {
  vertex: IVertex,
  data?: IAny
}

export interface IAnchor extends ISpaceElement {
  type: 'ANCHOR',
  value: IPointer
}

export interface IData extends ISpaceElement {
  type: 'DATA',
  value: IAny
}

export interface IHandler extends ISpaceElement {
  type: 'HANDLER',
  value: Function
}

/**
 * Net processor.
 */
@staticImplements<IProcessorStaticOperator>()
export class NetProcessor {

  [key: string]: IAny
  space: ISpace

  constructor(params: INetProcessorParams) {
    this.space = NetProcessor.parser(params)
  }

  impact(sync: boolean, ...targets: IImpactTarget[]) {
    return NetProcessor.impact(this.space, sync, ...targets)
  }

  private static _normalizeImpactTarget(target: IImpactTarget): INormalizedImpactTarget {
    let vertex: IVertex
    let data: IAny

    if (Array.isArray(target)) {
      [vertex, data] = target

    }
    else {
      vertex = target
    }

    return {
      vertex,
      data,
    }
  }

  static getAnchor(space: ISpace, graph: IGraph, vertex?: IVertex, edge?: IEdge): IAnchor {
    return RefOperator.find(item =>
      item.type === 'ANCHOR'
      && graph === item.value.graph
      && item.value.vertex === vertex
      && item.value.edge === edge
      , space) || RefOperator.create(space, 'ANCHOR', { graph, vertex, edge })
  }

  static getLinkedData(space: ISpace, anchorId: IId) {
    return RefOperator.getRels(space, anchorId).find(({ type }) => type === 'DATA')
  }

  private static _process(cxt: ICxt, space: ISpace, graph: IGraph, vertex: IVertex, fn: Function) {
    cxt.before()

    const targetVetexes: IVertex[] = Pathfinder.getOutVertexes(graph, vertex)
    const processResult = (res: IAny) => {
      const anchor = this.getAnchor(space, graph, vertex)
      const dataRef = this.getLinkedData(space, anchor.id)
      const data = RefOperator.upsert(space, 'DATA', res, dataRef ? ({ id }) => dataRef.id === id : undefined)

      if (!dataRef) {
        RefOperator.link(space, anchor.id, data.id)
      }

      this._impactGroup(space, ...targetVetexes)
      cxt.after()
    }

    if (cxt.sync) {
      processResult(fn())

    }
    else {
      promisify(fn())
        .then(processResult)
        .catch(e => cxt.iop.reject(e))
    }
  }

  private static _impactSingle(space: ISpace, target: IImpactTarget): void {
    const cxt: ICxt = this.getContext(space)
    const graph: IGraph = this.getGraph(space)
    const { vertex, data } = this._normalizeImpactTarget(target)

    if (data !== undefined) {
      this._process(cxt, space, graph, vertex, () => data)
      return
    }
    const anchor = this.getAnchor(space, graph, vertex)
    const dataRef = this.getLinkedData(space, anchor.id)

    if (!cxt.override && dataRef) {
      return
    }

    const handler: ISpaceReducer = this.getHandler(space, vertex)
    const sourceVertexes: IVertex[] = Pathfinder.getInVertexes(graph, vertex)
    const anchors: IAnchor[] = sourceVertexes.map(vertex => this.getAnchor(space, graph, vertex))
    const anchorsIds: IId[] = anchors.map(({ id }) => id)
    const refs: IReference[] = RefOperator.getRefs(space, anchorsIds)
    const sources = refs
      .map(({ value: { to } }) => this.getData(space, undefined, to))
      .filter(elt => elt !== undefined) as IData[]

    if (sources.length === sourceVertexes.length) {
      const graphElt = this.requireElt('GRAPH', space)
      const _space = new Space(anchor, ...anchors, graphElt, ...refs, ...sources)
      this._process(cxt, space, graph, vertex, () => handler(_space))
    }
  }

  private static _impactGroup(space: ISpace, ...targets: IImpactTarget[]): void {
    const cxt: ICxt = this.getContext(space)

    cxt.before()
    targets.map(target => this._impactSingle(space, target))
    cxt.after()
  }

  static getRelsByVertex(space: ISpace, vertex: IVertex): ISpaceElement[] {
    const graph: IGraph = this.getGraph(space)
    const anchor = this.getAnchor(space, graph, vertex)

    return RefOperator.getRels(space, anchor.id)
  }

  static getElt(type: string, space: ISpace, vertex?: IVertex): ISpaceElement | undefined {
    const typePredicate: TPredicate = ({ type: _type }) => type === _type

    if (vertex) {
      return this.getRelsByVertex(space, vertex).find(typePredicate)
    }

    return RefOperator.find(typePredicate, space)
  }

  static requireElt(type: string, space: ISpace, vertex?: IVertex) {
    const elt = this.getElt(type, space, vertex)

    if (!elt) {
      throw new Error(`${type} is required`)
    }

    return elt
  }

  static getHandler(space: ISpace, vertex: IVertex): ISpaceReducer {
    const handler = this.getElt('HANDLER', space, vertex) || this.requireElt('HANDLER', space)

    return handler.value
  }

  static getContext(space: ISpace): ICxt {
    return this.requireElt('CXT', space).value
  }

  static getGraph(space: ISpace): IGraph {
    return this.requireElt('GRAPH', space).value
  }

  static getData(space: ISpace, vertex?: IVertex, id?: IId): ISpaceElement | undefined {
    if (id) {
      return RefOperator.get(space, id, 'DATA')
    }

    return this.getElt('DATA', space, vertex)
  }

  static impact(space: ISpace, sync: boolean, ...targets: IImpactTarget[]) {

    const cxt: ICxt = this.attachContext(space, sync)

    this._impactGroup(space, ...targets)

    return sync
      ? space
      : cxt.iop.promise

  }

  static attachContext(space: ISpace, sync: boolean): ICxt {

    if (this.getElt('CXT', space)) {
      throw new Error('There\'s no place for yet another one execution context')
    }

    const iop: TInsideOutPromise = getInsideOutPromise()
    const cxt = {
      sync,
      queue: 0,
      override: false,
      iop,
      before() {
        this.queue++
      },
      after() {
        this.queue--
        if (this.queue === 0) {
          iop.resolve(space)
        }
      },
    }

    space.value.unshift({
      id: genId(),
      type: 'CXT',
      value: cxt,
    })

    return cxt

  }

  static parser({ graph, handler }: INetProcessorParams): ISpace {
    const space: ISpace = {
      type: 'SPACE',
      value: new Stack<ISpaceElement>(),
    }

    RefOperator.create(space, 'GRAPH', graph)

    this.injectHandlers(space, handler, graph)

    return space
  }

  static injectHandlers(space: ISpace, handler: IHandlerParamDeclaration, graph: IGraph): void {
    const attachHandler = ({ vertex, edge }: { vertex?: IVertex, edge?: IEdge }, handler: ISpaceReducer): void => {
      const anchor = this.getAnchor(space, graph, vertex, edge)
      const _handler: IHandler = RefOperator.create(space, 'HANDLER', handler)

      RefOperator.link(space, anchor.id, _handler.id)
    }
    const createHandlerRefsByMap = (type: 'vertex' | 'edge', map: ISpaceReducerMap = {}) =>
      Object.keys(map).forEach(key => attachHandler({ [type]: key }, map[key]))

    if (typeof handler === 'function') {
      attachHandler({}, handler)
    }
    else {
      if (handler.graph) {
        attachHandler({}, handler.graph)
      }

      createHandlerRefsByMap('edge', handler.edges)
      createHandlerRefsByMap('vertex', handler.vertexes)

    }
  }

}