h2non/thread.js

View on GitHub
lib/task.js

Summary

Maintainability
F
6 days
Test Coverage
var _ = require('./utils')

// Expose the Task constructor (a hoisted function declaration below) as this module's export
module.exports = Task

/**
 * Task constructor: a unit of work bound to a thread and to that thread's worker.
 *
 * @param {Object} thread - owning thread; its `worker` property is copied onto the task
 * @param {Object} [env] - initial environment map; an empty object is used when falsy
 */
function Task(thread, env) {
  var initialEnv = env || {}
  // One handler queue per outcome the task can report
  var handlerQueues = { error: [], success: [], end: [] }

  this.id = _.uuid()
  this.thread = thread
  this.worker = thread.worker
  this.env = initialEnv
  // Nothing has run yet: no cached result and no start time
  this.memoized = null
  this.time = null
  this.listeners = handlerQueues
}

// Period, in milliseconds, of the interval that polls a running task for
// completion or timeout. run() only starts that interval when the thread's
// maxTaskDelay is at least this value.
Task.intervalCheckTime = 200

/**
 * Merges the given values into the task environment (exposed as both `set` and `bind`).
 *
 * @param {Object} env - values copied onto `this.env` via `_.extend`
 * @returns {Task} the task itself, for chaining
 */
Task.prototype.set = function (env) {
  var task = this
  _.extend(task.env, env)
  return task
}
Task.prototype.bind = Task.prototype.set

/**
 * Runs `fn` through the thread's worker (exposed as both `run` and `exec`).
 *
 * Accepted call shapes:
 *   run(fn)
 *   run(fn, env)
 *   run(fn, env, args)
 *   run(fn, args)        - an array in second position is taken as `args`
 *   run(fn, args, env)   - swapped order is also accepted
 *
 * Side effects: resets `memoized`, stamps `time`, optionally starts the
 * max-delay watchdog, registers the task in `thread._tasks`, subscribes a
 * cleanup handler and a worker message listener, then posts the job.
 *
 * @param {Function} fn - function to execute
 * @param {Object|Array} [env] - extra environment values, or the arguments array
 * @param {Array|Object} [args] - arguments array, or the environment when swapped
 * @returns {Task} the task itself, for chaining
 * @throws {Error} when the task has no thread or the thread was terminated
 * @throws {TypeError} when `fn` is not a function
 */
Task.prototype.run = Task.prototype.exec = function (fn, env, args) {
  var thread = this.thread
  // Snapshot the raw positional values before reassigning anything: in
  // non-strict code `arguments` is aliased to the named parameters, so
  // writing `args` would silently overwrite `arguments[2]` and lose the
  // environment passed in third position.
  var second = env
  var third = args

  if (!thread || thread._terminated) {
    throw new Error('cannot execute the task. The thread was terminated')
  }
  if (!_.isFn(fn)) {
    throw new TypeError('first argument must be a function')
  }

  if (_.isArr(second)) {
    args = second
    // The array is the arguments list; it must not be merged into the env
    env = undefined
  }
  if (_.isObj(third) && !_.isArr(third)) env = third

  env = _.serializeMap(_.extend({}, this.env, env))
  this.memoized = null
  this.time = _.now()

  // The watchdog polls every Task.intervalCheckTime ms, so it is only
  // started for delays that are at least one polling period long
  if (thread.maxTaskDelay >= Task.intervalCheckTime) {
    checkInterval(this, thread.maxTaskDelay)
  }
  if (thread._tasks.indexOf(this) === -1) {
    thread._tasks.push(this)
  }

  // Unregister the task from the thread once it settles
  this['finally'](cleanTask(thread, this))

  addWorkerMessageListener(this)
  sendMessage(this, env, fn, args)

  return this
}

/**
 * Subscribes a success handler and, optionally, an error handler
 * (exposed as both `success` and `then`). Non-function arguments are ignored.
 *
 * @param {Function} [fn] - registered for the 'success' state
 * @param {Function} [errorFn] - forwarded to `catch`
 * @returns {Task} the task itself, for chaining
 */
Task.prototype.success = function (fn, errorFn) {
  var task = this
  if (_.isFn(fn)) {
    pushStateHandler(task, 'success', fn)
  }
  if (_.isFn(errorFn)) {
    task['catch'](errorFn)
  }
  return task
}
Task.prototype.then = Task.prototype.success

/**
 * Subscribes an error handler (exposed as both `error` and `catch`).
 * Non-function arguments are ignored.
 *
 * @param {Function} [fn] - registered for the 'error' state
 * @returns {Task} the task itself, for chaining
 */
Task.prototype.error = function (fn) {
  var task = this
  if (_.isFn(fn)) {
    pushStateHandler(task, 'error', fn)
  }
  return task
}
Task.prototype['catch'] = Task.prototype.error

/**
 * Subscribes a handler that runs when the task ends, whatever the outcome
 * (exposed as both `finish` and `finally`). Non-function arguments are ignored.
 *
 * @param {Function} [fn] - called with the task's value or error
 * @returns {Task} the task itself, for chaining
 */
Task.prototype.finish = function (fn) {
  if (!_.isFn(fn)) return this

  if (this.memoized) {
    // A result is already cached: invoke right away instead of queueing
    fn.call(null, getValue(this.memoized))
  } else {
    this.listeners.end.push(fn)
  }
  return this
}
Task.prototype['finally'] = Task.prototype.finish

/**
 * Releases every reference held by the task (thread, worker, environment,
 * listeners and cached result) and stops its pending watchdog interval.
 *
 * NOTE(review): a worker 'message' listener registered for a still-running
 * task is not removed here, because its handler reference is not stored on
 * the task; confirm callers only flush settled tasks.
 */
Task.prototype.flush = function () {
  // Stop the watchdog first: left running, it would keep firing against a
  // task whose listeners have just been nulled
  if (this._timer != null) {
    clearInterval(this._timer)
    this._timer = null
  }
  this.memoized = this.thread = null
  this.worker = this.env = this.listeners = null
}

/**
 * Tells whether the task has been flushed.
 *
 * @returns {boolean} true when neither `thread` nor `worker` is set
 */
Task.prototype.flushed = function () {
  return !(this.thread || this.worker)
}

/**
 * Factory shortcut for `new Task(thread)`.
 *
 * @param {Object} thread - thread the new task belongs to
 * @returns {Task} a new task with an empty environment
 */
Task.create = function (thread) {
  var task = new Task(thread)
  return task
}

/**
 * Posts a 'run' job to the task's worker.
 *
 * @param {Task} task - supplies the job id and the worker
 * @param {Object} env - environment sent along with the job
 * @param {Function} fn - function to run; sent as its source text
 * @param {Array} [args] - arguments sent along with the job
 */
function sendMessage(task, env, fn, args) {
  var job = {
    id: task.id,
    type: 'run',
    env: env,
    // Only the function's source text is put in the message
    src: fn.toString(),
    args: args
  }
  task.worker.postMessage(job)
}

/**
 * Starts the watchdog interval for a running task. Every
 * Task.intervalCheckTime ms it either stops itself (the task already has a
 * memoized result) or checks whether `maxDelay` has been exceeded.
 *
 * @param {Task} task - task to watch; the interval handle is stored in `task._timer`
 * @param {number} maxDelay - maximum allowed execution time, in milliseconds
 */
function checkInterval(task, maxDelay) {
  var startedAt = _.now()

  // A task can be run again while a previous watchdog is still scheduled.
  // Clear it before replacing the handle: otherwise the old interval can
  // never be cleared and keeps measuring against its stale start time.
  if (task._timer != null) clearTimer.call(task)

  task._timer = setInterval(function () {
    if (task.memoized) {
      clearTimer.call(task)
    } else {
      checkTaskDelay.call(task, startedAt, maxDelay)
    }
  }, Task.intervalCheckTime)
}

/**
 * Subscribes the task to 'message' events of its worker.
 *
 * @param {Task} task - task whose worker is listened to; the handler is built by onMessage(task)
 */
function addWorkerMessageListener(task) {
  var listener = onMessage(task)
  task.worker.addEventListener('message', listener)
}

/**
 * Registers `fn` for one task state, or calls it immediately when the task
 * already holds a memoized result for that same state.
 *
 * @param {Task} task - task to subscribe to
 * @param {string} type - listener queue name ('success' or 'error')
 * @param {Function} fn - handler to queue or invoke
 */
function pushStateHandler(task, type, fn) {
  var settled = task.memoized

  if (!settled) {
    task.listeners[type].push(fn)
    return
  }
  // Already settled: only fire when the cached outcome matches this state;
  // a handler for the other outcome is dropped
  if (settled.type === 'run:' + type) {
    fn.call(null, getValue(settled))
  }
}

/**
 * Stops the task's watchdog interval, if any, and calls the listeners
 * queued for `type` with `value`.
 *
 * @param {Task} task - task whose listeners are notified
 * @param {*} value - value or error passed to every listener
 * @param {string} type - listener queue name ('success', 'error' or 'end')
 */
function dispathEvent(task, value, type) {
  // A flushed task has `listeners` set to null; the dispatcher ignores a
  // non-array pool, so there is nothing to notify instead of a TypeError
  var pool = task.listeners ? task.listeners[type] : null

  // Timer handles are numbers in browsers but objects in other runtimes
  // (e.g. Node.js), so test for presence rather than for a number
  if (task._timer != null) clearTimer.call(task)
  dispatcher(task, value)(pool)
}

/**
 * Builds a function that empties a listener pool, calling each handler
 * with `value` in queue order.
 *
 * @param {Task} self - not used by the dispatcher
 * @param {*} value - argument passed to every handler
 * @returns {Function} function taking the pool (array of handlers); a
 *   non-array pool is ignored. Handlers are removed from the pool as they run.
 */
function dispatcher(self, value) {
  return function drain(pool) {
    var next = null
    if (!_.isArr(pool)) return

    // Take handlers from the front until the pool is empty (or a falsy entry is met)
    while ((next = pool.shift())) {
      next.call(null, value)
      if (!pool.length) break
    }
  }
}

/**
 * Builds the Error delivered to handlers from an error payload.
 *
 * @param {Object} data - payload carrying `error` (message string or an
 *   Error instance) and optionally `errorName` / `errorStack`
 * @returns {Error} the Error instance found in `data.error`, or a new Error
 *   built from the payload fields
 */
function createError(data) {
  var err = null

  // An Error stored directly in the payload is returned untouched: wrapping
  // it in `new Error()` would stringify it and drop its name and stack
  if (data.error instanceof Error) return data.error

  err = new Error(data.error)
  // Only override when the payload provides the field, so a missing name or
  // stack does not wipe the defaults to `undefined`
  if (data.errorName) err.name = data.errorName
  if (data.errorStack) err.stack = data.errorStack
  return err
}

/**
 * Builds the cleanup callback run when a task ends: it stamps
 * `thread._latestTask` with the current time and removes the task from
 * `thread._tasks` when present.
 *
 * @param {Object} thread - thread owning the `_tasks` list
 * @param {Task} task - task to unregister
 * @returns {Function} the cleanup callback
 */
function cleanTask(thread, task) {
  return function () {
    var position = thread._tasks.indexOf(task)
    thread._latestTask = _.now()
    if (position !== -1) {
      thread._tasks.splice(position, 1)
    }
  }
}

/**
 * Fails the task (bound as `this`) with a timeout error once more than
 * `maxDelay` milliseconds have elapsed since `time`.
 *
 * NOTE(review): the worker 'message' listener of the timed-out task is not
 * removed here, so a late worker reply would still be processed; confirm
 * whether that is intended.
 *
 * @this {Task}
 * @param {number} time - timestamp the watchdog started at
 * @param {number} maxDelay - maximum allowed execution time, in milliseconds
 */
function checkTaskDelay(time, maxDelay) {
  var error = null
  if ((_.now() - time) > maxDelay) {
    error = new Error('maximum task execution time exceeded')
    // Memoize using the same string-based fields createError() reads
    // (error / errorName / errorStack), so handlers attached after the
    // timeout get an equivalent Error instead of a stringified one
    this.memoized = {
      type: 'run:error',
      error: error.message,
      errorName: error.name,
      errorStack: error.stack
    }
    // Stop the watchdog before notifying: a throwing handler must not
    // leave the interval running
    clearTimer.call(this)
    dispathEvent(this, error, 'error')
    dispathEvent(this, error, 'end')
  }
}

/**
 * Stops the interval stored in `this._timer` and resets the handle to null.
 * Must be invoked with the task as `this`.
 *
 * @this {Task}
 */
function clearTimer() {
  var handle = this._timer
  this._timer = null
  clearInterval(handle)
}

/**
 * Tells whether a message type reports the end of a run.
 *
 * @param {*} type - message type to test
 * @returns {boolean} true only for 'run:error' and 'run:success'
 */
function isValidEvent(type) {
  switch (type) {
    case 'run:error':
    case 'run:success':
      return true
    default:
      return false
  }
}

/**
 * Builds the worker 'message' listener for a task. The listener ignores
 * messages without data, for another task id, or with a type rejected by
 * isValidEvent(). On a matching message it unsubscribes itself, memoizes
 * the payload and triggers the task's handlers.
 *
 * @param {Task} task - task the listener belongs to
 * @returns {Function} event listener receiving the message event
 */
function onMessage(task) {
  return function listener(ev) {
    var payload = ev.data

    if (!payload) return
    if (payload.id !== task.id) return
    if (!isValidEvent(payload.type)) return

    // One result per run: stop listening before notifying handlers
    task.worker.removeEventListener('message', listener)
    task.memoized = payload
    triggerMessage(task, payload)
  }
}

/**
 * Notifies a task's handlers about a worker result: first the queue named
 * after the part of `data.type` following the colon, then the 'end' queue.
 *
 * @param {Task} task - task whose handlers are notified
 * @param {Object} data - worker message payload
 */
function triggerMessage(task, data) {
  var payload = getValue(data)
  // 'run:success' -> 'success', 'run:error' -> 'error'
  var state = data.type.split(':')[1]

  dispathEvent(task, payload, state)
  dispathEvent(task, payload, 'end')
}

/**
 * Extracts what handlers receive from a result payload.
 *
 * @param {Object} data - payload with a `type` field
 * @returns {*} an Error built by createError() for 'run:error' payloads,
 *   otherwise `data.value`
 */
function getValue(data) {
  if (data.type === 'run:error') {
    return createError(data)
  }
  return data.value
}