msimerson/Haraka

View on GitHub
outbound/index.js

Summary

Maintainability
D
1 day
Test Coverage
'use strict';

const fs          = require('fs');
const path        = require('path');

const async       = require('async');
const { Address }     = require('address-rfc2821');
const config      = require('haraka-config');
const constants   = require('haraka-constants');
const net_utils   = require('haraka-net-utils');
const utils       = require('haraka-utils');
const ResultStore = require('haraka-results');

const logger      = require('../logger');
const trans       = require('../transaction');
const plugins     = require('../plugins');
const FsyncWriteStream = require('./fsync_writestream');

const obc         = require('./config');
const queuelib    = require('./queue');
const HMailItem   = require('./hmail');
const TODOItem    = require('./todo');
const _qfile = exports.qfile = require('./qfile');

// Queue primitives used throughout this module.
const { queue_dir, temp_fail_queue, delivery_queue } = queuelib;

// Expose the queues and the helper modules on the public interface.
Object.assign(exports, { temp_fail_queue, delivery_queue, net_utils, config });

// Queue management entry points are re-exported from ./queue as-is.
for (const name of [
    'get_stats',
    'list_queue',
    'stat_queue',
    'scan_queue_pids',
    'flush_queue',
    'load_pid_queue',
    'ensure_queue_dir',
    'load_queue',
    'stats',
]) {
    exports[name] = queuelib[name];
}

// React to IPC messages sent to this process. Only the three
// 'outbound.*' events below are acted upon; everything else is ignored.
process.on('message', msg => {
    // An IPC payload can be any JSON value, including null, so make sure
    // there is something to read `event` from before dispatching.
    if (!msg || !msg.event) return;

    switch (msg.event) {
        case 'outbound.load_pid_queue':
            exports.load_pid_queue(msg.data);
            return;
        case 'outbound.flush_queue':
            exports.flush_queue(msg.domain, process.pid);
            return;
        case 'outbound.shutdown':
            logger.loginfo("[outbound] Shutting down temp fail queue");
            temp_fail_queue.shutdown();
            return;
        default:
            // ignores the message
            return;
    }
});

exports.send_email = function () {

    if (arguments.length === 2) {
        logger.logdebug("[outbound] Sending email as a transaction");
        return this.send_trans_email(arguments[0], arguments[1]);
    }

    let from = arguments[0];
    let to   = arguments[1];
    let contents = arguments[2];
    const next = arguments[3];
    const options = arguments[4] || {};

    const dot_stuffed = options.dot_stuffed ? options.dot_stuffed : false;
    const notes = options.notes ? options.notes : null;
    const origin = options.origin ? options.origin : null;

    logger.loginfo("[outbound] Sending email via params", origin);

    const transaction = trans.createTransaction();

    logger.loginfo(`[outbound] Created transaction: ${transaction.uuid}`, origin);

    //Adding notes passed as parameter
    if (notes) {
        transaction.notes = notes;
    }

    // set MAIL FROM address, and parse if it's not an Address object
    if (from instanceof Address) {
        transaction.mail_from = from;
    }
    else {
        try {
            from = new Address(from);
        }
        catch (err) {
            return next(constants.deny, `Malformed from: ${err}`);
        }
        transaction.mail_from = from;
    }

    // Make sure to is an array
    if (!(Array.isArray(to))) {
        // turn into an array
        to = [ to ];
    }

    if (to.length === 0) {
        return next(constants.deny, "No recipients for email");
    }

    // Set RCPT TO's, and parse each if it's not an Address object.
    for (let i=0,l=to.length; i < l; i++) {
        if (!(to[i] instanceof Address)) {
            try {
                to[i] = new Address(to[i]);
            }
            catch (err) {
                return next(constants.deny,
                    `Malformed to address (${to[i]}): ${err}`);
            }
        }
    }

    transaction.rcpt_to = to;

    // Set data_lines to lines in contents
    if (typeof contents == 'string') {
        let match;
        while ((match = utils.line_regexp.exec(contents))) {
            let line = match[1];
            line = line.replace(/\r?\n?$/, '\r\n'); // make sure it ends in \r\n
            if (dot_stuffed === false && line.length >= 3 && line.substr(0,1) === '.') {
                line = `.${line}`;
            }
            transaction.add_data(Buffer.from(line));
            contents = contents.substr(match[1].length);
            if (contents.length === 0) {
                break;
            }
        }
    }
    else {
        // Assume a stream
        return stream_line_reader(contents, transaction, err => {
            if (err) {
                return next(constants.denysoft, `Error from stream line reader: ${err}`);
            }
            exports.send_trans_email(transaction, next);
        });
    }

    transaction.message_stream.add_line_end();

    // Allow for the removal of Message-Id and/or Date headers which
    // is useful when resending mail from a quarantine.
    if (options.remove_msgid) {
        transaction.remove_header('Message-Id');
    }
    if (options.remove_date) {
        transaction.remove_header('Date');
    }

    this.send_trans_email(transaction, next);
}

// Read `stream` to its end, feeding each complete line (as matched by
// utils.line_regexp) to transaction.add_data as a Buffer.
//
//   stream      - emitter of 'data', 'end' and 'error' events; chunks may be
//                 Buffers or strings
//   transaction - receives add_data(Buffer) per line and
//                 message_stream.add_line_end() once the stream ends
//   cb          - called exactly once: cb() after 'end', cb(err) on 'error'
function stream_line_reader (stream, transaction, cb) {
    // Pending bytes are held as a latin1 string: one character per byte, so
    // a multi-byte character split across two chunks (or a body that is not
    // UTF-8 at all) is passed through unmodified instead of being decoded
    // chunk by chunk.
    let current_data = '';
    let finished = false;

    function process_data (data) {
        if (finished) return;
        const chunk = Buffer.isBuffer(data) ? data : Buffer.from(data);
        current_data += chunk.toString('latin1');
        let results;
        while ((results = utils.line_regexp.exec(current_data))) {
            const this_line = results[1];
            // An empty match consumes nothing; bail out instead of looping.
            if (!this_line.length) {
                return;
            }
            current_data = current_data.slice(this_line.length);
            transaction.add_data(Buffer.from(this_line, 'latin1'));
        }
    }

    function process_end () {
        if (finished) return;
        finished = true;
        // Flush a trailing line that had no terminator.
        if (current_data.length) {
            transaction.add_data(Buffer.from(current_data, 'latin1'));
        }
        current_data = '';
        transaction.message_stream.add_line_end();
        cb();
    }

    function process_error (err) {
        // Report only the first outcome; an error after 'end' (or an 'end'
        // after an error) must not invoke the callback a second time.
        if (finished) return;
        finished = true;
        cb(err);
    }

    stream.on('data', process_data);
    stream.once('end', process_end);
    stream.once('error', process_error);
}

// Split a transaction's recipients into delivery groups.
//
// Returns an array of { domain, rcpts } objects. With obc.cfg.always_split
// set, every recipient gets its own group; otherwise recipients are grouped
// by their `host`, in order of first appearance.
function get_deliveries (transaction) {
    if (obc.cfg.always_split) {
        logger.logdebug({name: "outbound"}, "always split");
        return transaction.rcpt_to.map(rcpt => ({domain: rcpt.host, rcpts: [ rcpt ]}));
    }

    // First get each domain.
    // A prototype-less object is used as the dictionary so that a host such
    // as "constructor" or "__proto__" cannot collide with inherited members.
    const recips = Object.create(null);
    transaction.rcpt_to.forEach(rcpt => {
        const domain = rcpt.host;
        if (!recips[domain]) { recips[domain] = []; }
        recips[domain].push(rcpt);
    });

    return Object.keys(recips).map(domain => ({domain, 'rcpts': recips[domain]}));
}

exports.send_trans_email = function (transaction, next) {

    // add potentially missing headers
    if (!transaction.header.get_all('Message-Id').length) {
        logger.loginfo("[outbound] Adding missing Message-Id header");
        transaction.add_header('Message-Id', `<${transaction.uuid}@${net_utils.get_primary_host_name()}>`);
    }
    if (!transaction.header.get_all('Date').length) {
        logger.loginfo("[outbound] Adding missing Date header");
        transaction.add_header('Date', utils.date_to_str(new Date()));
    }

    if (obc.cfg.received_header !== 'disabled') {
        transaction.add_leading_header('Received', `(${obc.cfg.received_header}); ${utils.date_to_str(new Date())}`);
    }

    const connection = { transaction };

    logger.add_log_methods(connection);
    if (!transaction.results) {
        logger.logdebug('adding results store');
        transaction.results = new ResultStore(connection);
    }

    connection.pre_send_trans_email_respond = retval => {
        const deliveries = get_deliveries(transaction);
        const hmails = [];
        const ok_paths = [];

        let todo_index = 1;

        async.forEachSeries(deliveries, (deliv, cb) => {
            const todo = new TODOItem(deliv.domain, deliv.rcpts, transaction);
            todo.uuid = `${todo.uuid}.${todo_index}`;
            todo_index++;
            this.process_delivery(ok_paths, todo, hmails, cb);
        },
        (err) => {
            if (err) {
                for (let i=0, l=ok_paths.length; i<l; i++) {
                    fs.unlink(ok_paths[i], () => {});
                }
                transaction.results.add({ name: 'outbound'}, { err });
                if (next) next(constants.denysoft, err);
                return;
            }

            for (const hmail of hmails) {
                delivery_queue.push(hmail);
            }

            transaction.results.add({ name: 'outbound'}, { pass: "queued" });
            if (next) {
                next(constants.ok, `Message Queued (${transaction.uuid})`);
            }
        });
    }

    plugins.run_hooks('pre_send_trans_email', connection);
}

exports.process_delivery = function (ok_paths, todo, hmails, cb) {
    logger.loginfo(`[outbound] Transaction delivery for domain: ${todo.domain}`);
    const fname = _qfile.name();
    const tmp_path = path.join(queue_dir, `${_qfile.platformDOT}${fname}`);
    const ws = new FsyncWriteStream(tmp_path, { flags: constants.WRITE_EXCL });

    ws.on('close', () => {
        const dest_path = path.join(queue_dir, fname);
        fs.rename(tmp_path, dest_path, err => {
            if (err) {
                logger.logerror(`[outbound] Unable to rename tmp file!: ${err}`);
                fs.unlink(tmp_path, () => {});
                cb("Queue error");
            }
            else {
                hmails.push(new HMailItem (fname, dest_path, todo.notes));
                ok_paths.push(dest_path);
                cb();
            }
        })
    })

    ws.on('error', err => {
        logger.logerror(`[outbound] Unable to write queue file (${fname}): ${err}`);
        ws.destroy();
        fs.unlink(tmp_path, () => {});
        cb("Queueing failed");
    })

    this.build_todo(todo, ws, () => {
        todo.message_stream.pipe(ws, { line_endings: '\r\n', dot_stuffing: true, ending_dot: false });
    });
}

exports.build_todo = (todo, ws, write_more) => {

    const todo_str = `\n${JSON.stringify(todo, exclude_from_json, '\t')}\n`
    const todo_len = Buffer.byteLength(todo_str)

    const buf = Buffer.alloc(4 + todo_len);
    buf.writeUInt32BE(todo_len, 0);
    buf.write(todo_str, 4);

    const continue_writing = ws.write(buf);
    if (continue_writing) {
        process.nextTick(write_more);
        return
    }

    ws.once('drain', write_more);
}

// JSON.stringify replacer that keeps the 'message_stream' property out of
// the queue file header; every other key is serialised unchanged.
function exclude_from_json (key, value) {
    return key === 'message_stream' ? undefined : value;
}

// exported for testability
Object.assign(exports, { TODOItem });

Object.assign(exports, { HMailItem });

// Loaded last, after the item classes above are already on `exports`.
({ lookup_mx: exports.lookup_mx } = require('./mx_lookup'));