andywer/threads.js

View on GitHub
src/worker/index.ts

Summary

Maintainability
A
3 hrs
Test Coverage
import isSomeObservable from "is-observable"
import { Observable, Subscription } from "observable-fns"
import { deserialize, serialize } from "../common"
import { isTransferDescriptor, TransferDescriptor } from "../transferable"
import {
  MasterJobCancelMessage,
  MasterJobRunMessage,
  MasterMessageType,
  SerializedError,
  WorkerInitMessage,
  WorkerJobErrorMessage,
  WorkerJobResultMessage,
  WorkerJobStartMessage,
  WorkerMessageType,
  WorkerUncaughtErrorMessage
} from "../types/messages"
import { WorkerFunction, WorkerModule } from "../types/worker"
import Implementation from "./implementation"

export { registerSerializer } from "../common"
export { Transfer } from "../transferable"

/** Returns `true` if this code is currently running in a worker. */
export const isWorkerRuntime = Implementation.isWorkerRuntime

let exposeCalled = false

const activeSubscriptions = new Map<number, Subscription<any>>()

const isMasterJobCancelMessage = (thing: any): thing is MasterJobCancelMessage => thing && thing.type === MasterMessageType.cancel
const isMasterJobRunMessage = (thing: any): thing is MasterJobRunMessage => thing && thing.type === MasterMessageType.run

/**
 * There are issues with `is-observable` not recognizing zen-observable's instances.
 * We are using `observable-fns`, but it's based on zen-observable, too.
 */
const isObservable = (thing: any): thing is Observable<any> => isSomeObservable(thing) || isZenObservable(thing)

function isZenObservable(thing: any): thing is Observable<any> {
  return thing && typeof thing === "object" && typeof thing.subscribe === "function"
}

function deconstructTransfer(thing: any) {
  return isTransferDescriptor(thing)
    ? { payload: thing.send, transferables: thing.transferables }
    : { payload: thing, transferables: undefined }
}

function postFunctionInitMessage() {
  const initMessage: WorkerInitMessage = {
    type: WorkerMessageType.init,
    exposed: {
      type: "function"
    }
  }
  Implementation.postMessageToMaster(initMessage)
}

function postModuleInitMessage(methodNames: string[]) {
  const initMessage: WorkerInitMessage = {
    type: WorkerMessageType.init,
    exposed: {
      type: "module",
      methods: methodNames
    }
  }
  Implementation.postMessageToMaster(initMessage)
}

function postJobErrorMessage(uid: number, rawError: Error | TransferDescriptor<Error>) {
  const { payload: error, transferables } = deconstructTransfer(rawError)
  const errorMessage: WorkerJobErrorMessage = {
    type: WorkerMessageType.error,
    uid,
    error: serialize(error) as any as SerializedError
  }
  Implementation.postMessageToMaster(errorMessage, transferables)
}

function postJobResultMessage(uid: number, completed: boolean, resultValue?: any) {
  const { payload, transferables } = deconstructTransfer(resultValue)
  const resultMessage: WorkerJobResultMessage = {
    type: WorkerMessageType.result,
    uid,
    complete: completed ? true : undefined,
    payload
  }
  Implementation.postMessageToMaster(resultMessage, transferables)
}

function postJobStartMessage(uid: number, resultType: WorkerJobStartMessage["resultType"]) {
  const startMessage: WorkerJobStartMessage = {
    type: WorkerMessageType.running,
    uid,
    resultType
  }
  Implementation.postMessageToMaster(startMessage)
}

function postUncaughtErrorMessage(error: Error) {
  try {
    const errorMessage: WorkerUncaughtErrorMessage = {
      type: WorkerMessageType.uncaughtError,
      error: serialize(error) as any as SerializedError
    }
    Implementation.postMessageToMaster(errorMessage)
  } catch (subError) {
    // tslint:disable-next-line no-console
    console.error(
      "Not reporting uncaught error back to master thread as it " +
      "occured while reporting an uncaught error already." +
      "\nLatest error:", subError,
      "\nOriginal error:", error
    )
  }
}

async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) {
  let syncResult: any

  try {
    syncResult = fn(...args)
  } catch (error) {
    return postJobErrorMessage(jobUID, error)
  }

  const resultType = isObservable(syncResult) ? "observable" : "promise"
  postJobStartMessage(jobUID, resultType)

  if (isObservable(syncResult)) {
    const subscription = syncResult.subscribe(
      value => postJobResultMessage(jobUID, false, serialize(value)),
      error => {
        postJobErrorMessage(jobUID, serialize(error) as any)
        activeSubscriptions.delete(jobUID)
      },
      () => {
        postJobResultMessage(jobUID, true)
        activeSubscriptions.delete(jobUID)
      }
    )
    activeSubscriptions.set(jobUID, subscription)
  } else {
    try {
      const result = await syncResult
      postJobResultMessage(jobUID, true, serialize(result))
    } catch (error) {
      postJobErrorMessage(jobUID, serialize(error) as any)
    }
  }
}

/**
 * Expose a function or a module (an object whose values are functions)
 * to the main thread. Must be called exactly once in every worker thread
 * to signal its API to the main thread.
 *
 * @param exposed Function or object whose values are functions
 */
export function expose(exposed: WorkerFunction | WorkerModule<any>) {
  if (!Implementation.isWorkerRuntime()) {
    throw Error("expose() called in the master thread.")
  }
  if (exposeCalled) {
    throw Error("expose() called more than once. This is not possible. Pass an object to expose() if you want to expose multiple functions.")
  }
  exposeCalled = true

  if (typeof exposed === "function") {
    Implementation.subscribeToMasterMessages(messageData => {
      if (isMasterJobRunMessage(messageData) && !messageData.method) {
        runFunction(messageData.uid, exposed, messageData.args.map(deserialize))
      }
    })
    postFunctionInitMessage()
  } else if (typeof exposed === "object" && exposed) {
    Implementation.subscribeToMasterMessages(messageData => {
      if (isMasterJobRunMessage(messageData) && messageData.method) {
        runFunction(messageData.uid, exposed[messageData.method], messageData.args.map(deserialize))
      }
    })

    const methodNames = Object.keys(exposed).filter(key => typeof exposed[key] === "function")
    postModuleInitMessage(methodNames)
  } else {
    throw Error(`Invalid argument passed to expose(). Expected a function or an object, got: ${exposed}`)
  }

  Implementation.subscribeToMasterMessages(messageData => {
    if (isMasterJobCancelMessage(messageData)) {
      const jobUID = messageData.uid
      const subscription = activeSubscriptions.get(jobUID)

      if (subscription) {
        subscription.unsubscribe()
        activeSubscriptions.delete(jobUID)
      }
    }
  })
}

if (typeof self !== "undefined" && typeof self.addEventListener === "function" && Implementation.isWorkerRuntime()) {
  self.addEventListener("error", event => {
    // Post with some delay, so the master had some time to subscribe to messages
    setTimeout(() => postUncaughtErrorMessage(event.error || event), 250)
  })
  self.addEventListener("unhandledrejection", event => {
    const error = (event as any).reason
    if (error && typeof (error as any).message === "string") {
      // Post with some delay, so the master had some time to subscribe to messages
      setTimeout(() => postUncaughtErrorMessage(error), 250)
    }
  })
}

if (typeof process !== "undefined" && typeof process.on === "function" && Implementation.isWorkerRuntime()) {
  process.on("uncaughtException", (error) => {
    // Post with some delay, so the master had some time to subscribe to messages
    setTimeout(() => postUncaughtErrorMessage(error), 250)
  })
  process.on("unhandledRejection", (error) => {
    if (error && typeof (error as any).message === "string") {
      // Post with some delay, so the master had some time to subscribe to messages
      setTimeout(() => postUncaughtErrorMessage(error as any), 250)
    }
  })
}