onebeyond/systemic-azure-bus

View on GitHub
lib/operations/topics/publish.js

Summary

Maintainability
A
2 hrs
Test Coverage
const zlib = require('zlib');

/**
 * Body encoders, keyed by the value of the `contentEncoding` publish option.
 * Each encoder takes the raw body and returns the payload to put on the message.
 */
const encodingStrategies = {
    // Serialises the body as JSON and deflates it; the result is a Buffer.
    zlib(body) {
        const serialised = Buffer.from(JSON.stringify(body));
        return zlib.deflateSync(serialised);
    },
    // Fallback: the body is handed back untouched.
    default(body) {
        return body;
    },
};

/**
 * Encodes a message body with the strategy registered for `contentEncoding`.
 *
 * Only strategies defined directly on `encodingStrategies` are honoured; any
 * other value (including `undefined`) falls back to the `default` strategy.
 *
 * @param {*} body - The payload to encode.
 * @param {string} [contentEncoding] - Name of the encoding strategy to apply.
 * @returns {*} Whatever the selected strategy returns for `body`.
 */
const getBodyEncoded = (body, contentEncoding) => {
    // Own-property check: a plain `encodingStrategies[contentEncoding]` lookup would also
    // resolve names inherited from Object.prototype ('toString', '__proto__', ...),
    // which either corrupts the body or throws instead of using the default strategy.
    const isKnownStrategy = Object.prototype.hasOwnProperty.call(encodingStrategies, contentEncoding);
    const encode = isKnownStrategy ? encodingStrategies[contentEncoding] : encodingStrategies.default;
    return encode(body);
};

module.exports = sender => async (body, options = {}) => { // eslint-disable-line object-curly-newline
    const { label = '', contentEncoding, applicationProperties, scheduledEnqueueTimeUtc, ...messageFields } = options;
    const message = {
        label,
        subject: label,
        applicationProperties: {
            contentEncoding,
            attemptCount: 0,
            ...applicationProperties,
        },
        ...messageFields,
        body: getBodyEncoded(body, contentEncoding),
    };

    let isMessageSent = false;
    let attempts = 0;
    let lastError = null;
    let sequenceNumber;

    while (!isMessageSent && attempts < 3) {
        try {
            if (scheduledEnqueueTimeUtc) {
                // eslint-disable-next-line no-await-in-loop
                sequenceNumber = await sender.scheduleMessages([message], scheduledEnqueueTimeUtc);
            } else {
                // eslint-disable-next-line no-await-in-loop
                await sender.sendMessages(message);
            }
            isMessageSent = true;
        } catch (err) {
            lastError = err;
            attempts++;
        }
    }

    if (!isMessageSent && lastError) throw lastError;

    return Promise.resolve(sequenceNumber);
};