rxstack/rxstack

View on GitHub
packages/worker-threads-pool/README.md

Summary

Maintainability
Test Coverage
# RxStack Worker Threads Pool Module

> Offload tasks to a pool of workers in rxstack application.

## Installation

```
npm install @rxstack/worker-threads-pool --save
```

## Documentation

* [Setup](#setup)
* [Module options](#module-options)
* [Create a task](#task-create)
* [Run a task](#task-run)
* [API](#api)

### <a name="setup"></a>  Setup
`WorkerThreadsPool` module is installed and configured by default in [`skeleton`](https://github.com/rxstack/skeleton) application.

### <a name="module-options"></a>  Module Options
The module accepts the following options::
- `path`: path to executable .js file
- `max`: Maximum number of workers allowed in the pool. Defaults to 1
- `maxWaiting`: Maximum number of workers waiting to be started when the pool is full. 
   It will trigger an exception if limit is reached. Defaults to 10
   
### <a name="task-create"></a>  Create a task
Each task should extends `AbstractWorkerThread` class:

```typescript
// ./src/app/workers/my-task.ts
import {AbstractWorkerThread} from '@rxstack/worker-threads-pool';
import {Injectable} from 'injection-js';
import {parentPort, workerData} from 'worker_threads';

@Injectable()
export class MyTask extends AbstractWorkerThread {

  async run(): Promise<void> {
    parentPort.postMessage(`hello ${workerData.options.message} - from worker`);
  }

  getName(): string {
    return 'my-task';
  }
}
```

then register it in the application providers:

```typescript
// ./src/app/workers/APP_WORKER_PROVIDERS.ts
import {ProviderDefinition} from '@rxstack/core';
import {WORKER_THREADS_POOL_REGISTRY} from '@rxstack/worker-threads-pool';
import {MyTask} from './my-task';

export const APP_WORKER_PROVIDERS: ProviderDefinition[] = [
  // ...
  {
    provide: WORKER_THREADS_POOL_REGISTRY,
    useClass: MyTask,
    multi: true
  }
];
```

now your task is ready to be run.

### <a name="task-run"></a>  Run a task
you can run a task from anywhere. In the example below we'll run it from a controller 
and communicate with the connected client via socket connection:

```typescript
import {Injectable, Injector} from 'injection-js';
import {InjectorAwareInterface, Request, Response, WebSocket} from '@rxstack/core';
import {WorkerThreadsPool} from '@rxstack/worker-threads-pool';

@Injectable()
export class IndexController implements InjectorAwareInterface {

  private injector: Injector;

  setInjector(injector: Injector): void {
    this.injector = injector;
  }

  @WebSocket('app_index')
  async indexAction(request: Request): Promise<Response> {
    const pool = this.injector.get(WorkerThreadsPool);
    
    pool.acquire('my-task', {massage: 'world'}).then((worker) => {
      worker.on('message', (data: any) => {
        // communication between client and worker via sockets
        request.connection.emit('message', data);
      });
      worker.on('online', (err: any) => {
        // task is started
        request.connection.emit('message', 'task is started');
      });
      worker.on('error', (err: any) => {
        // handle errors
        request.connection.emit('message', err.message);
      });
      worker.on('exit', (code) => {
        // task is completed
        request.connection.emit('message', code === 0 ? 'success' : 'fail');
      });
    }).catch(e => console.error(e.message));
    
    return new Response('Task is scheduled', 202);
  }
}
```

### <a name="api"></a>  API

#### `pool.acquire(name, [options])`

- `name`: task name, 
- `options`(optional): data passed to `workerData`
    
returns a promise of `Worker` or throws an exception if task does not exist or queue is full.
    
#### `pool.stats()`

returns `workerSize` and `queueSize`

#### `pool.terminate()`
 
clears the queue and terminates all running workers

## License

Licensed under the [MIT license](LICENSE).