lts/lib/internal/child_process/serialization.js
'use strict';
const {
JSONParse,
JSONStringify,
Symbol,
} = primordials;
const { Buffer } = require('buffer');
const { StringDecoder } = require('string_decoder');
const v8 = require('v8');
const { isArrayBufferView } = require('internal/util/types');
const assert = require('internal/assert');
// Symbol keys for the per-channel parser state that this module stores
// directly on the channel object.
// Bytes received by the 'advanced' parser that have not been consumed yet.
const kMessageBuffer = Symbol('kMessageBuffer');
// Text of a not-yet-terminated line held back by the 'json' parser.
const kJSONBuffer = Symbol('kJSONBuffer');
// StringDecoder used by the 'json' parser; created on the first read.
const kStringDecoder = Symbol('kStringDecoder');
// Extend V8's serializer APIs to give more JSON-like behaviour in
// some cases; in particular, for native objects this serializes them the same
// way that JSON does rather than throwing an exception.
//
// Every host object is prefixed on the wire with one of the uint32 tags below
// so that the deserializer knows which of the two encodings follows.
// Tag: payload was produced by the default host-object writer.
const kArrayBufferViewTag = 0;
// Tag: payload is an ordinary serialized value (a shallow copy of the object).
const kNotArrayBufferViewTag = 1;
/**
 * V8 serializer used for 'advanced' IPC messages.
 *
 * Each host object is preceded by a uint32 tag saying how it was encoded:
 * ArrayBufferViews are delegated to the default serializer, anything else is
 * written as a shallow copy of its own enumerable properties.
 */
class ChildProcessSerializer extends v8.DefaultSerializer {
  /**
   * Writes a tagged host object to the output stream.
   * @param {object} object Host object handed over by the serializer.
   * @returns {*} Whatever the default writer returns for ArrayBufferViews,
   *   otherwise `undefined`.
   */
  _writeHostObject(object) {
    if (!isArrayBufferView(object)) {
      // Not a view: fall back to a plain-object snapshot instead of throwing.
      this.writeUint32(kNotArrayBufferViewTag);
      this.writeValue({ ...object });
      return;
    }

    this.writeUint32(kArrayBufferViewTag);
    return super._writeHostObject(object);
  }
}
/**
 * V8 deserializer used for 'advanced' IPC messages.
 *
 * Reads the uint32 tag that precedes every host object and decodes the
 * payload accordingly.
 */
class ChildProcessDeserializer extends v8.DefaultDeserializer {
  /**
   * Reads one tagged host object from the input stream.
   * @returns {*} The decoded ArrayBufferView, or the plain value that was
   *   written in place of a non-view host object.
   */
  _readHostObject() {
    const tag = this.readUint32();
    switch (tag) {
      case kArrayBufferViewTag:
        return super._readHostObject();
      default:
        // Only two tags are ever written; anything else is a corrupt stream.
        assert(tag === kNotArrayBufferViewTag);
        return this.readValue();
    }
  }
}
// Messages are parsed in either of the following formats:
// - Newline-delimited JSON, or
// - V8-serialized buffers, prefixed with their length as a big endian uint32
// (aka 'advanced')
// 'advanced' codec: each message is a V8-serialized buffer preceded by its
// byte length as a big-endian uint32.
const advanced = {
  /**
   * Installs the parser state on a fresh channel: an empty pending-bytes
   * buffer and a cleared `buffering` flag.
   * @param {object} channel Channel object to initialise.
   */
  initMessageChannel(channel) {
    channel[kMessageBuffer] = Buffer.alloc(0);
    channel.buffering = false;
  },

  /**
   * Generator that appends `readData` to the channel's pending bytes and
   * yields every complete message now available.
   *
   * The unconsumed remainder and `channel.buffering` are committed in a
   * `finally` block. If the consumer stops iterating early or throws, or a
   * frame fails to decode, the bytes of this read are still retained and the
   * next read starts on a frame boundary. A frame that fails to decode is
   * skipped rather than retried.
   *
   * @param {object} channel Channel previously set up by `initMessageChannel`.
   * @param {Buffer} readData Newly received bytes.
   * @yields {*} Each deserialized message, in arrival order.
   */
  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;

    let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]);
    try {
      while (messageBuffer.length > 4) {
        const size = messageBuffer.readUInt32BE();
        if (messageBuffer.length < 4 + size) {
          // Frame is still incomplete; wait for more data.
          break;
        }
        const deserializer = new ChildProcessDeserializer(
          messageBuffer.subarray(4, 4 + size));
        // Step over the frame before decoding it so that the state committed
        // below never includes a frame that was already handed out.
        messageBuffer = messageBuffer.subarray(4 + size);
        deserializer.readHeader();
        yield deserializer.readValue();
      }
    } finally {
      channel[kMessageBuffer] = messageBuffer;
      channel.buffering = messageBuffer.length > 0;
    }
  },

  /**
   * Serializes `message` and writes it to the channel as one buffer made of
   * the 4-byte big-endian length followed by the serialized bytes.
   * @param {object} channel Channel providing `writeBuffer(req, buffer, handle)`.
   * @param {object} req Write request passed through to the channel.
   * @param {*} message Value to serialize.
   * @param {*} handle Optional handle passed through to the channel.
   * @returns {*} The result of `channel.writeBuffer`.
   */
  writeChannelMessage(channel, req, message, handle) {
    const ser = new ChildProcessSerializer();
    ser.writeHeader();
    ser.writeValue(message);
    const serializedMessage = ser.releaseBuffer();
    const sizeBuffer = Buffer.allocUnsafe(4);
    sizeBuffer.writeUInt32BE(serializedMessage.length);
    return channel.writeBuffer(req, Buffer.concat([
      sizeBuffer,
      serializedMessage,
    ]), handle);
  },
};
// 'json' codec: each message is one line of JSON terminated by '\n'.
const json = {
  /**
   * Installs the parser state on a fresh channel: an empty partial-line
   * buffer and no decoder (one is created on the first read).
   * @param {object} channel Channel object to initialise.
   */
  initMessageChannel(channel) {
    channel[kJSONBuffer] = '';
    channel[kStringDecoder] = undefined;
  },

  /**
   * Generator that decodes `readData` as UTF-8, splits it on newlines and
   * yields the parsed value of every completed line.
   *
   * The trailing partial line and `channel.buffering` are committed in a
   * `finally` block. The decoder has already consumed `readData`, so if the
   * consumer stops early or throws, or a line is not valid JSON, this is what
   * keeps later reads framed correctly. Complete lines that had not been
   * yielded yet when the iteration was aborted are dropped.
   *
   * @param {object} channel Channel previously set up by `initMessageChannel`.
   * @param {Buffer} readData Newly received bytes.
   * @yields {*} Each parsed message, in arrival order.
   */
  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;

    if (channel[kStringDecoder] === undefined)
      channel[kStringDecoder] = new StringDecoder('utf8');
    const chunks = channel[kStringDecoder].write(readData).split('\n');
    const numCompleteChunks = chunks.length - 1;
    // Last line does not have trailing linebreak
    const incompleteChunk = chunks[numCompleteChunks];

    if (numCompleteChunks === 0) {
      // No newline seen: keep accumulating the current line.
      channel[kJSONBuffer] += incompleteChunk;
      channel.buffering = channel[kJSONBuffer].length !== 0;
      return;
    }

    // The previously buffered text is the start of the first completed line.
    chunks[0] = channel[kJSONBuffer] + chunks[0];
    try {
      for (let i = 0; i < numCompleteChunks; i++)
        yield JSONParse(chunks[i]);
    } finally {
      channel[kJSONBuffer] = incompleteChunk;
      channel.buffering = incompleteChunk.length !== 0;
    }
  },

  /**
   * Writes `message` to the channel as a single line of JSON.
   * @param {object} channel Channel providing
   *   `writeUtf8String(req, string, handle)`.
   * @param {object} req Write request passed through to the channel.
   * @param {*} message Value to stringify.
   * @param {*} handle Optional handle passed through to the channel.
   * @returns {*} The result of `channel.writeUtf8String`.
   */
  writeChannelMessage(channel, req, message, handle) {
    const string = JSONStringify(message) + '\n';
    return channel.writeUtf8String(req, string, handle);
  },
};
// Export both message codecs. Each one provides initMessageChannel,
// parseChannelMessages and writeChannelMessage.
module.exports = { advanced, json };