server/backends/irc/connection-manager/connectionManager.js
#!/usr/bin/env node
//
// Copyright 2014 Ilkka Oksanen <iao@iki.fi>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
//
// Minimal connection manager that keeps TCP sockets alive even if
// rest of the system is restarted. Allows nondistruptive updates.
const init = require('../../../lib/init');
init.configureProcess('irc-connman');
const net = require('net');
const tls = require('tls');
const carrier = require('carrier');
const isUtf8 = require('utf-8-validate');
const iconv = require('iconv-lite');
const ip = require('ip');
const conf = require('../../../lib/conf');
const log = require('../../../lib/log');
const courier = require('../../../lib/courier').createEndPoint('connectionmanager');
const sockets = {};
const nextNetworkConnectionSlot = {};
let identServer;
const LAG_POLL_INTERVAL = 60 * 1000; // 60s
init.on('beforeShutdown', async () => {
if (conf.get('irc:identd')) {
identServer.close();
}
await courier.quit();
for (const key of Object.keys(sockets)) {
const socket = sockets[key];
socket.end('QUIT :MAS server restart.\r\n');
socket.destroy();
}
});
init.on('afterShutdown', () => {
log.quit();
});
function handleIdentConnection(socket) {
const timer = setTimeout(() => {
if (socket) {
socket.destroy();
}
}, 3000);
socket.on('error', error => {
log.info(`Ident socket error: ${error}`);
socket.destroy();
});
carrier.carry(socket, line => {
const [localPort, remotePort] = line.split(',').map(port => parseInt(port));
const prefix = `${localPort}, ${remotePort}`;
if (Number.isInteger(localPort) && Number.isInteger(remotePort)) {
let resp = 'ERROR : NO-USER';
for (const key of Object.keys(sockets)) {
if (
sockets[key].localPort === localPort &&
sockets[key].remotePort === remotePort &&
ip.isEqual(sockets[key].remoteAddress, socket.remoteAddress)
) {
resp = `USERID : UNIX : ${sockets[key].nick}`;
break;
}
}
socket.write(`${prefix} : ${resp}\r\n`);
log.info(`Ident request from ${socket.remoteAddress}, req: ${line}, resp: ${resp}`);
} else {
log.info(`Ident request from ${socket.remoteAddress} is invalid, req: ${line}`);
}
clearTimeout(timer);
socket.end();
});
}
// Connect
courier.on('connect', ({ network, userId, nick, delay }) => {
const rateLimit = conf.get(`irc:networks:${network}:rate_limit`); // connections per minute
let rateLimitDelay = 0;
if (!nextNetworkConnectionSlot[network] || nextNetworkConnectionSlot[network] < Date.now()) {
// Rate limiting not active
nextNetworkConnectionSlot[network] = Date.now();
} else {
rateLimitDelay = nextNetworkConnectionSlot[network] - Date.now();
}
setTimeout(() => connect(userId, nick, network), rateLimitDelay + delay);
nextNetworkConnectionSlot[network] += Math.round((60 / rateLimit) * 1000);
});
// Disconnect
courier.on('disconnect', ({ network, userId, reason }) => {
const socketName = `${userId}:${network}`;
write({ userId, network, reportError: false }, `QUIT :${reason}`);
if (sockets[socketName]) {
sockets[socketName].end();
}
});
// Write
courier.on('write', ({ userId, network, line }) => {
write({ userId, network, reportError: true }, line);
});
(async function main() {
// Start IDENT server
if (conf.get('irc:identd')) {
identServer = net.createServer(handleIdentConnection);
identServer.listen(conf.get('irc:identd_port'));
}
await courier.clearInbox('ircparser');
courier.callNoWait('ircparser', 'restarted');
await courier.listen();
})();
function connect(userId, nick, network) {
if (sockets[`${userId}:${network}`]) {
log.warn({ id: userId }, `Impossible happened. Already connected to IRC network: ${network}`);
return;
}
const port = conf.get(`irc:networks:${network}:port`);
const host = conf.get(`irc:networks:${network}:host`);
const options = { host, port };
let socket;
let ssl = false;
if (conf.get(`irc:networks:${network}:ssl`)) {
ssl = true;
socket = tls.connect(port, host, {});
} else {
socket = net.connect(options);
}
log.info({ id: userId }, `Connecting to IRC server, SSL: ${ssl}, host: ${host}, port: ${port}`);
socket.nick = nick;
socket.setKeepAlive(true, 2 * 60 * 1000); // 2 minutes
function sendPing() {
if (Date.now() - socket.last > LAG_POLL_INTERVAL) {
// Nothing has been sent after previous round
socket.write(`PING ${socket.ircServerName}\r\n`);
}
}
socket.on('connect', () => {
courier.callNoWait('ircparser', 'connected', { userId, network });
socket.pingTimer = setInterval(sendPing, LAG_POLL_INTERVAL);
});
let buffer = '';
socket.on('data', data => {
socket.last = Date.now();
// IRC protocol doesn't have character set concept, we need to guess. Algorithm is simple,
// if the received binary data is valid utf8 then do no conversion. Else assume that the
// character set is iso-8859-15 and convert it to utf8.
const chunk = buffer + (isUtf8(data) ? data.toString() : iconv.decode(data, 'iso-8859-15'));
const lines = chunk.split(/\r\n/);
buffer = lines.pop(); // Save the potential partial line to buffer
lines.forEach(line => {
const proceed = handlePing(socket, line);
if (proceed) {
courier.callNoWait('ircparser', 'data', { userId, network, line });
}
});
});
socket.on('end', () => {
handleEnd(userId, network, null);
});
socket.on('error', error => {
handleEnd(userId, network, error);
});
socket.on('close', () => {
handleEnd(userId, network, null);
});
sockets[`${userId}:${network}`] = socket;
}
function write(options, data) {
const socket = sockets[`${options.userId}:${options.network}`];
if (!socket) {
if (options.reportError) {
courier.callNoWait('ircparser', 'noconnection', { userId: options.userId, network: options.network });
}
return;
}
const chunk = typeof data === 'string' ? [data] : data;
chunk.forEach(line => socket.write(`${line}\r\n`));
socket.last = Date.now();
}
// Minimal parser to handle server sent PING command at this layer
function handlePing(socket, line) {
const parts = line.split(' ').slice(line.startsWith(':') ? 1 : 0);
const [command, parameter] = parts;
switch (command) {
case 'PING':
socket.write(`PONG :${socket.ircServerName}\r\n`);
return false;
case '004':
socket.ircServerName = parameter; // RFC 2812, reply 004
break;
default:
}
return true;
}
function handleEnd(userId, network, error) {
const socket = sockets[`${userId}:${network}`];
const reason = error ? error.code : 'connection closed by the server';
if (!socket) {
return; // Already handled
}
delete sockets[`${userId}:${network}`];
clearInterval(socket.pingTimer);
log.info({ id: userId }, 'IRC connection closed by the server or network.');
courier.callNoWait('ircparser', 'disconnected', { userId, network, reason });
}