src/stream/progressLogger.ts
import { inspect, InspectOptions } from 'node:util'
import {
_mb,
_since,
AnyObject,
CommonLogger,
localTimeNow,
SimpleMovingAverage,
UnixTimestampMillisNumber,
} from '@naturalcycles/js-lib'
import { boldWhite, dimGrey, hasColors, white, yellow } from '../colors/colors'
import { SizeStack } from './sizeStack'
import { ReadableMapper } from './stream.model'
export interface ProgressLoggerCfg<T = any> {
  /**
   * Name of the progress-counter key in the logged stats object.
   * The final stats (logged by `done()`) use `${metric}_final` as the key.
   *
   * @default `progress`
   */
  metric?: string
  /**
   * Include `heapUsed` in log.
   *
   * @default false
   */
  heapUsed?: boolean
  /**
   * Include `heapTotal` in log.
   *
   * @default false
   */
  heapTotal?: boolean
  /**
   * Include `rss` in log.
   *
   * @default true
   */
  rss?: boolean
  /**
   * Include peak RSS (the highest `rss` observed at any stats-logging moment so far) in log.
   *
   * @default true
   */
  peakRSS?: boolean
  /**
   * Include `external` in log.
   *
   * @default false
   */
  external?: boolean
  /**
   * Include `arrayBuffers` in log.
   *
   * @default false
   */
  arrayBuffers?: boolean
  /**
   * Log (rss - heapTotal)
   * For convenience of debugging "out-of-heap" memory size.
   *
   * @default false
   */
  rssMinusHeap?: boolean
  /**
   * Log "rows per second": `rps10` (moving average over recent logging intervals)
   * and `rpsTotal` (average since the logger started).
   *
   * @default true
   */
  logRPS?: boolean
  /**
   * Set to false to disable logging progress entirely — including the initial and
   * final stats, and therefore also the `onProgressDone` hook.
   *
   * @default true (false when `logEvery` is 0)
   */
  logProgress?: boolean
  /**
   * Log progress every Nth record that is _processed_ (went through mapper).
   * Additionally, every `logEvery * 10` records a "took X so far" summary line is logged.
   * Set to 0 to disable logging.
   *
   * @default 1000
   */
  logEvery?: number
  /**
   * Logger that receives the stats lines (and hook errors via `logger.error`).
   *
   * @default console
   */
  logger?: CommonLogger
  /**
   * Function to return extra properties to merge into the "progress object".
   *
   * `chunk` is undefined for the initial stats (logged on construction) and for the
   * "final" stats; otherwise it is the item passed to `log()`.
   * `index` is the number of `log()` calls so far (NOT multiplied by `chunkSize`).
   */
  extra?: (chunk: T | undefined, index: number) => AnyObject
  /**
   * Hook called by `done()` with the final stats as `ProgressLogItem`.
   * It is not called when `logProgress` is disabled.
   * Errors thrown by the hook are caught and passed to `logger.error`.
   */
  onProgressDone?: (stats: ProgressLogItem) => any
  /**
   * If specified - will multiply the counter by this number.
   * Useful e.g when using `transformChunk({ chunkSize: 500 })`, so
   * it'll accurately represent the number of processed entries (not chunks).
   *
   * Defaults to 1.
   */
  chunkSize?: number
  /**
   * Experimental logging of item (chunk) sizes, when json-stringified.
   *
   * Defaults to false.
   *
   * @experimental
   */
  logSizes?: boolean
  /**
   * How many last item sizes to keep in a buffer, to calculate stats (p50, p90, avg, etc).
   * Defaults to 100_000.
   * Cannot be Infinity.
   */
  logSizesBuffer?: number
  /**
   * Works in addition to `logSizes` (has no effect unless `logSizes` is enabled).
   * Adds "zipped sizes".
   *
   * @experimental
   */
  logZippedSizes?: boolean
}
/**
 * Stats object produced by `ProgressLogger`: rendered with `util.inspect` and
 * passed to `onProgressDone` for the final stats.
 *
 * Besides the optional fields below, it carries the progress counter under the
 * configured `metric` key (suffixed with `_final` for the final stats) and any
 * properties returned by the `extra` callback.
 *
 * Memory fields are converted with `_mb` (presumably megabytes — confirm in js-lib).
 */
export interface ProgressLogItem extends AnyObject {
  // --- memory ---
  rss?: number
  peakRSS?: number
  heapUsed?: number
  heapTotal?: number
  rssMinusHeap?: number
  external?: number
  arrayBuffers?: number

  // --- throughput ---
  /** Moving average of rows-per-second over recent logging intervals. */
  rps10?: number
  /** Average rows-per-second since the logger started. */
  rpsTotal?: number
}
/**
 * `util.inspect` options for rendering a stats object: a wide `breakLength` keeps
 * it on one line, and colors are used only when the terminal supports them.
 */
const inspectOpt: InspectOptions = {
  breakLength: 300,
  colors: hasColors,
}
/**
 * Periodically logs progress stats (processed count, memory usage, rows per second)
 * while items are being processed, plus a summary when `done()` is called.
 *
 * Usage: call `log(chunk)` once per processed item (or chunk, see `chunkSize`), then
 * `done()` at the end — or declare it with `using`, so `[Symbol.dispose]` calls `done()`.
 * An initial stats line is logged right away on construction (when `logProgress` is enabled).
 */
export class ProgressLogger<T> implements Disposable {
  constructor(cfg: ProgressLoggerCfg<T> = {}) {
    // Defaults are applied per key with `??` instead of spreading `cfg` over a defaults
    // object: a key explicitly set to `undefined` (e.g. `{ logEvery: opt.logEvery }`)
    // must fall back to its default rather than erase it. With a plain spread, an
    // undefined `logEvery`/`chunkSize` produced NaN math and an undefined `logProgress`
    // silently disabled all logging.
    this.cfg = {
      ...cfg,
      metric: cfg.metric ?? 'progress',
      rss: cfg.rss ?? true,
      peakRSS: cfg.peakRSS ?? true,
      logRPS: cfg.logRPS ?? true,
      logEvery: cfg.logEvery ?? 1000,
      logSizesBuffer: cfg.logSizesBuffer ?? 100_000,
      chunkSize: cfg.chunkSize ?? 1,
      logger: cfg.logger ?? console,
      // An explicit `logProgress` wins; otherwise logging is on unless `logEvery` is 0.
      logProgress: cfg.logProgress ?? (cfg.logEvery !== 0),
    }
    this.logEvery10 = this.cfg.logEvery * 10
    this.start()
    this.logStats() // initial
  }

  /** Effective config: user-provided values with defaults filled in. */
  cfg!: ProgressLoggerCfg<T> & {
    logEvery: number
    logSizesBuffer: number
    chunkSize: number
    metric: string
    logger: CommonLogger
  }

  /** Timestamp at which counting started. */
  private started!: UnixTimestampMillisNumber
  /** Timestamp of the previous `logStats` call (not necessarily one second ago). */
  private lastSecondStarted!: UnixTimestampMillisNumber
  /** Moving average (window of 10) of per-interval RPS samples. */
  private sma!: SimpleMovingAverage
  /** Every `logEvery * 10` items an extra "took X so far" summary line is logged. */
  private logEvery10!: number
  /** Number of `log()` calls since the previous `logStats` call. */
  private processedLastSecond!: number
  /** Total number of `log()` calls (not multiplied by `chunkSize`). */
  private progress!: number
  /** Highest `process.memoryUsage().rss` seen at logging time, in bytes. */
  private peakRSS!: number
  private sizes?: SizeStack
  private sizesZipped?: SizeStack

  /** Initializes counters, timestamps, the RPS moving average and optional size stacks. */
  private start(): void {
    const now = Date.now()
    this.started = now
    this.lastSecondStarted = now
    this.sma = new SimpleMovingAverage(10)
    this.processedLastSecond = 0
    this.progress = 0
    this.peakRSS = 0
    this.sizes = this.cfg.logSizes ? new SizeStack('json', this.cfg.logSizesBuffer) : undefined
    this.sizesZipped = this.cfg.logZippedSizes
      ? new SizeStack('json.gz', this.cfg.logSizesBuffer)
      : undefined
  }

  /**
   * Counts one processed item (or one chunk, see `chunkSize`).
   * Logs stats every `logEvery` calls, with an extra summary line every `logEvery * 10` calls.
   */
  log(chunk?: T): void {
    this.progress++
    this.processedLastSecond++

    if (this.sizes) {
      // `countItem` is not awaited (hence `void`), so gzip size stats may lag behind.
      void SizeStack.countItem(chunk, this.cfg.logger, this.sizes, this.sizesZipped)
    }

    if (this.cfg.logProgress && this.progress % this.cfg.logEvery === 0) {
      this.logStats(chunk, false, this.progress % this.logEvery10 === 0)
    }
  }

  /**
   * Logs the final stats and invokes `onProgressDone` (only when `logProgress` is enabled).
   * Not guarded against repeated calls: each call logs again and re-invokes the hook.
   */
  done(): void {
    this.logStats(undefined, true)
  }

  /** Calls `done()`, enabling `using logger = progressLogger(...)`. */
  [Symbol.dispose](): void {
    this.done()
  }

  /**
   * Builds the stats object and logs it, followed by size stats (if enabled) and,
   * on 10x or final calls, a human-readable summary line.
   *
   * @param chunk - current item, forwarded to `extra`; undefined for initial and final stats
   * @param final - true when called from `done()`: uses the `${metric}_final` key,
   *   logs the total summary and invokes `onProgressDone`
   * @param tenx - true every `logEvery * 10` items: logs a "took X so far" line with an hourly rate
   */
  private logStats(chunk?: T, final = false, tenx = false): void {
    if (!this.cfg.logProgress) return

    const {
      metric,
      extra,
      chunkSize,
      heapUsed: logHeapUsed,
      heapTotal: logHeapTotal,
      rss: logRss,
      peakRSS: logPeakRss,
      rssMinusHeap,
      external,
      arrayBuffers,
      logRPS,
      logger,
    } = this.cfg

    const mem = process.memoryUsage()
    const now = Date.now()
    const batchedProgress = this.progress * chunkSize

    // Elapsed times are clamped to >= 1ms: two calls within the same millisecond
    // (fast stream, small `logEvery`) would otherwise divide by zero and yield Infinity,
    // which `|| 0` does not catch and which would then stick in the moving average.
    const msSinceLastStats = Math.max(1, now - this.lastSecondStarted)
    const msSinceStart = Math.max(1, now - this.started)

    const lastRPS = (this.processedLastSecond * chunkSize) / (msSinceLastStats / 1000) || 0
    const rpsTotal = Math.round(batchedProgress / (msSinceStart / 1000)) || 0
    this.lastSecondStarted = now
    this.processedLastSecond = 0
    const rps10 = Math.round(this.sma.pushGetAvg(lastRPS))

    if (mem.rss > this.peakRSS) this.peakRSS = mem.rss

    const o: ProgressLogItem = {
      [final ? `${metric}_final` : metric]: batchedProgress,
    }

    if (extra) Object.assign(o, extra(chunk, this.progress))
    if (logHeapUsed) o.heapUsed = _mb(mem.heapUsed)
    if (logHeapTotal) o.heapTotal = _mb(mem.heapTotal)
    if (logRss) o.rss = _mb(mem.rss)
    if (logPeakRss) o.peakRSS = _mb(this.peakRSS)
    if (rssMinusHeap) o.rssMinusHeap = _mb(mem.rss - mem.heapTotal)
    if (external) o.external = _mb(mem.external)
    if (arrayBuffers) o.arrayBuffers = _mb(mem.arrayBuffers || 0)
    if (logRPS) Object.assign(o, { rps10, rpsTotal })

    logger.log(inspect(o, inspectOpt))

    if (this.sizes?.items.length) {
      logger.log(this.sizes.getStats())

      if (this.sizesZipped?.items.length) {
        logger.log(this.sizesZipped.getStats())
      }
    }

    if (tenx) {
      let perHour: number | string =
        Math.round((batchedProgress * 1000 * 60 * 60) / msSinceStart) || 0
      if (perHour > 900) {
        perHour = Math.round(perHour / 1000) + 'K'
      }

      logger.log(
        `${dimGrey(localTimeNow().toPretty())} ${white(metric)} took ${yellow(
          _since(this.started),
        )} so far to process ${yellow(batchedProgress)} rows, ~${yellow(perHour)}/hour`,
      )
    } else if (final) {
      logger.log(
        `${boldWhite(metric)} took ${yellow(_since(this.started))} to process ${yellow(
          batchedProgress,
        )} rows with total RPS of ${yellow(rpsTotal)}`,
      )

      // A failing user hook must not break the caller's pipeline; report it instead.
      try {
        this.cfg.onProgressDone?.(o)
      } catch (err) {
        logger.error(err)
      }
    }
  }
}
/**
 * Factory for `ProgressLogger`: equivalent to `new ProgressLogger(cfg)`.
 * Construction already logs the initial stats line (when logging is enabled).
 */
export function progressLogger<T>(cfg: ProgressLoggerCfg<T> = {}): ProgressLogger<T> {
  const instance = new ProgressLogger<T>(cfg)
  return instance
}
/**
 * Returns a pass-through `ReadableMapper` that counts every chunk with a
 * `ProgressLogger` and returns the chunk unchanged.
 *
 * Limitation: there is no hook here to detect the end of the stream, so `done()` is
 * never called — the final stats and `onProgressDone` are never emitted.
 *
 * @experimental
 */
export function progressReadableMapper<T>(cfg: ProgressLoggerCfg<T> = {}): ReadableMapper<T, T> {
  const tracker = new ProgressLogger(cfg)

  const passThrough: ReadableMapper<T, T> = chunk => {
    tracker.log(chunk)
    return chunk
  }

  return passThrough
}