// lts/lib/internal/cluster/master.js
'use strict';
const {
Map,
ObjectKeys,
ObjectValues,
} = primordials;
const assert = require('internal/assert');
const { fork } = require('child_process');
const path = require('path');
const EventEmitter = require('events');
const RoundRobinHandle = require('internal/cluster/round_robin_handle');
const SharedHandle = require('internal/cluster/shared_handle');
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
// Public cluster object exported by this module; cluster-level events are
// emitted on it.
const cluster = new EventEmitter();
// Private emitter used inside this file; 'disconnect' is emitted on it when
// no workers remain.
const intercom = new EventEmitter();
// Scheduling policy identifiers, exposed below as cluster.SCHED_NONE and
// cluster.SCHED_RR.
const SCHED_NONE = 1;
const SCHED_RR = 2;
// Bounds used by createWorkerProcess() when an auto-assigned inspector port
// has to wrap around.
const [ minPort, maxPort ] = [ 1024, 65535 ];
const { validatePort } = require('internal/validators');
module.exports = cluster;
// Server handles, keyed by the string that queryServer() builds.
const handles = new Map();
cluster.isWorker = false;
cluster.isMaster = true;
cluster.Worker = Worker;
// Workers keyed by worker id; populated by cluster.fork().
cluster.workers = {};
// Effective settings; replaced wholesale by cluster.setupMaster().
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR; // Master distributes connections.
// Last worker id handed out; cluster.fork() pre-increments it.
let ids = 0;
// Added to process.debugPort to derive the next worker's inspector port.
let debugPortOffset = 1;
// Flipped to true by the first cluster.setupMaster() call.
let initialized = false;
// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
// The initial policy comes from NODE_CLUSTER_SCHED_POLICY ('rr' or 'none');
// any other value falls back to a per-platform default.
let schedulingPolicy;
switch (process.env.NODE_CLUSTER_SCHED_POLICY) {
  case 'rr':
    schedulingPolicy = SCHED_RR;
    break;
  case 'none':
    schedulingPolicy = SCHED_NONE;
    break;
  default:
    // Round-robin doesn't perform well on
    // Windows due to the way IOCP is wired up.
    schedulingPolicy = process.platform === 'win32' ? SCHED_NONE : SCHED_RR;
}
cluster.schedulingPolicy = schedulingPolicy;
/**
 * Merges `options` over the current cluster settings, stores the result on
 * `cluster.settings` and schedules a 'setup' event for the next tick.
 *
 * The first call additionally freezes the scheduling policy and installs the
 * process-level 'internalMessage' listener that reacts to
 * 'NODE_DEBUG_ENABLED'.
 *
 * @param {object} [options] Overrides applied on top of the defaults and the
 *   previously stored settings.
 */
cluster.setupMaster = function(options) {
  // Defaults first, then the previous settings, then the caller's overrides;
  // later spreads win.
  const settings = {
    args: process.argv.slice(2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options
  };
  const hasExecArg = (prefix) =>
    settings.execArgv.some((arg) => arg.startsWith(prefix));

  // Tell V8 to write profile data for each process to a separate file.
  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
  // file. (Unusable because what V8 logs are memory addresses and each
  // process has its own memory mappings.)
  if (hasExecArg('--prof') && !hasExecArg('--logfile=')) {
    // Copy rather than push so the source array is left untouched.
    settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
  }

  cluster.settings = settings;

  // Every call after the first only announces the new settings.
  if (initialized === true)
    return process.nextTick(setupSettingsNT, settings);

  initialized = true;
  schedulingPolicy = cluster.schedulingPolicy; // Freeze policy.
  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);

  process.nextTick(setupSettingsNT, settings);

  const debugWorker = (worker) => {
    process._debugProcess(worker.process.pid);
  };

  process.on('internalMessage', (message) => {
    if (message.cmd === 'NODE_DEBUG_ENABLED') {
      for (const worker of ObjectValues(cluster.workers)) {
        const isRunning =
          worker.state === 'online' || worker.state === 'listening';

        if (isRunning) {
          debugWorker(worker);
        } else {
          // Not up yet: defer until the worker reports that it is online.
          worker.once('online', () => {
            debugWorker(worker);
          });
        }
      }
    }
  });
};
/**
 * Emits the cluster 'setup' event with the given settings. Scheduled through
 * process.nextTick() by cluster.setupMaster().
 *
 * @param {object} settings Settings object passed to 'setup' listeners.
 */
function setupSettingsNT(settings) {
cluster.emit('setup', settings);
}
/**
 * Forks the child process for a new worker.
 *
 * The child environment is `process.env` overlaid with `env` and a
 * `NODE_UNIQUE_ID` set to the stringified worker id. When a debugger flag is
 * found in the configured execArgv or in NODE_OPTIONS, an
 * `--inspect-port=<port>` argument is appended to the child's execArgv.
 *
 * @param {number} id Worker id, exported to the child as NODE_UNIQUE_ID.
 * @param {object} [env] Extra environment entries for the child.
 * @returns {*} Whatever `fork()` returns.
 */
function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // Work on a copy so the stored settings are never mutated.
  const execArgv = [...cluster.settings.execArgv];
  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;
  const nodeOptions = process.env.NODE_OPTIONS || '';
  const wantsInspector =
    execArgv.some((arg) => arg.match(debugArgRegex)) ||
    nodeOptions.match(debugArgRegex);

  if (wantsInspector) {
    let inspectPort;

    if ('inspectPort' in cluster.settings) {
      // An explicit setting wins; it is either a port or a function
      // returning one.
      inspectPort = typeof cluster.settings.inspectPort === 'function' ?
        cluster.settings.inspectPort() :
        cluster.settings.inspectPort;
      validatePort(inspectPort);
    } else {
      // Derive a port from the master's debug port, wrapping back towards
      // minPort once maxPort is exceeded.
      const candidate = process.debugPort + debugPortOffset;
      inspectPort = candidate > maxPort ?
        candidate - maxPort + minPort - 1 :
        candidate;
      debugPortOffset++;
    }

    execArgv.push(`--inspect-port=${inspectPort}`);
  }

  const {
    exec,
    args,
    cwd,
    serialization,
    silent,
    windowsHide,
    stdio,
    gid,
    uid
  } = cluster.settings;

  return fork(exec, args, {
    cwd,
    env: workerEnv,
    serialization,
    silent,
    windowsHide,
    execArgv,
    stdio,
    gid,
    uid
  });
}
/**
 * Drops `worker` from `cluster.workers`. Once the list is empty, asserts that
 * no server handles are left and emits 'disconnect' on the internal emitter.
 *
 * @param {Worker} worker Worker to remove; must be truthy.
 */
function removeWorker(worker) {
  assert(worker);
  delete cluster.workers[worker.id];

  const remainingIds = ObjectKeys(cluster.workers);
  if (remainingIds.length !== 0)
    return;

  // Last worker gone: every handle should already have been released.
  assert(handles.size === 0, 'Resource leak detected.');
  intercom.emit('disconnect');
}
/**
 * Detaches `worker` from every registered server handle and forgets each
 * handle whose `remove()` call returns a truthy value.
 *
 * @param {Worker} worker Worker to detach; must be truthy.
 */
function removeHandlesForWorker(worker) {
  assert(worker);

  for (const [key, handle] of handles) {
    const released = handle.remove(worker);
    if (released)
      handles.delete(key);
  }
}
/**
 * Spawns a new worker process, wraps it in a Worker, wires up its lifecycle
 * listeners and registers it in `cluster.workers`.
 *
 * @param {object} [env] Extra environment entries, forwarded to
 *   createWorkerProcess().
 * @returns {Worker} The newly created worker.
 */
cluster.fork = function(env) {
// Refreshes cluster.settings; the first call also performs one-time setup.
cluster.setupMaster();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
const worker = new Worker({
id: id,
process: workerProcess
});
// Re-emit worker messages on the cluster object, passing the worker (the
// listener's `this`) as the first argument.
worker.on('message', function(message, handle) {
cluster.emit('message', this, message, handle);
});
worker.process.once('exit', (exitCode, signalCode) => {
/*
* Remove the worker from the workers list only
* if it has disconnected, otherwise we might
* still want to access it.
*/
if (!worker.isConnected()) {
removeHandlesForWorker(worker);
removeWorker(worker);
}
// Coerce to a boolean so the property is never left undefined.
worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
worker.state = 'dead';
worker.emit('exit', exitCode, signalCode);
cluster.emit('exit', worker, exitCode, signalCode);
});
worker.process.once('disconnect', () => {
/*
* Now is a good time to remove the handles
* associated with this worker because it is
* not connected to the master anymore.
*/
removeHandlesForWorker(worker);
/*
* Remove the worker from the workers list only
* if its process has exited. Otherwise, we might
* still want to access it.
*/
if (worker.isDead())
removeWorker(worker);
// Coerce to a boolean so the property is never left undefined.
worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
worker.state = 'disconnected';
worker.emit('disconnect');
cluster.emit('disconnect', worker);
});
// Internal messages from the child are routed to onmessage().
worker.process.on('internalMessage', internal(worker, onmessage));
// The 'fork' event is deferred to the next tick, i.e. after this function
// has returned the worker to the caller.
process.nextTick(emitForkNT, worker);
cluster.workers[worker.id] = worker;
return worker;
};
/**
 * Emits the cluster 'fork' event for `worker`. Scheduled through
 * process.nextTick() by cluster.fork().
 *
 * @param {Worker} worker The worker that was just forked.
 */
function emitForkNT(worker) {
cluster.emit('fork', worker);
}
/**
 * Calls `disconnect()` on every connected worker. With no workers
 * registered, an internal 'disconnect' event is scheduled for the next tick
 * instead.
 *
 * @param {Function} [cb] If a function, registered as a one-time listener
 *   for the internal 'disconnect' event. The registration happens after the
 *   workers have been asked to disconnect.
 */
cluster.disconnect = function(cb) {
  const currentWorkers = ObjectValues(cluster.workers);

  if (currentWorkers.length > 0) {
    for (const worker of currentWorkers) {
      if (worker.isConnected())
        worker.disconnect();
    }
  } else {
    process.nextTick(() => intercom.emit('disconnect'));
  }

  if (typeof cb === 'function')
    intercom.once('disconnect', cb);
};
/**
 * Dispatches an internal message from a worker to the handler matching
 * `message.act`. Unknown actions are ignored. Invoked with the worker as
 * `this`.
 *
 * @param {object} message Internal message; `act` selects the handler.
 * @param {*} handle Handle that accompanied the message (unused here).
 */
function onmessage(message, handle) {
  const worker = this;

  switch (message.act) {
    case 'online':
      online(worker);
      break;
    case 'queryServer':
      queryServer(worker, message);
      break;
    case 'listening':
      listening(worker, message);
      break;
    case 'exitedAfterDisconnect':
      exitedAfterDisconnect(worker, message);
      break;
    case 'close':
      close(worker, message);
      break;
    default:
      // Not an action the master handles.
      break;
  }
}
/**
 * Marks `worker` as online and emits 'online' on the worker, then on the
 * cluster.
 *
 * @param {Worker} worker Worker that reported itself online.
 */
function online(worker) {
  const event = 'online';

  worker.state = event;
  worker.emit(event);
  cluster.emit(event, worker);
}
/**
 * Records that `worker` is disconnecting on purpose and acknowledges the
 * message that announced it.
 *
 * @param {Worker} worker Worker that sent the message.
 * @param {object} message Internal message; `seq` is echoed back as `ack`.
 */
function exitedAfterDisconnect(worker, message) {
  worker.exitedAfterDisconnect = true;

  const ack = message.seq;
  send(worker, { ack });
}
/**
 * Handles a worker's 'queryServer' request: looks up (or creates) the server
 * handle for the requested address and adds the worker to it. The reply is
 * sent to the worker once the handle calls back.
 *
 * @param {Worker} worker Worker asking for the server handle.
 * @param {object} message Internal message. `address`, `port`,
 *   `addressType`, `fd` and `index` form the handle key, `data` is optional
 *   custom server data and `seq` is echoed back as `ack`.
 */
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {
      address = path.relative(process.cwd(), address);

      // Keep the original form when the relative path is not shorter.
      if (message.address.length < address.length)
        address = message.address;
    }

    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      constructor = SharedHandle;
    }

    handle = new constructor(key, address, message);
    handles.set(key, handle);
  }

  // Set custom server data; the first worker to provide it wins.
  if (!handle.data)
    handle.data = message.data;

  handle.add(worker, (errno, reply, replyHandle) => {
    // Read the data from the handle captured above instead of looking the
    // key up again: a failed reply for another worker on the same key may
    // already have deleted the entry, and destructuring the missing entry
    // would throw.
    const { data } = handle;

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, replyHandle);
  });
}
/**
 * Marks `worker` as listening and emits 'listening' with the address
 * information, first on the worker and then on the cluster.
 *
 * @param {Worker} worker Worker whose server started listening.
 * @param {object} message Internal message carrying `addressType`,
 *   `address`, `port` and `fd`.
 */
function listening(worker, message) {
  const { addressType, address, port, fd } = message;
  const info = { addressType, address, port, fd };

  worker.state = 'listening';
  worker.emit('listening', info);
  cluster.emit('listening', worker, info);
}
// Server in worker is closing, remove from list. The handle may have been
// removed by a prior call to removeHandlesForWorker() so guard against that.
/**
 * @param {Worker} worker Worker whose server is closing.
 * @param {object} message Internal message; `key` identifies the handle.
 */
function close(worker, message) {
  const { key } = message;
  const handle = handles.get(key);

  // Already gone: nothing to do.
  if (!handle)
    return;

  if (handle.remove(worker))
    handles.delete(key);
}
/**
 * Sends an internal message to a worker by forwarding to sendHelper() with
 * the worker's process.
 *
 * @param {Worker} worker Destination worker; its `process` is used.
 * @param {object} message Message passed through to sendHelper().
 * @param {*} [handle] Optional handle passed through to sendHelper().
 * @param {Function} [cb] Optional callback passed through to sendHelper().
 * @returns {*} Whatever sendHelper() returns.
 */
function send(worker, message, handle, cb) {
return sendHelper(worker.process, message, handle, cb);
}
// Extend generic Worker with methods specific to the master process.
/**
 * Asks the worker to disconnect and immediately drops it from the master's
 * bookkeeping (server handles and `cluster.workers`).
 *
 * @returns {Worker} This worker.
 */
Worker.prototype.disconnect = function() {
// Flag the disconnect as intentional before anything else happens.
this.exitedAfterDisconnect = true;
send(this, { act: 'disconnect' });
removeHandlesForWorker(this);
removeWorker(this);
return this;
};
/**
 * Kills the worker's process. A connected worker is disconnected first and
 * killed once its 'disconnect' event fires; otherwise the process is killed
 * right away.
 *
 * @param {string} [signo] Signal passed to `kill()`; any falsy value selects
 *   'SIGTERM'.
 */
Worker.prototype.destroy = function(signo) {
  const proc = this.process;
  const signal = signo || 'SIGTERM';
  const killProcess = () => proc.kill(signal);

  if (!this.isConnected()) {
    killProcess();
    return;
  }

  // Register before disconnecting so the event cannot be missed.
  this.once('disconnect', killProcess);
  this.disconnect();
};