smtp_client.js
'use strict';
// SMTP client object and class. This allows every part of the client
// protocol to be hooked for different levels of control, such as
// smtp_forward and smtp_proxy queue plugins.
// It can use HostPool to get a connection to a pool of
// possible hosts in the configuration value "forwarding_host_pool", rather
// than a bunch of connections to a single host from the configuration values
// in "host" and "port" (see host_pool.js).
// node.js builtins
const events = require('events');
// npm deps
const ipaddr = require('ipaddr.js');
const net_utils = require('haraka-net-utils');
const utils = require('haraka-utils');
// haraka libs
const line_socket = require('./line_socket');
const logger = require('./logger');
const HostPool = require('./host_pool');
const smtp_regexp = /^(\d{3})([ -])(.*)/;
const STATE = {
IDLE: 1,
ACTIVE: 2,
RELEASED: 3,
DESTROYED: 4,
}
class SMTPClient extends events.EventEmitter {
constructor (opts = {}) {
super();
this.uuid = utils.uuid();
this.connect_timeout = parseInt(opts.connect_timeout) || 30;
this.socket = opts.socket || line_socket.connect(opts.port, opts.host);
this.socket.setTimeout(this.connect_timeout * 1000);
this.socket.setKeepAlive(true);
this.state = STATE.IDLE;
this.command = 'greeting';
this.response = [];
this.connected = false;
this.authenticating= false;
this.authenticated = false;
this.auth_capabilities = [];
this.host = opts.host;
this.port = opts.port;
this.smtputf8 = false;
const client = this;
client.socket.on('line', (line) => {
client.emit('server_protocol', line);
const matches = smtp_regexp.exec(line);
if (!matches) {
client.emit('error', `${client.uuid}: Unrecognized response from upstream server: ${line}`);
client.destroy();
return;
}
const code = matches[1];
const cont = matches[2];
const msg = matches[3];
client.response.push(msg);
if (cont !== ' ') return;
if (client.command === 'auth' || client.authenticating) {
logger.loginfo(`SERVER RESPONSE, CLIENT ${client.command}, authenticating=${client.authenticating},code=${code},cont=${cont},msg=${msg}`);
if (/^3/.test(code) && (
msg === 'VXNlcm5hbWU6' ||
msg === 'dXNlcm5hbWU6' // Workaround ill-mannered SMTP servers (namely smtp.163.com)
)) {
client.emit('auth_username');
return;
}
if (/^3/.test(code) && msg === 'UGFzc3dvcmQ6') {
client.emit('auth_password');
return;
}
if (/^2/.test(code) && client.authenticating) {
logger.loginfo('AUTHENTICATED');
client.authenticating = false;
client.authenticated = true;
client.emit('auth');
return;
}
}
if (client.command === 'ehlo') {
if (code.match(/^5/)) {
// Handle fallback to HELO if EHLO is rejected
client.emit('greeting', 'HELO');
return;
}
client.emit('capabilities');
if (client.command !== 'ehlo') {
return;
}
}
if (client.command === 'xclient' && /^5/.test(code)) {
// XCLIENT command was rejected (no permission?)
// Carry on without XCLIENT
client.command = 'helo';
}
else if (/^[45]/.test(code)) {
client.emit('bad_code', code, client.response.join(' '));
if (client.state !== STATE.ACTIVE) {
return;
}
}
if (/^441/.test(code)) {
if (/Connection timed out/i.test(msg)) {
client.destroy();
}
}
switch (client.command) {
case 'xclient':
client.xclient = true;
client.emit('xclient', 'EHLO');
break;
case 'starttls':
client.upgrade(client.tls_options);
break;
case 'greeting':
client.connected = true;
client.emit('greeting', 'EHLO');
break;
case 'ehlo':
client.emit('helo');
break;
case 'helo':
case 'mail':
case 'rcpt':
case 'data':
case 'dot':
case 'rset':
case 'auth':
client.emit(client.command);
break;
case 'quit':
client.emit('quit');
client.destroy();
break;
default:
throw new Error(`Unknown command: ${client.command}`);
}
});
client.socket.on('connect', () => {
// Replace connection timeout with idle timeout
client.socket.setTimeout((opts.idle_timeout || 300) * 1000);
if (!client.socket.remoteAddress) {
// "Value may be undefined if the socket is destroyed"
logger.logdebug('socket.remoteAddress undefined');
return;
}
client.remote_ip = ipaddr.process(client.socket.remoteAddress).toString();
})
function closed (msg) {
return error => {
if (!error) error = '';
// error is e.g. "Error: connect ECONNREFUSED"
const errMsg = `${client.uuid}: [${client.host}:${client.port}] SMTP connection ${msg} ${error}`;
/* eslint-disable no-fallthrough */
switch (client.state) {
case STATE.ACTIVE:
client.emit('error', errMsg);
case STATE.IDLE:
case STATE.RELEASED:
client.destroy();
break;
case STATE.DESTROYED:
if (msg === 'errored' || msg === 'timed out') {
client.emit('connection-error', errMsg);
}
break
default:
}
logger.logdebug(`[smtp_client] ${errMsg} (state=${client.state})`);
}
}
client.socket.on('error', closed('errored'));
client.socket.on('timeout', closed('timed out'));
client.socket.on('close', closed('closed'));
client.socket.on('end', closed('ended'));
}
load_tls_config (opts) {
const tls_options = { servername: this.host };
if (opts) {
Object.assign(tls_options, opts);
}
this.tls_options = tls_options;
}
send_command (command, data) {
const line = (command === 'dot') ? '.' : command + (data ? (` ${data}`) : '');
this.emit('client_protocol', line);
this.command = command.toLowerCase();
this.response = [];
this.socket.write(`${line}\r\n`);
}
start_data (data) {
this.response = [];
this.command = 'dot';
data.pipe(this.socket, { dot_stuffing: true, ending_dot: true, end: false });
}
release () {
if (this.state === STATE.DESTROYED) return;
logger.logdebug(`[smtp_client] ${this.uuid} releasing, state=${this.state}`);
[
'auth', 'bad_code', 'capabilities', 'client_protocol', 'connection-error',
'data', 'dot', 'error', 'greeting', 'helo',
'mail', 'rcpt', 'rset', 'server_protocol', 'xclient',
].forEach(l => {
this.removeAllListeners(l);
})
if (this.connected) this.send_command('QUIT');
this.destroy()
}
destroy () {
if (this.state === STATE.DESTROYED) return
this.state = STATE.DESTROYED;
this.socket.destroy();
}
upgrade (tls_options) {
this.socket.upgrade(tls_options, (verified, verifyError, cert, cipher) => {
logger.loginfo(`secured:${
(cipher) ? ` cipher=${cipher.name} version=${cipher.version}` : ''
} verified=${verified}${
(verifyError) ? ` error="${verifyError}"` : ''
}${(cert?.subject) ? ` cn="${cert.subject.CN}" organization="${cert.subject.O}"` : ''
}${(cert?.issuer) ? ` issuer="${cert.issuer.O}"` : ''
}${(cert?.valid_to) ? ` expires="${cert.valid_to}"` : ''
}${(cert?.fingerprint) ? ` fingerprint=${cert.fingerprint}` : ''}`);
});
}
is_dead_sender (plugin, connection) {
if (connection?.transaction) return false;
// This likely means the sender went away on us, cleanup.
connection.logwarn(plugin, "transaction went away, releasing smtp_client");
this.release();
return true;
}
}
exports.smtp_client = SMTPClient;
// Get a smtp_client for the given attributes.
// used only in testing
exports.get_client = (server, callback, opts = {}) => {
const smtp_client = new SMTPClient(opts)
logger.logdebug(`[smtp_client] uuid=${smtp_client.uuid} host=${opts.host} port=${opts.port} created`)
callback(smtp_client)
}
exports.onCapabilitiesOutbound = (smtp_client, secured, connection, config, on_secured) => {
for (const line in smtp_client.response) {
if (/^XCLIENT/.test(smtp_client.response[line])) {
if (!smtp_client.xclient) {
smtp_client.send_command('XCLIENT', `ADDR=${connection.remote.ip}`);
return;
}
}
if (/^SMTPUTF8/.test(smtp_client.response[line])) {
smtp_client.smtputf8 = true;
}
if (/^STARTTLS/.test(smtp_client.response[line]) && !secured) {
let hostBanned = false
let serverBanned = false
// Check if there are any banned TLS hosts
if (smtp_client.tls_options.no_tls_hosts) {
// If there are check if these hosts are in the blacklist
hostBanned = net_utils.ip_in_list(smtp_client.tls_config.no_tls_hosts, config.host);
serverBanned = net_utils.ip_in_list(smtp_client.tls_config.no_tls_hosts, smtp_client.remote_ip);
}
if (!hostBanned && !serverBanned && config.enable_tls) {
smtp_client.socket.on('secure', on_secured);
smtp_client.secured = false; // have to wait in forward plugin before we can do auth, even if capabilities are there on first EHLO
smtp_client.send_command('STARTTLS');
return;
}
}
let auth_matches = smtp_client.response[line].match(/^AUTH (.*)$/);
if (auth_matches) {
smtp_client.auth_capabilities = [];
auth_matches = auth_matches[1].split(' ');
for (const authMatch of auth_matches) {
smtp_client.auth_capabilities.push(authMatch.toLowerCase());
}
}
}
}
// Get a smtp_client for the given attributes and set up the common
// config and listeners for plugins. This is what smtp_proxy and
// smtp_forward have in common.
exports.get_client_plugin = (plugin, connection, c, callback) => {
// c = config
// Merge in authentication settings from smtp_forward/proxy.ini if present
// FIXME: config.auth could be changed when API isn't frozen
if (c.auth_type || c.auth_user || c.auth_pass) {
c.auth = {
type: c.auth_type,
user: c.auth_user,
pass: c.auth_pass
}
}
const hostport = get_hostport(connection, connection.server, c);
const smtp_client = new SMTPClient(hostport)
logger.loginfo(`[smtp_client] uuid=${smtp_client.uuid} host=${hostport.host} port=${hostport.port} created`);
connection.logdebug(plugin, `Got smtp_client: ${smtp_client.uuid}`);
let secured = false;
smtp_client.load_tls_config(plugin.tls_options);
smtp_client.call_next = function (retval, msg) {
if (this.next) {
const { next } = this;
delete this.next;
next(retval, msg);
}
}
smtp_client.on('client_protocol', (line) => {
connection.logprotocol(plugin, `C: ${line}`);
})
smtp_client.on('server_protocol', (line) => {
connection.logprotocol(plugin, `S: ${line}`);
})
function helo (command) {
if (smtp_client.xclient) {
smtp_client.send_command(command, connection.hello.host);
}
else {
smtp_client.send_command(command, connection.local.host);
}
}
smtp_client.on('greeting', helo);
smtp_client.on('xclient', helo);
function on_secured () {
if (secured) return;
secured = true;
smtp_client.secured = true;
smtp_client.emit('greeting', 'EHLO');
}
smtp_client.on('capabilities', () => {
exports.onCapabilitiesOutbound(smtp_client, secured, connection, c, on_secured);
});
smtp_client.on('helo', () => {
if (!c.auth || smtp_client.authenticated) {
if (smtp_client.is_dead_sender(plugin, connection)) return;
smtp_client.send_command('MAIL', `FROM:${connection.transaction.mail_from.format(!smtp_client.smtp_utf8)}`);
return;
}
if (c.auth.type === null || typeof (c.auth.type) === 'undefined') return; // Ignore blank
const auth_type = c.auth.type.toLowerCase();
if (!smtp_client.auth_capabilities.includes(auth_type)) {
throw new Error(`Auth type "${auth_type}" not supported by server (supports: ${smtp_client.auth_capabilities.join(',')})`);
}
switch (auth_type) {
case 'plain':
if (!c.auth.user || !c.auth.pass) {
throw new Error("Must include auth.user and auth.pass for PLAIN auth.");
}
logger.logdebug(`[smtp_client] uuid=${smtp_client.uuid} authenticating as "${c.auth.user}"`);
smtp_client.send_command('AUTH', `PLAIN ${utils.base64(`${c.auth.user}\0${c.auth.user}\0${c.auth.pass}`)}`);
break;
case 'cram-md5':
throw new Error("Not implemented");
default:
throw new Error(`Unknown AUTH type: ${auth_type}`);
}
});
smtp_client.on('auth', () => {
// if authentication has been handled by plugin(s)
if (smtp_client.authenticating) return;
if (smtp_client.is_dead_sender(plugin, connection)) return;
smtp_client.authenticated = true;
smtp_client.send_command('MAIL', `FROM:${connection.transaction.mail_from.format(!smtp_client.smtp_utf8)}`);
});
// these errors only get thrown when the connection is still active
smtp_client.on('error', (msg) => {
connection.logwarn(plugin, msg);
smtp_client.call_next();
});
// these are the errors thrown when the connection is dead
smtp_client.on('connection-error', (error) => {
// error contains e.g. "Error: connect ECONNREFUSE"
logger.logerror(`backend failure: ${smtp_client.host}:${smtp_client.port} - ${error}`);
const { host_pool } = connection.server.notes;
// only exists for if forwarding_host_pool is set in the config
if (host_pool) {
host_pool.failed(smtp_client.host, smtp_client.port);
}
smtp_client.call_next();
});
if (smtp_client.connected) {
if (smtp_client.xclient) {
smtp_client.send_command('XCLIENT', `ADDR=${connection.remote.ip}`);
}
else {
smtp_client.emit('helo');
}
}
callback(null, smtp_client);
}
function get_hostport (connection, server, cfg) {
if (cfg.forwarding_host_pool) {
if (! server.notes.host_pool) {
connection.logwarn(`creating host_pool from ${cfg.forwarding_host_pool}`);
server.notes.host_pool =
new HostPool(
cfg.forwarding_host_pool, // 1.2.3.4:420, 5.6.7.8:420
cfg.dead_forwarding_host_retry_secs
);
}
const host = server.notes.host_pool.get_host();
if (host) return host; // { host: 1.2.3.4, port: 567 }
logger.logerror('[smtp_client] no backend hosts in pool!');
throw new Error("no backend hosts found in pool!");
}
if (cfg.host && cfg.port) return { host: cfg.host, port: cfg.port };
logger.logwarn("[smtp_client] forwarding_host_pool or host and port were not found in config file");
throw new Error("You must specify either forwarding_host_pool or host and port");
}