cyclejs/cycle-core

View on GitHub
devtool/src/graphSerializer.ts

Summary

Maintainability
B
6 hrs
Test Coverage
/* tslint:disable:max-file-line-count */
import xs, {Stream, Listener} from 'xstream';
import {DevToolEnabledSource} from '@cycle/run';
import debounce from 'xstream/extra/debounce';
import * as dagre from 'dagre';
import * as CircularJSON from 'circular-json';
import {ZapSpeed} from './panel/model';
import timeSpread from './utils/timeSpread';
import {SessionSettings} from './launcher';

interface InternalProducer {
  type?: string;
}

export interface StreamGraphNode {
  id: string;
  type: 'source' | 'stream' | 'sink' | 'operator';
  label?: string;
  stream: Stream<any>;
  width: number;
  height: number;
  x?: number;
  y?: number;
}

export interface StreamGraphEdge {
  label?: string;
  points?: Array<{x: number; y: number}>;
}

export interface Zap {
  id: string;
  type: 'next' | 'error' | 'complete';
  value?: any;
}

type Size = [number, number];

const OPERATOR_NODE_SIZE: Size = [23, 10];
const SOURCE_NODE_SIZE: Size = [23, 23];
const COMMON_NODE_SIZE: Size = [23, 23];
const SINK_NODE_SIZE: Size = [40, 30];

function zapSpeedToMilliseconds(zapSpeed: ZapSpeed): number {
  switch (zapSpeed) {
    case 'slow':
      return 1100;
    case 'normal':
      return 80;
    case 'fast':
      return 16;
  }
}

class IdTable {
  private mutableIncrementingId: number;
  public map: Map<Object, number>;

  constructor() {
    this.mutableIncrementingId = 0;
    this.map = new Map<Stream<any>, number>();
  }

  public getId(thing: Object): string {
    if (!this.map.has(thing)) {
      const id = this.mutableIncrementingId;
      this.map.set(thing, id);
      this.mutableIncrementingId += 1;
      return String(id);
    } else {
      return String(this.map.get(thing));
    }
  }
}

function makeSureNodeIsRegistered(
  graph: dagre.graphlib.Graph,
  idTable: IdTable,
  stream: Stream<any>,
): void {
  if (!graph.node(idTable.getId(stream))) {
    let node: StreamGraphNode;
    if (stream['_isCycleSource']) {
      node = {
        id: idTable.getId(stream),
        type: 'source',
        label: (stream as Stream<any> & DevToolEnabledSource)._isCycleSource,
        stream: stream,
        width: SOURCE_NODE_SIZE[0],
        height: SOURCE_NODE_SIZE[1],
      };
    } else {
      node = {
        id: idTable.getId(stream),
        type: 'stream',
        stream: stream,
        width: COMMON_NODE_SIZE[0],
        height: COMMON_NODE_SIZE[1],
      };
    }
    graph.setNode(idTable.getId(stream), node);
  }
}

function visitOperator(
  graph: dagre.graphlib.Graph,
  idTable: IdTable,
  operator: InternalProducer,
): void {
  const id = idTable.getId(operator);
  if (!graph.node(id)) {
    graph.setNode(id, {
      id,
      type: 'operator',
      label: operator.type,
      width: OPERATOR_NODE_SIZE[0],
      height: OPERATOR_NODE_SIZE[1],
    });
  }
}

function visitEdge(
  graph: dagre.graphlib.Graph,
  idTable: IdTable,
  inStream: Stream<any>,
  operator: InternalProducer,
  outStream: Stream<any>,
) {
  makeSureNodeIsRegistered(graph, idTable, inStream);
  makeSureNodeIsRegistered(graph, idTable, outStream);
  graph.setEdge(idTable.getId(inStream), idTable.getId(operator), {});
  graph.setEdge(idTable.getId(operator), idTable.getId(outStream), {});
  if (!inStream['_isCycleSource']) {
    traverse(graph, idTable, inStream);
  }
}

function traverse(
  graph: dagre.graphlib.Graph,
  idTable: IdTable,
  outStream: Stream<any>,
): void {
  if (outStream._prod && outStream._prod['ins']) {
    const inStream: Stream<any> = outStream._prod['ins'];
    visitOperator(graph, idTable, outStream._prod);
    visitEdge(graph, idTable, inStream, outStream._prod, outStream);
  } else if (outStream._prod && outStream._prod['insArr']) {
    const insArr: Array<Stream<any>> = outStream._prod['insArr'];
    visitOperator(graph, idTable, outStream._prod);
    insArr.forEach(inStream => {
      visitEdge(graph, idTable, inStream, outStream._prod, outStream);
    });
  } else if (outStream._prod) {
    visitOperator(graph, idTable, outStream._prod);
    makeSureNodeIsRegistered(graph, idTable, outStream);
    graph.setEdge(idTable.getId(outStream._prod), idTable.getId(outStream), {});
  }
}

function buildGraph(sinks: Object): dagre.graphlib.Graph {
  const idTable = new IdTable();
  const graph = new dagre.graphlib.Graph();
  graph.setGraph({nodesep: 60, ranksep: 20});
  for (const key in sinks) {
    if (sinks.hasOwnProperty(key)) {
      const node: StreamGraphNode = {
        id: idTable.getId(sinks[key]),
        label: key,
        type: 'sink',
        stream: sinks[key],
        width: SINK_NODE_SIZE[0],
        height: SINK_NODE_SIZE[1],
      };
      graph.setNode(idTable.getId(sinks[key]), node);
      traverse(graph, idTable, sinks[key]);
    }
  }
  dagre.layout(graph);
  return graph;
}

interface Diagram {
  graph: dagre.graphlib.Graph;
  zaps$: Stream<Array<Zap>>;
}

interface ZapRecord {
  id: string;
  stream: Stream<any>;
  depth: number;
}

class ZapRegistry {
  private _presenceSet: Set<string>;
  private _records: Array<ZapRecord>;

  constructor() {
    this._presenceSet = new Set<string>();
    this._records = [];
  }

  public has(id: string): boolean {
    return this._presenceSet.has(id);
  }

  public register(id: string, stream: Stream<any>, depth: number): void {
    this._presenceSet.add(id);
    this._records.push({id, stream, depth});
  }

  get records() {
    return this._records;
  }
}

function setupZapping(
  [graph, zapSpeed]: [dagre.graphlib.Graph, ZapSpeed],
): Diagram {
  const registry: ZapRegistry = new ZapRegistry();
  const sourceNodes: Array<string> = graph['sources']();
  sourceNodes.forEach(id => {
    zapVisit(id, 0, graph, registry);
  });

  const rawZap$ = xs.create<Zap>({
    start(listener: Listener<Zap>) {
      const n = registry.records.length;
      for (let i = 0; i < n; i++) {
        const record = registry.records[i];
        const id = record.id;
        record.stream.setDebugListener({
          next: value => listener.next({id, type: 'next', value} as Zap),
          error: err => listener.next({id, type: 'error', value: err} as Zap),
          complete: () => listener.next({id, type: 'complete'} as Zap),
        });
      }
    },
    stop() {},
  });

  const actualZaps$ = rawZap$.compose(
    timeSpread(zapSpeedToMilliseconds(zapSpeed)),
  );

  const stopZaps$: Stream<Array<any>> = actualZaps$
    .mapTo([])
    .compose(debounce(200))
    .startWith([]);

  const zaps$ = xs.merge(actualZaps$, stopZaps$);

  return {graph, zaps$};
}

function zapVisit(
  nodeId: string,
  depth: number,
  graph: dagre.graphlib.Graph,
  registry: ZapRegistry,
) {
  if (registry.has(nodeId)) {
    return;
  } else {
    const node: StreamGraphNode = graph.node(nodeId);
    if (node.type !== 'operator') {
      registry.register(nodeId, node.stream, depth);
    }
    const successors: Array<string> = graph['successors'](nodeId);
    successors.forEach(id => {
      zapVisit(id, depth + 1, graph, registry);
    });
  }
}

function makeObjectifyGraph(id$: Stream<string>) {
  return function objectifyGraph(diagram$: Stream<Diagram>): Stream<Object> {
    return xs
      .combine(diagram$, id$)
      .map(([{graph, zaps$}, id]) => {
        const object = dagre.graphlib['json'].write(graph);
        const n = object.nodes.length;
        for (let i = 0; i < n; i++) {
          delete object.nodes[i].stream;
        }
        return zaps$.map(zaps => {
          object.zaps = zaps;
          object.id = id;
          return object;
        });
      })
      .flatten();
  };
}

function sinksAreXStream(sinks: Object | null): boolean {
  if (sinks === null) {
    return false;
  }
  for (const key in sinks) {
    if (sinks.hasOwnProperty(key)) {
      if (sinks[key] && typeof sinks[key].setDebugListener !== 'function') {
        return false;
      }
    }
  }
  return true;
}

interface GraphSerializerSources {
  id: Stream<string>;
  DebugSinks: Stream<Object | null>;
  FromPanel: Stream<string>;
  Settings: Stream<SessionSettings>;
}

interface GraphSerializerSinks {
  graph: Stream<string>;
}

function GraphSerializer(
  sources: GraphSerializerSources,
): GraphSerializerSinks {
  const zapSpeed$ = sources.Settings
    .map(settings =>
      (sources.FromPanel as Stream<ZapSpeed>).startWith(settings.zapSpeed),
    )
    .flatten();

  const graph$ = sources.DebugSinks.filter(sinksAreXStream).map(buildGraph);

  const serializedGraph$ = xs
    .combine(graph$, zapSpeed$)
    .map(setupZapping)
    .compose(makeObjectifyGraph(sources.id))
    .map(object => CircularJSON.stringify(object));

  const invalid$ = sources.DebugSinks
    .filter(x => !sinksAreXStream(x))
    .mapTo('');

  return {
    graph: xs.merge(serializedGraph$, invalid$),
  };
}

const panelMessage$ = xs.create<string>({
  start(listener: Listener<string>) {
    window['receivePanelMessage'] = function receivePanelMessage(msg: string) {
      listener.next(msg);
    };
  },
  stop() {},
});

let started = false;

function startGraphSerializer(appSinks: Object | null) {
  if (started) {
    return;
  }
  const serializerSources: GraphSerializerSources = {
    id: xs.of(`graph-${Math.round(Math.random() * 1000000000)}`),
    DebugSinks: xs.of(appSinks),
    FromPanel: panelMessage$,
    Settings: xs.of<SessionSettings>(window['CyclejsDevToolSettings']),
  };
  const serializerSinks = GraphSerializer(serializerSources);

  serializerSinks.graph.addListener({
    next: graph => {
      // console.log('GRAPH SERIALIZER send message to CONTENT SCRIPT: ' + graph);
      // Send message to the CONTENT SCRIPT
      const event = new CustomEvent('CyclejsDevToolEvent', {detail: graph});
      document.dispatchEvent(event);
    },
    error: (err: any) => {
      console.error('Cycle.js DevTool (graph serializer):\n' + err);
      console.error(err.stack);
    },
    complete: () => {},
  });
  started = true;
}

window['CyclejsDevTool_startGraphSerializer'] = startGraphSerializer;

const intervalID = setInterval(function() {
  if (window['Cyclejs'] && window['Cyclejs'].sinks) {
    clearInterval(intervalID);
    startGraphSerializer(window['Cyclejs'].sinks);
  } else {
    clearInterval(intervalID);
    startGraphSerializer(null);
  }
}, 50);
/* tslint:enable:max-file-line-count */