rjrodger/ordu

View on GitHub
ordu.ts

Summary

Maintainability
C
7 hrs
Test Coverage
/* Copyright (c) 2016-2021 Richard Rodger and other contributors, MIT License */
/* $lab:coverage:off$ */
'use strict'
/* $lab:coverage:on$ */

import { EventEmitter } from 'events'

import * as Hoek from '@hapi/hoek'

import Nua from 'nua'
import StrictEventEmitter from 'strict-event-emitter-types'

export type { TaskDef, TaskSpec }

export { Ordu, Task, LegacyOrdu }

interface Events {
  'task-result': TaskResult
  'task-end': { result: TaskResult; operate: Operate; data: any }
}

type OrduEmitter = StrictEventEmitter<EventEmitter, Events>

interface OrduIF {
  add(td: TaskDef): OrduIF
  add(td: TaskDef[]): OrduIF

  add(te: TaskExec): OrduIF
  add(te: TaskExec, td: TaskDef): OrduIF
  add(te: TaskExec[]): OrduIF

  tasks(): Task[]

  task: {
    [name: string]: Task
  }

  operator(name: string, opr: Operator): void
  operator(opr: Operator): void

  operators(): object

  execSync(this: Ordu, ctx: any, data: any, opts: any): ExecResult
  exec(ctx: any, data: any, opts: any): Promise<ExecResult>
}

interface TaskDef {
  id?: string
  name?: string
  before?: string
  after?: string
  exec?: TaskExec
  // from?: object
  if?: { [k: string]: any }
  active?: boolean
  meta?: any
}

type TaskExec = (s: TaskSpec) => any

interface TaskSpec {
  ctx: any
  data: any
  task: Task
}

class Task {
  static count: number = 0

  runid: string
  name: string
  before?: string
  after?: string
  exec: (s: TaskSpec) => TaskResult
  if?: { [k: string]: any }
  active?: boolean
  meta: {
    when: number
    from: object
  }

  constructor(taskdef: TaskDef) {
    this.runid =
      null == taskdef.id ? ('' + Math.random()).substring(2) : taskdef.id
    this.name = taskdef.name || 'task' + Task.count++
    this.before = taskdef.before
    this.after = taskdef.after
    this.exec = taskdef.exec || ((_: TaskSpec) => {})
    this.if = taskdef.if || void 0
    this.active = null == taskdef.active ? true : taskdef.active
    this.meta = Object.assign(taskdef.meta || {}, {
      when: Date.now(),
      from: taskdef.meta?.from,
    })
  }
}

// Use the constructor to normalize task result
class TaskResult {
  op: string
  out?: object
  err?: Error
  why?: string
  task: Task
  name: string
  start: number
  end: number
  runid: string
  index: number
  total: number
  async: boolean

  constructor(task: Task, taskI: number, total: number, runid: string) {
    this.op = 'not-defined'
    this.task = task
    this.name = task.name
    this.start = Date.now()
    this.end = Number.MAX_SAFE_INTEGER
    this.index = taskI
    this.total = total
    this.async = false
    this.runid = runid
  }

  update(raw: any) {
    raw = null == raw ? {} : raw

    this.out = null == raw.out ? {} : raw.out
    this.err = raw instanceof Error ? raw : raw.err

    this.op =
      null != this.err ? 'stop' : 'string' === typeof raw.op ? raw.op : 'next'

    this.why = raw.why || ''
  }
}

type Operate = {
  stop: boolean
  err?: Error
  async?: boolean
}

type ExecResult = {
  tasklog: any[]
  task?: Task
  taskcount: number
  tasktotal: number
  start: number
  end: number
  err?: Error
  data: object
}

type Operator = (r: TaskResult, ctx: any, data: object) => Operate

class Ordu extends (EventEmitter as { new (): OrduEmitter }) implements OrduIF {
  private _opts: any

  private _tasks: Task[]

  private _operator_map: {
    [op: string]: Operator
  }

  task: { [name: string]: Task }

  constructor(opts?: any) {
    super()

    this.task = {}

    this._opts = {
      debug: false,
      ...opts,
    }

    this._tasks = []

    this._operator_map = {
      next: () => ({ stop: false }),

      skip: () => ({ stop: false }),

      stop: (tr, _, data) => {
        Nua(data, tr.out, { preserve: true })
        return { stop: true, err: tr.err }
      },

      merge: (tr, _, data) => {
        Nua(data, tr.out, { preserve: true })
        return { stop: false }
      },
    }
  }

  operator(first: string | Operator, opr?: Operator) {
    let name: string = 'string' === typeof first ? first : first.name
    this._operator_map[name] = opr || (first as Operator)
  }

  operators() {
    return this._operator_map
  }

  add(first: any, second?: any): Ordu {
    let callpoint: string[] | undefined
    if (this._opts.debug) {
      callpoint = make_callpoint(new Error())
    }

    if ('function' == typeof first) {
      second = second || {}
      let t = second
      t.exec = first
      t.name = first.name ? first.name : t.name
      this._add_task(t, callpoint)
    } else if (Array.isArray(first)) {
      for (var i = 0; i < first.length; i++) {
        let entry: TaskDef = first[i]
        if ('function' === typeof first[i]) {
          entry = { name: (first[i] as TaskExec).name, exec: first[i] }
        }
        this._add_task(entry, callpoint)
      }
    } else {
      this._add_task(first, callpoint)
    }

    return this
  }

  private _add_task(td: TaskDef, callpoint: string[] | undefined): void {
    if (callpoint) {
      td.meta = td.meta || {}
      td.meta.from = Object.assign(td.meta.from || {}, {
        callpoint$: callpoint,
      })
    }

    let t = new Task(td)

    let tI = 0
    for (; tI < this._tasks.length; tI++) {
      if (null != t.before && this._tasks[tI].name === t.before) {
        break
      } else if (null != t.after && this._tasks[tI].name === t.after) {
        tI++
        break
      }
    }

    this._tasks.splice(tI, 0, t)

    this.task[t.name] = t
  }

  execSync(this: Ordu, ctx: any, data: any, opts: any): ExecResult {
    return this._execImpl(ctx, data, opts) as unknown as ExecResult
  }

  async exec(ctx: any, data: any, opts: any): Promise<ExecResult> {
    return new Promise((resolve) => {
      this._execImpl(ctx, data, opts, resolve)
    })
  }

  _execImpl(
    this: Ordu,
    ctx: any,
    data: any,
    opts: any,
    resolve?: (execres: ExecResult) => void
  ): ExecResult | Promise<ExecResult> {
    const self = this

    opts = null == opts ? {} : opts
    let runid = opts.runid || (Math.random() + '').substring(2)
    let start = Date.now()
    let tasks: Task[] = [...self._tasks]

    let spec = {
      ctx: ctx || {},
      data: data || {},
    }

    let tasklog: any[] = []

    let taskcount = 0

    let last_taskI = 0
    let last_operate: any = undefined

    return next(0) as ExecResult

    function next(taskI: number): ExecResult | void {
      if (taskI >= tasks.length) {
        let execres = finish()
        return resolve ? resolve(execres) : execres
      }
      last_taskI = taskI

      let task = tasks[taskI]
      let execres: any
      let taskout: any
      let result = new TaskResult(task, taskI, tasks.length, runid)

      if (task.active && self._task_if(task, spec.data)) {
        try {
          taskcount++
          let taskspec = Object.assign({ task: task }, spec)
          execres = task.exec(taskspec)

          if (execres instanceof Promise) {
            result.async = true
            execres
              .then((out) => (taskout = out))
              .catch((err) => (taskout = err))
              .finally(() => handle_result(taskI, task, taskout, result))
          } else {
            taskout = execres
          }
        } catch (task_ex) {
          taskout = task_ex
        }
      } else {
        taskout = { op: 'skip' }
      }

      if (!result.async) {
        return handle_result(taskI, task, taskout, result)
      }
    }

    function handle_result(
      taskI: number,
      task: Task,
      taskout: any,
      result: TaskResult
    ): ExecResult | void {
      result.end = Date.now()
      result.update(taskout)
      self.emit('task-result', result)

      let operate: Operate = {
        stop: false,
        err: undefined,
        async: false,
      }

      try {
        let opres = self._operate(result, spec.ctx, spec.data)
        if (opres instanceof Promise) {
          operate.async = true
          opres
            .then((out) => {
              Object.assign(operate, out)
            })
            .catch((err) => {
              operate.stop = true
              operate.err = err
            })
            .finally(() => {
              handle_operate(taskI, task, result, operate)
            })
        } else {
          operate = opres as Operate
          operate.async = false
        }
      } catch (operate_ex) {
        operate.stop = true
        operate.err = operate_ex as Error
      }

      if (!operate.async) {
        return handle_operate(taskI, task, result, operate)
      }
    }

    function handle_operate(
      taskI: number,
      task: Task,
      result: TaskResult,
      operate: Operate
    ): ExecResult | void {
      last_operate = operate

      let entry = {
        name: task.name,
        op: result.op,
        task,
        result,
        operate,
        data: self._opts.debug ? JSON.parse(JSON.stringify(spec.data)) : void 0,
      }
      tasklog.push(entry)
      self.emit('task-end', entry)

      if (!operate.stop) {
        ++taskI
      } else {
        taskI = tasks.length
      }

      return next(taskI)
    }

    function finish(): ExecResult {
      let err = last_operate ? last_operate.err : null

      let execres: ExecResult = {
        tasklog: tasklog,
        task: err ? tasks[last_taskI] : void 0,
        taskcount: taskcount,
        tasktotal: tasks.length,
        start: start,
        end: Date.now(),
        err: err,
        data: spec.data,
      }

      if (opts.done) {
        opts.done(execres)
      }

      return execres
    }
  }

  tasks() {
    return [...this._tasks]
  }

  private _operate(r: TaskResult, ctx: any, data: object): Operate {
    if (r.err) {
      return {
        stop: true,
        err: r.err,
        async: false,
      }
    }

    let operator = this._operator_map[r.op]

    if (operator) {
      return operator(r, ctx, data)
    } else {
      return {
        stop: true,
        err: new Error('Unknown operation: ' + r.op),
        async: false,
      }
    }
  }

  private _task_if(task: Task, data: object): boolean {
    if (task.if) {
      let task_if: { [k: string]: any } = task.if
      return Object.keys(task_if).reduce((proceed: boolean, k: string) => {
        let part: any = Hoek.reach(data, k)

        return (
          proceed &&
          Hoek.contain({ $: part }, { $: task_if[k] }, { deep: true })
        )
      }, true)
    } else {
      return true
    }
  }
}

/* $lab:coverage:off$ */
function make_callpoint(err: Error) {
  return null == err
    ? []
    : (err.stack || '')
        .split(/\n/)
        .slice(4)
        .map((line) => line.substring(4))
}
/* $lab:coverage:on$ */

function LegacyOrdu(opts?: any): any {
  var orduI = -1

  var self: any = {}
  ++orduI

  opts = opts || {}
  opts.name = opts.name || 'ordu' + orduI

  self.add = api_add
  self.process = api_process
  self.tasknames = api_tasknames
  self.taskdetails = api_taskdetails
  self.toString = api_toString

  var tasks: any[] = []

  function api_add(spec: any, task: any) {
    task = task || spec

    if (!task.name) {
      Object.defineProperty(task, 'name', {
        value: opts.name + '_task' + tasks.length,
      })
    }

    task.tags = spec.tags || []

    tasks.push(task)

    return self
  }

  // Valid calls:
  //   * process(spec, ctxt, data)
  //   * process(ctxt, data)
  //   * process(data)
  //   * process()
  function api_process() {
    var i = arguments.length
    var data = 0 < i && arguments[--i]
    var ctxt = 0 < i && arguments[--i]
    var spec = 0 < i && arguments[--i]

    data = data || {}
    ctxt = ctxt || {}
    spec = spec || {}

    spec.tags = spec.tags || []

    for (var tI = 0; tI < tasks.length; ++tI) {
      var task = tasks[tI]

      if (0 < spec.tags.length && !contains(task.tags, spec.tags)) {
        continue
      }

      var index$ = tI
      var taskname$ = task.name

      ctxt.index$ = index$
      ctxt.taskname$ = taskname$

      var res = task(ctxt, data)

      if (res) {
        res.index$ = index$
        res.taskname$ = taskname$
        res.ctxt$ = ctxt
        res.data$ = data
        return res
      }
    }

    return null
  }

  function api_tasknames() {
    return tasks.map(function (v) {
      return v.name
    })
  }

  function api_taskdetails() {
    return tasks.map(function (v) {
      return v.name + ':{tags:' + v.tags + '}'
    })
  }

  function api_toString() {
    return opts.name + ':[' + self.tasknames() + ']'
  }

  return self
}

function contains(all: any, some: any) {
  for (var i = 0; i < some.length; ++i) {
    if (-1 === all.indexOf(some[i])) {
      return false
    }
  }

  return true
}