andywer/threads.js

View on GitHub
src/master/pool.ts

Summary

Maintainability
B
5 hrs
Test Coverage
import DebugLogger from "debug"
import { multicast, Observable, Subject } from "observable-fns"
import { allSettled } from "../ponyfills"
import { defaultPoolSize } from "./implementation"
import {
  PoolEvent,
  PoolEventType,
  QueuedTask,
  TaskRunFunction,
  WorkerDescriptor
} from "./pool-types"
import { Thread } from "./thread"

export { PoolEvent, PoolEventType, QueuedTask, Thread }

// tslint:disable-next-line no-namespace
export declare namespace Pool {
  type Event<ThreadType extends Thread = any> = PoolEvent<ThreadType>
  type EventType = PoolEventType
}

let nextPoolID = 1

function createArray(size: number): number[] {
  const array: number[] = []
  for (let index = 0; index < size; index++) {
    array.push(index)
  }
  return array
}

function delay(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

function flatMap<In, Out>(array: In[], mapper: ((element: In) => Out[])): Out[] {
  return array.reduce<Out[]>(
    (flattened, element) => [...flattened, ...mapper(element)],
    []
  )
}

function slugify(text: string) {
  return text.replace(/\W/g, " ").trim().replace(/\s+/g, "-")
}

function spawnWorkers<ThreadType extends Thread>(
  spawnWorker: () => Promise<ThreadType>,
  count: number
): Array<WorkerDescriptor<ThreadType>> {
  return createArray(count).map((): WorkerDescriptor<ThreadType> => ({
    init: spawnWorker(),
    runningTasks: []
  }))
}

/**
 * Thread pool managing a set of worker threads.
 * Use it to queue tasks that are run on those threads with limited
 * concurrency.
 */
export interface Pool<ThreadType extends Thread> {
  /**
   * Returns a promise that resolves once the task queue is emptied.
   * Promise will be rejected if any task fails.
   *
   * @param allowResolvingImmediately Set to `true` to resolve immediately if task queue is currently empty.
   */
  completed(allowResolvingImmediately?: boolean): Promise<any>

  /**
   * Returns a promise that resolves once the task queue is emptied.
   * Failing tasks will not cause the promise to be rejected.
   *
   * @param allowResolvingImmediately Set to `true` to resolve immediately if task queue is currently empty.
   */
  settled(allowResolvingImmediately?: boolean): Promise<Error[]>

  /**
   * Returns an observable that yields pool events.
   */
  events(): Observable<PoolEvent<ThreadType>>

  /**
   * Queue a task and return a promise that resolves once the task has been dequeued,
   * started and finished.
   *
   * @param task An async function that takes a thread instance and invokes it.
   */
  queue<Return>(task: TaskRunFunction<ThreadType, Return>): QueuedTask<ThreadType, Return>

  /**
   * Terminate all pool threads.
   *
   * @param force Set to `true` to kill the thread even if it cannot be stopped gracefully.
   */
  terminate(force?: boolean): Promise<void>
}

export interface PoolOptions {
  /** Maximum no. of tasks to run on one worker thread at a time. Defaults to one. */
  concurrency?: number

  /** Maximum no. of jobs to be queued for execution before throwing an error. */
  maxQueuedJobs?: number

  /** Gives that pool a name to be used for debug logging, letting you distinguish between log output of different pools. */
  name?: string

  /** No. of worker threads to spawn and to be managed by the pool. */
  size?: number
}

class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
  public static EventType = PoolEventType

  private readonly debug: DebugLogger.Debugger
  private readonly eventObservable: Observable<PoolEvent<ThreadType>>
  private readonly options: PoolOptions
  private readonly workers: Array<WorkerDescriptor<ThreadType>>

  private readonly eventSubject = new Subject<PoolEvent<ThreadType>>()
  private initErrors: Error[] = []
  private isClosing = false
  private nextTaskID = 1
  private taskQueue: Array<QueuedTask<ThreadType, any>> = []

  constructor(
    spawnWorker: () => Promise<ThreadType>,
    optionsOrSize?: number | PoolOptions
  ) {
    const options: PoolOptions = typeof optionsOrSize === "number"
      ? { size: optionsOrSize }
      : optionsOrSize || {}

    const { size = defaultPoolSize } = options

    this.debug = DebugLogger(`threads:pool:${slugify(options.name || String(nextPoolID++))}`)
    this.options = options
    this.workers = spawnWorkers(spawnWorker, size)

    this.eventObservable = multicast(Observable.from(this.eventSubject))

    Promise.all(this.workers.map(worker => worker.init)).then(
      () => this.eventSubject.next({
        type: PoolEventType.initialized,
        size: this.workers.length
      }),
      error => {
        this.debug("Error while initializing pool worker:", error)
        this.eventSubject.error(error)
        this.initErrors.push(error)
      }
    )
  }

  private findIdlingWorker(): WorkerDescriptor<ThreadType> | undefined {
    const { concurrency = 1 } = this.options
    return this.workers.find(worker => worker.runningTasks.length < concurrency)
  }

  private async runPoolTask(worker: WorkerDescriptor<ThreadType>, task: QueuedTask<ThreadType, any>) {
    const workerID = this.workers.indexOf(worker) + 1

    this.debug(`Running task #${task.id} on worker #${workerID}...`)
    this.eventSubject.next({
      type: PoolEventType.taskStart,
      taskID: task.id,
      workerID
    })

    try {
      const returnValue = await task.run(await worker.init)

      this.debug(`Task #${task.id} completed successfully`)
      this.eventSubject.next({
        type: PoolEventType.taskCompleted,
        returnValue,
        taskID: task.id,
        workerID
      })
    } catch (error) {
      this.debug(`Task #${task.id} failed`)
      this.eventSubject.next({
        type: PoolEventType.taskFailed,
        taskID: task.id,
        error,
        workerID
      })
    }
  }

  private async run(worker: WorkerDescriptor<ThreadType>, task: QueuedTask<ThreadType, any>) {
    const runPromise = (async () => {
      const removeTaskFromWorkersRunningTasks = () => {
        worker.runningTasks = worker.runningTasks.filter(someRunPromise => someRunPromise !== runPromise)
      }

      // Defer task execution by one tick to give handlers time to subscribe
      await delay(0)

      try {
        await this.runPoolTask(worker, task)
      } finally {
        removeTaskFromWorkersRunningTasks()

        if (!this.isClosing) {
          this.scheduleWork()
        }
      }
    })()

    worker.runningTasks.push(runPromise)
  }


  private scheduleWork() {
    this.debug(`Attempt de-queueing a task in order to run it...`)

    const availableWorker = this.findIdlingWorker()
    if (!availableWorker) return

    const nextTask = this.taskQueue.shift()
    if (!nextTask) {
      this.debug(`Task queue is empty`)
      this.eventSubject.next({ type: PoolEventType.taskQueueDrained })
      return
    }

    this.run(availableWorker, nextTask)
  }

  private taskCompletion(taskID: number) {
    return new Promise<any>((resolve, reject) => {
      const eventSubscription = this.events().subscribe(event => {
        if (event.type === PoolEventType.taskCompleted && event.taskID === taskID) {
          eventSubscription.unsubscribe()
          resolve(event.returnValue)
        } else if (event.type === PoolEventType.taskFailed && event.taskID === taskID) {
          eventSubscription.unsubscribe()
          reject(event.error)
        } else if (event.type === PoolEventType.terminated) {
          eventSubscription.unsubscribe()
          reject(Error("Pool has been terminated before task was run."))
        }
      })
    })
  }

  public async settled(allowResolvingImmediately: boolean = false): Promise<Error[]> {
    const getCurrentlyRunningTasks = () => flatMap(this.workers, worker => worker.runningTasks)

    const taskFailures: Error[] = []

    const failureSubscription = this.eventObservable.subscribe(event => {
      if (event.type === PoolEventType.taskFailed) {
        taskFailures.push(event.error)
      }
    })

    if (this.initErrors.length > 0) {
      return Promise.reject(this.initErrors[0])
    }
    if (allowResolvingImmediately && this.taskQueue.length === 0) {
      await allSettled(getCurrentlyRunningTasks())
      return taskFailures
    }

    await new Promise<void>((resolve, reject) => {
      const subscription = this.eventObservable.subscribe({
        next(event) {
          if (event.type === PoolEventType.taskQueueDrained) {
            subscription.unsubscribe()
            resolve(void 0)
          }
        },
        error: reject     // make a pool-wide error reject the completed() result promise
      })
    })

    await allSettled(getCurrentlyRunningTasks())
    failureSubscription.unsubscribe()

    return taskFailures
  }

  public async completed(allowResolvingImmediately: boolean = false) {
    const settlementPromise = this.settled(allowResolvingImmediately)

    const earlyExitPromise = new Promise<Error[]>((resolve, reject) => {
      const subscription = this.eventObservable.subscribe({
        next(event) {
          if (event.type === PoolEventType.taskQueueDrained) {
            subscription.unsubscribe()
            resolve(settlementPromise)
          } else if (event.type === PoolEventType.taskFailed) {
            subscription.unsubscribe()
            reject(event.error)
          }
        },
        error: reject     // make a pool-wide error reject the completed() result promise
      })
    })

    const errors = await Promise.race([
      settlementPromise,
      earlyExitPromise
    ])

    if (errors.length > 0) {
      throw errors[0]
    }
  }

  public events() {
    return this.eventObservable
  }

  public queue(taskFunction: TaskRunFunction<ThreadType, any>) {
    const { maxQueuedJobs = Infinity } = this.options

    if (this.isClosing) {
      throw Error(`Cannot schedule pool tasks after terminate() has been called.`)
    }
    if (this.initErrors.length > 0) {
      throw this.initErrors[0]
    }

    const taskID = this.nextTaskID++
    const taskCompletion = this.taskCompletion(taskID)

    taskCompletion.catch((error) => {
      // Prevent unhandled rejections here as we assume the user will use
      // `pool.completed()`, `pool.settled()` or `task.catch()` to handle errors
      this.debug(`Task #${taskID} errored:`, error)
    })

    const task: QueuedTask<ThreadType, any> = {
      id: taskID,
      run: taskFunction,
      cancel: () => {
        if (this.taskQueue.indexOf(task) === -1) return
        this.taskQueue = this.taskQueue.filter(someTask => someTask !== task)
        this.eventSubject.next({
          type: PoolEventType.taskCanceled,
          taskID: task.id
        })
      },
      then: taskCompletion.then.bind(taskCompletion)
    }

    if (this.taskQueue.length >= maxQueuedJobs) {
      throw Error(
        "Maximum number of pool tasks queued. Refusing to queue another one.\n" +
        "This usually happens for one of two reasons: We are either at peak " +
        "workload right now or some tasks just won't finish, thus blocking the pool."
      )
    }

    this.debug(`Queueing task #${task.id}...`)
    this.taskQueue.push(task)

    this.eventSubject.next({
      type: PoolEventType.taskQueued,
      taskID: task.id
    })

    this.scheduleWork()
    return task
  }

  public async terminate(force?: boolean) {
    this.isClosing = true
    if (!force) {
      await this.completed(true)
    }
    this.eventSubject.next({
      type: PoolEventType.terminated,
      remainingQueue: [...this.taskQueue]
    })
    this.eventSubject.complete()
    await Promise.all(
      this.workers.map(async worker => Thread.terminate(await worker.init))
    )
  }
}

/**
 * Thread pool constructor. Creates a new pool and spawns its worker threads.
 */
function PoolConstructor<ThreadType extends Thread>(
  spawnWorker: () => Promise<ThreadType>,
  optionsOrSize?: number | PoolOptions
) {
  // The function exists only so we don't need to use `new` to create a pool (we still can, though).
  // If the Pool is a class or not is an implementation detail that should not concern the user.
  return new WorkerPool(spawnWorker, optionsOrSize)
}

(PoolConstructor as any).EventType = PoolEventType

/**
 * Thread pool constructor. Creates a new pool and spawns its worker threads.
 */
export const Pool = PoolConstructor as typeof PoolConstructor & { EventType: typeof PoolEventType }