plugins/messagesniffer.js
// messagesniffer
const fs = require('fs');
const net = require('net');
const plugin = exports;
// Defaults
let port = 9001;
exports.register = function () {
const cfg = this.config.get('messagesniffer.ini');
if (cfg.main.port) port = parseInt(cfg.main.port);
}
exports.hook_connect = function (next, connection) {
const cfg = this.config.get('messagesniffer.ini');
// Skip any private IP ranges
// Skip connection.transaction undefined
if (connection?.remote?.is_private || !connection?.transaction) return next();
// Retrieve GBUdb information for the connecting IP
SNFClient(`<snf><xci><gbudb><test ip='${connection.remote.ip}'/></gbudb></xci></snf>`, (err, result) => {
if (err) {
connection.logerror(this, err.message);
return next();
}
let match;
if ((match = /<result ((?:(?!\/>)[^])+)\/>/.exec(result))) {
// Log result
connection.loginfo(this, match[1]);
// Populate result
const gbudb = {};
const split = match[1].toString().split(/\s+/);
for (const element of split) {
const split2 = element.split(/=/);
gbudb[split2[0]] = split2[1].replace(/(?:^'|'$)/g,'');
}
// Set notes for other plugins
connection.notes.gbudb = gbudb;
// Handle result
switch (gbudb.range) {
case 'new':
case 'normal':
return next();
case 'white':
// Default for white if no configuration
if (!cfg.gbudb || (cfg.gbudb && !cfg.gbudb[gbudb.range])) {
return next(OK);
}
// fall through
case 'caution':
case 'black':
case 'truncate':
if (cfg.gbudb?.[gbudb.range]) {
connection.loginfo(this, `range=${gbudb.range} action=${cfg.gbudb[gbudb.range]}`);
switch (cfg.gbudb[gbudb.range]) {
case 'accept':
// Whitelist
connection.notes.gbudb.action = 'accept';
return next(OK);
case 'allow':
case 'continue':
// Continue to next plugin
connection.notes.gbudb.action = 'allow';
return next();
case 'retry':
case 'tempfail':
return next(DENYSOFT, `Poor GBUdb reputation for [${connection.remote.ip}]`);
case 'reject':
return next(DENY, `Poor GBUdb reputation for [${connection.remote.ip}]`);
case 'quarantine':
connection.notes.gbudb.action = 'quarantine';
connection.notes.quarantine = true;
connection.notes.quarantine_action = [ OK, `Message quarantined (${connection.transaction.uuid})` ];
break;
case 'tag':
connection.notes.gbudb.action = 'tag';
break;
default:
// Unknown action
return next();
}
}
else if (gbudb.range === 'truncate') {
// Default for truncate
return next(DENY, `Poor GBUdb reputation for [${connection.remote.ip}]`);
}
return next();
default:
// Unknown
connection.logerror(this, `Unknown GBUdb range: ${gbudb.range}`);
next();
}
}
else {
next();
}
});
}
exports.hook_data_post = function (next, connection) {
const cfg = this.config.get('messagesniffer.ini');
const txn = connection?.transaction;
if (!txn) return next();
function tag_subject (){
const tag = cfg.main.tag_string || '[SPAM]';
const subj = txn.header.get_decoded('Subject');
// Try and prevent any double subject modifications
const subject_re = new RegExp(`^${tag}`);
if (!subject_re.test(subj)) {
txn.remove_header('Subject');
txn.add_header('Subject', `${tag} ${subj}`);
}
// Add spam flag
txn.remove_header('X-Spam-Flag');
txn.add_header('X-Spam-Flag', 'YES');
}
// Check GBUdb results
if (connection.notes.gbudb?.action) {
switch (connection.notes.gbudb.action) {
case 'accept':
case 'quarantine':
return next(OK);
case 'tag':
// Tag message
tag_subject();
return next();
}
}
const tmpdir = cfg.main.tmpdir || '/tmp';
const tmpfile = `${tmpdir}/${txn.uuid}.tmp`;
const ws = fs.createWriteStream(tmpfile);
ws.once('error', err => {
connection.logerror(this, `Error writing temporary file: ${err.message}`);
next();
});
ws.once('close', () => {
const start_time = Date.now();
SNFClient(`<snf><xci><scanner><scan file='${tmpfile}' xhdr='yes'/></scanner></xci></snf>`, (err, result) => {
const end_time = Date.now();
const elapsed = end_time - start_time;
// Delete the tempfile
fs.unlink(tmpfile, () => {});
let match;
// Make sure we actually got a result
if ((match = /<result code='(\d+)'/.exec(result))) {
const code = parseInt(match[1]);
let group;
let rules;
let gbudb_ip;
// Make a note that we actually ran
connection.notes.snf_run = true;
// Get the returned headers
if ((match = /<xhdr>((?:(?!<\/xhdr>)[^])+)/.exec(result,'m'))) {
// Parse the returned headers and add them to the message
const xhdr = match[1].split('\r\n');
const headers = [];
for (const line of xhdr) {
// Check for continuation
if (/^\s/.test(line)) {
// Continuation; add to previous header value
if (headers[headers.length-1]) {
headers[headers.length-1].value += `${line}\r\n`;
}
}
else {
// Must be a header
match = /^([^: ]+):(?:\s*(.+))?$/.exec(line);
if (match) {
headers.push({ header: match[1], value: (match[2] ? `${match[2]}\r\n` : '\r\n') });
}
}
}
// Add headers to message
for (const header of headers) {
// If present save the group for logging purposes
if (header.header === 'X-MessageSniffer-SNF-Group') {
group = header.value.replace(/\r?\n/gm, '');
}
// Log GBUdb analysis
if (header.header === 'X-GBUdb-Analysis') {
// Retrieve IP address determined by GBUdb
const gbudb_split = header.value.split(/,\s*/);
gbudb_ip = gbudb_split[1];
connection.logdebug(this, `GBUdb: ${header.value.replace(/\r?\n/gm, '')}`);
}
if (header.header === 'X-MessageSniffer-Rules') {
rules = header.value.replace(/\r?\n/gm, '').replace(/\s+/g,' ').trim();
connection.logdebug(this, `rules: ${rules}`);
}
// Remove any existing headers
txn.remove_header(header.header);
txn.add_header(header.header, header.value);
}
}
// Summary log
connection.loginfo(this, `result: time=${elapsed}ms code=${code
}${gbudb_ip ? ` ip="${gbudb_ip}"` : ''
}${group ? ` group="${group}"` : ''
}${rules ? ` rule_count=${rules.split(/\s+/).length}` : ''
}${rules ? ` rules="${rules}"` : ''}`);
// Result code MUST in the 0-63 range otherwise we got an error
// http://www.armresearch.com/support/articles/software/snfServer/errors.jsp
if (code === 0 || (code && code <= 63)) {
// Handle result
let action;
if (cfg.message) {
if (code === 0 && cfg.message.white) {
action = cfg.message.white;
}
else if (code === 1) {
if (cfg.message.local_white) {
action = cfg.message.local_white;
}
else {
return next(OK);
}
}
else if (code === 20) {
if (cfg.message.truncate) {
action = cfg.message.truncate;
}
else {
return next(DENY, `Poor GBUdb reputation for IP [${connection.remote.ip}]`);
}
}
else if (code === 40 && cfg.message.caution) {
action = cfg.message.caution;
}
else if (code === 63 && cfg.message.black) {
action = cfg.message.black;
}
else {
if (cfg.message[`code_${code}`]) {
action = cfg.message[`code_${code}`];
}
else {
if (code > 1 && code !== 40) {
if (cfg.message.nonzero) {
action = cfg.message.nonzero;
}
else {
return next(DENY, `Spam detected by MessageSniffer (code=${code} group=${group})`);
}
}
}
}
}
else {
// Default with no configuration
if (code > 1 && code !== 40) {
return next(DENY, `Spam detected by MessageSniffer (code=${code} group=${group})`);
}
else {
return next();
}
}
switch (action) {
case 'accept':
// Whitelist
return next(OK);
case 'allow':
case 'continue':
// Continue to next plugin
return next();
case 'retry':
case 'tempfail':
return next(DENYSOFT, `Spam detected by MessageSniffer (code=${code} group=${group})`);
case 'reject':
return next(DENY, `Spam detected by MessageSniffer (code=${code} group=${group})`);
case 'quarantine':
// Set flag for queue/quarantine plugin
txn.notes.quarantine = true;
txn.notes.quarantine_action = [ OK, `Message quarantined (${txn.uuid})` ];
break;
case 'tag':
tag_subject();
// fall through
default:
return next();
}
}
else {
// Out-of-band code returned
// Handle Bulk/Noisy special rule by re-writing the Precedence header
if (code === 100) {
let precedence = txn.header.get('precedence');
if (precedence) {
// We already have a precedence header
precedence = precedence.trim().toLowerCase();
switch (precedence) {
case 'bulk':
case 'list':
case 'junk':
// Leave these as they are
break;
default:
// Remove anything else and replace it with 'bulk'
txn.remove_header('precedence');
txn.add_header('Precedence', 'bulk');
}
}
else {
txn.add_header('Precedence', 'bulk');
}
}
return next();
}
}
else {
// Something must have gone wrong
connection.logwarn(this, `unexpected response: ${result}`);
}
return next();
});
});
// TODO: we only need the first 64Kb of the message
txn.message_stream.pipe(ws, { line_endings: '\r\n' });
}
exports.hook_disconnect = function (next, connection) {
const cfg = this.config.get('messagesniffer.ini');
// Train GBUdb on rejected messages and recipients
if (cfg.main.gbudb_report_deny && !connection.notes.snf_run &&
(connection.rcpt_count.reject > 0 || connection.msg_count.reject > 0)) {
const snfreq = `<snf><xci><gbudb><bad ip='${connection.remote.ip}'/></gbudb></xci></snf>`;
SNFClient(snfreq, (err, result) => {
if (err) {
connection.logerror(this, err.message);
}
else {
connection.logdebug(this, `GBUdb bad encounter added for ${connection.remote.ip}`);
}
next();
});
}
else {
next();
}
}
function SNFClient (req, cb) {
let result;
const sock = new net.Socket();
sock.setTimeout(30 * 1000); // Connection timeout
sock.once('timeout', function () {
this.destroy();
cb(new Error('connection timed out'));
});
sock.once('error', err => cb(err));
sock.once('connect', function () {
// Connected, send request
plugin.logprotocol(`> ${req}`);
this.write(`${req}\n`);
});
sock.on('data', data => {
plugin.logprotocol(`< ${data}`);
// Buffer all the received lines
(result ? result += data : result = data);
});
sock.once('end', () => {
// Check for result
if (/<result /.exec(result)) return cb(null, result);
let match;
if ((match = /<error message='([^']+)'/.exec(result))) {
return cb(new Error(match[1]));
}
cb(new Error(`unexpected result: ${result}`));
});
// Start the sequence
sock.connect(port);
}