NaturalCycles/nodejs-lib

View on GitHub
src/stream/transform/worker/workerClassProxy.js

Summary

Maintainability
A
0 mins
Test Coverage
const started = Date.now()
const { workerData, parentPort } = require('worker_threads')
const { inspect } = require('util')
const { workerFile, workerIndex, logEvery = 1000, metric = 'worker' } = workerData || {}

if (!workerFile) {
  throw new Error('workerData.workerFile is required!')
}

// console.log(`worker#${workerIndex} created`)

try {
  // require('esbuild-register') // alternative
  require('ts-node/register/transpile-only')
  require('tsconfig-paths/register')
} catch {} // require if exists

const { WorkerClass } = require(workerFile)
const worker = new WorkerClass(workerData)

console.log(`${metric}#${workerIndex} loaded in ${Date.now() - started} ms`)

let errors = 0
let processed = 0

parentPort.on('message', async msg => {
  if (msg === null) {
    // console.log(`EXIT (null) received by ${index}, exiting`)
    parentPort.close()

    logStats(true)

    return
  }

  // console.log(`message received by worker ${index}: `, msg)

  try {
    const out = await worker.process(msg.payload, msg.index)

    parentPort.postMessage({
      index: msg.index,
      payload: out,
    })

    processed++

    if (processed % logEvery === 0) logStats()
  } catch (err) {
    parentPort.postMessage({
      index: msg.index,
      error: err,
    })

    errors++
    console.log(`${metric}#${workerIndex} errors: ${errors}`)
  }
})

const inspectOpt = {
  colors: true,
  breakLength: 120,
}

function logStats(final) {
  const { rss, heapUsed, heapTotal, external } = process.memoryUsage()

  console.log(
    inspect(
      {
        [`${metric}${workerIndex}`]: processed,
        errors,
        heapUsed: mb(heapUsed),
        heapTotal: mb(heapTotal),
        rss: mb(rss),
        external: mb(external),
        ...(final ? { final: true } : {}),
      },
      inspectOpt,
    ),
  )
}

function mb(b) {
  return Math.round(b / (1024 * 1024))
}