NaturalCycles/nodejs-lib

View on GitHub
src/stream/transform/worker/transformMultiThreaded.ts

Summary

Maintainability
A
0 mins
Test Coverage
A
93%
import { Worker } from 'node:worker_threads'
import { DeferredPromise, pDefer, _range, AnyObject } from '@naturalcycles/js-lib'
import through2Concurrent = require('through2-concurrent')
import { TransformTyped } from '../../stream.model'
import { WorkerInput, WorkerOutput } from './transformMultiThreaded.model'

export interface TransformMultiThreadedOptions {
  /**
   * Absolute path to a js file with worker code
   */
  workerFile: string

  /**
   * @default 2, to match CircleCI and Github Actions environments
   */
  poolSize?: number

  /**
   * @default to poolSize
   */
  concurrency?: number

  /**
   * @default to Math.max(16, concurrency x 2)
   */
  highWaterMark?: number

  /**
   * Passed to the Worker as `workerData` property (initial data).
   */
  workerData?: AnyObject
}

const workerProxyFilePath = `${__dirname}/workerClassProxy.js`

/**
 * Spawns a pool of Workers (threads).
 * Distributes (using round-robin, equally) all inputs over Workers.
 * Workers emit 1 output for each 1 input.
 * Output of Workers is passed down the stream. Order is RANDOM (since it's a multi-threaded environment).
 */
export function transformMultiThreaded<IN, OUT>(
  opt: TransformMultiThreadedOptions,
): TransformTyped<IN, OUT> {
  const { workerFile, poolSize = 2, workerData } = opt
  const maxConcurrency = opt.concurrency || poolSize
  const highWaterMark = Math.max(16, maxConcurrency)

  console.log({
    poolSize,
    maxConcurrency,
    highWaterMark,
  })

  const workerDonePromises: DeferredPromise<Error | undefined>[] = []
  const messageDonePromises: Record<number, DeferredPromise<OUT>> = {}
  let index = -1 // input chunk index, will start from 0

  const workers = _range(0, poolSize).map(workerIndex => {
    workerDonePromises.push(pDefer())

    const worker = new Worker(workerProxyFilePath, {
      workerData: {
        workerIndex,
        workerFile, // pass it, so workerProxy can require() it
        ...workerData,
      },
    })

    // const {threadId} = worker
    // console.log({threadId})

    worker.on('error', err => {
      console.error(`Worker ${workerIndex} error`, err)
      workerDonePromises[workerIndex]!.reject(err)
    })

    worker.on('exit', _exitCode => {
      // console.log(`Worker ${index} exit: ${exitCode}`)
      workerDonePromises[workerIndex]!.resolve(undefined)
    })

    worker.on('message', (out: WorkerOutput<OUT>) => {
      // console.log(`Message from Worker ${workerIndex}:`, out)
      // console.log(Object.keys(messageDonePromises))
      // tr.push(out.payload)
      if (out.error) {
        messageDonePromises[out.index]!.reject(out.error)
      } else {
        messageDonePromises[out.index]!.resolve(out.payload)
      }
    })

    return worker
  })

  return through2Concurrent.obj(
    {
      maxConcurrency,
      highWaterMark,
      async final(cb) {
        try {
          // Push null (complete) to all sub-streams
          workers.forEach(worker => worker.postMessage(null))

          console.log(`transformMultiThreaded.final is waiting for all chains to be done`)
          await Promise.all(workerDonePromises)
          console.log(`transformMultiThreaded.final all chains done`)

          cb()
        } catch (err) {
          cb(err as Error)
        }
      },
    },
    async function transformMapFn(chunk: IN, _, cb) {
      // Freezing the index, because it may change due to concurrency
      const currentIndex = ++index

      // Create the unresolved promise (to avait)
      messageDonePromises[currentIndex] = pDefer<OUT>()

      const worker = workers[currentIndex % poolSize]! // round-robin
      worker.postMessage({
        index: currentIndex,
        payload: chunk,
      } as WorkerInput)

      try {
        // awaiting for result
        const out = await messageDonePromises[currentIndex]
        // console.log('awaited!')
        // return the result
        cb(null, out)
      } catch (err) {
        // Currently we only support ErrorMode.SUPPRESS
        // Error is logged and output continues
        console.error(err)

        cb() // emit nothing in case of an error
      }

      // clean up
      delete messageDonePromises[currentIndex]
    },
  )
}