onebeyond/systemic-azure-bus

View on GitHub
index.js

Summary

Maintainability
F
4 days
Test Coverage
/* eslint-disable no-await-in-loop */
const debug = require('debug')('systemic-azure-bus');
const zlib = require('zlib');
const { join } = require('path');
const requireAll = require('require-all');
const { ServiceBusClient, ServiceBusAdministrationClient } = require('@azure/service-bus');

const errorStrategies = requireAll(join(__dirname, 'lib', 'errorStrategies'));
const factories = requireAll(join(__dirname, 'lib', 'clientFactories'));
const topicApi = requireAll(join(__dirname, 'lib', 'operations', 'topics'));

const decodingStrategies = {
    zlib: body => JSON.parse(zlib.inflateSync(body)),
    default: body => body,
};

const getBodyDecoded = (body, contentEncoding) => (decodingStrategies[contentEncoding] || decodingStrategies.default)(body);

module.exports = () => {
    let sbClient;
    let topicClientFactory;
    let queueClientFactory;
    let enqueuedItems = 0;
    let sendersByPublication = {};
    const openSubscriptions = [];

    const start = async ({
        config: {
            connection: { connectionString },
            subscriptions,
            publications,
        },
    }) => {
        sbClient = new ServiceBusClient(connectionString);
        topicClientFactory = factories.topics(sbClient);
        queueClientFactory = factories.queue(sbClient);

        const upsertSender = publicationId => {
            const { topic } = publications[publicationId] || {};
            if (!topic) {
                throw new Error(`Topic for publication ${publicationId} non found!`);
            }

            let sender = sendersByPublication[publicationId];
            if (!sender) {
                sender = topicClientFactory.createSender(topic);
                sendersByPublication[publicationId] = sender;
            }

            return sender;
        };

        const publish = publicationId => {
            const sender = upsertSender(publicationId);
            return topicApi.publish(sender);
        };

        const cancelScheduledMessages = async (publicationId, sequenceNumber) => {
            debug(`Cancelling messages ${sequenceNumber} for topic ${publicationId}`);
            const sender = upsertSender(publicationId);
            return sender.cancelScheduledMessages(sequenceNumber);
        };

        const getMessageProperties = brokeredMessage => {
            const {
                // ServiceBusReceivedMessage
                deadLetterReason,
                deadLetterErrorDescription,
                lockToken,
                deliveryCount,
                enqueuedTimeUtc,
                expiresAtUtc,
                lockedUntilUtc,
                enqueuedSequenceNumber,
                sequenceNumber,
                deadLetterSource,
                state,

                // ServiceBusMessage
                messageId,
                contentType,
                correlationId,
                partitionKey,
                sessionId,
                replyToSessionId,
                timeToLive,
                subject,
                to,
                replyTo,
                scheduledEnqueueTimeUtc,
            } = brokeredMessage;

            return {
                // ServiceBusReceivedMessage
                deadLetterReason,
                deadLetterErrorDescription,
                lockToken,
                deliveryCount,
                enqueuedTimeUtc,
                expiresAtUtc,
                lockedUntilUtc,
                enqueuedSequenceNumber,
                sequenceNumber,
                deadLetterSource,
                state,

                // ServiceBusMessage
                messageId,
                contentType,
                correlationId,
                partitionKey,
                sessionId,
                replyToSessionId,
                timeToLive,
                subject,
                to,
                replyTo,
                scheduledEnqueueTimeUtc,
            };
        };

        const subscribe = onError => (subscriptionId, handler) => {
            const { topic, subscription, errorHandling } = subscriptions[subscriptionId] || {};
            if (!topic || !subscription) throw new Error(`Data for subscription ${subscriptionId} non found!`);
            const receiver = topicClientFactory.createReceiver({ topic, subscription });
            const topicErrorStrategies = {
                retry: errorStrategies.retry(topic, receiver),
                deadLetter: errorStrategies.deadLetter(topic, receiver),
                exponentialBackoff: errorStrategies.exponentialBackoff(topic, topicClientFactory, receiver),
            };

            const onMessageHandler = async brokeredMessage => {
                try {
                    enqueuedItems++;
                    debug(`Enqueued items increase | ${enqueuedItems} items`);
                    debug(`Handling message on topic ${topic}`);
                    const { body, applicationProperties } = brokeredMessage;
                    const { subscriptionName: messageSubscription } = applicationProperties;

                    if (!messageSubscription || subscription === messageSubscription) {
                        /**
                         * The handler is only going to run if the "messageSubscription" property
                         * does not exists. Or if it exists and is the current subscription from all
                         * the different ones that the topic can contain.
                         * But the message confirmation operation will always be done, even if the handler
                         * is not executed because of the comment above.
                         */
                        await handler({
                            body: getBodyDecoded(
                                body,
                                applicationProperties.contentEncoding,
                            ),
                            applicationProperties,
                            properties: getMessageProperties(brokeredMessage),
                        });
                    }
                    await receiver.completeMessage(brokeredMessage);
                } catch (e) {
                    const subscriptionErrorStrategy = (errorHandling || {}).strategy;
                    const errorStrategy = e.strategy || subscriptionErrorStrategy || 'retry';
                    debug(`Handling error with strategy ${errorStrategy} on topic ${topic}`);
                    const errorHandler = topicErrorStrategies[errorStrategy] || topicErrorStrategies.retry;
                    await errorHandler(brokeredMessage, errorHandling || {});
                } finally {
                    enqueuedItems--;
                    debug(`Enqueued items decrease | ${enqueuedItems} items`);
                }
            };

            debug(`Starting subscription ${subscriptionId} on topic ${topic}...`);
            const openSubscription = receiver.subscribe({
                processMessage: onMessageHandler,
                processError: async args => {
                    onError(args.error);
                },
            }, { autoCompleteMessages: false });
            openSubscriptions.push(openSubscription);
        };

        const peekDlq = async (subscriptionId, messagesNumber = 1) => {
            const { topic, subscription } = subscriptions[subscriptionId] || {};
            if (!topic || !subscription) throw new Error(`Data for subscription ${subscriptionId} non found!`);
            // check access to dlq by topic and client
            const dlqReceiver = topicClientFactory.createReceiver({ topic, subscription, isDlq: true });

            const peekedMessages = await dlqReceiver.receiveMessages(messagesNumber, { maxWaitTimeInMs: 3000 });
            debug(`${peekedMessages.length} peeked messages from DLQ ${dlqReceiver.entityPath}`);
            await dlqReceiver.close();
            return peekedMessages;
        };

        const peek = async (subscriptionId, messagesNumber = 1) => {
            const { topic, subscription } = subscriptions[subscriptionId] || {};
            if (!topic || !subscription) throw new Error(`Data for subscription ${subscriptionId} non found!`);
            const topicReceiver = topicClientFactory.createReceiver({ topic, subscription });
            const activeMessages = await topicReceiver.peekMessages(messagesNumber);
            debug(`${activeMessages.length} peeked messages from Active Queue`);
            await topicReceiver.close();
            return activeMessages;
        };

        const processDlq = async (subscriptionId, handler) => {
            const { topic, subscription } = subscriptions[subscriptionId] || {};
            if (!topic || !subscription) throw new Error(`Data for subscription ${subscriptionId} non found!`);

            const dlqReceiver = topicClientFactory.createReceiver({ topic, subscription, isDlq: true });

            while ((messages = await dlqReceiver.receiveMessages(1, { maxWaitTimeInMs: 3000 })) && messages.length > 0) { // eslint-disable-line no-undef, no-cond-assign, no-await-in-loop
                debug('Processing message from DLQ');
                await handler(messages[0], dlqReceiver); // eslint-disable-line no-undef, no-await-in-loop
            }
            await dlqReceiver.close();
        };

        const emptyDlq = async subscriptionId => {
            const { topic, subscription } = subscriptions[subscriptionId] || {};
            if (!topic || !subscription) throw new Error(`Data for subscription ${subscriptionId} non found!`);

            try {
                const dlqReceiver = topicClientFactory.createReceiver({ topic, subscription, mode: 'receiveAndDelete', isDlq: true });

                let messagesPending = true;
                const getMessagesFromDlq = async () => {
                    const messages = await dlqReceiver.receiveMessages(50, { maxWaitTimeInMs: 3000 });
                    if (messages.length === 0) {
                        debug('There are no messages in this Dead Letter Queue');
                        messagesPending = false;
                    } else if (messages.length < 50) {
                        debug(`processing last ${messages.length} messages from DLQ`);
                        messagesPending = false;
                    } else {
                        debug('processing last 50 messages from DLQ');
                    }
                };
                while (messagesPending) {
                    await getMessagesFromDlq();
                }
                await dlqReceiver.close();
            } catch (err) {
                debug(`Error while deleting dead letter queue: ${err.message}`);
                throw (err);
            }
        };

        const getSubscriptionRules = async subscriptionId => {
            const { topic, subscription } = subscriptions[subscriptionId] || {};
            if (!topic || !subscription) throw new Error(`Data for subscription ${subscriptionId} non found!`);
            const adminClient = new ServiceBusAdministrationClient(connectionString);
            const rules = await adminClient.getRules();
            return rules;
        };

        const health = async () => {
            const subscriptionNames = Object.keys(subscriptions);
            const getConfigTopic = name => subscriptions[name].topic;
            const getConfigSubscription = name => subscriptions[name].subscription;
            const createClient = name => topicClientFactory.createReceiver({ topic: getConfigTopic(name), subscription: getConfigSubscription(name) });
            let healthCheck;
            try {
                const clients = subscriptionNames.map(createClient);
                const healthchecks = clients.map(c => c.peekMessages(1));
                await Promise.all(healthchecks);
                await Promise.all(clients.map(c => c.close()));
                healthCheck = {
                    status: 'ok',
                };
            } catch (err) {
                healthCheck = {
                    status: 'error',
                    details: err.message,
                };
            }
            return healthCheck;
        };

        return {
            health,
            publish,
            cancelScheduledMessages,
            subscribe,
            peekDlq,
            peek,
            processDlq,
            emptyDlq,
            getSubscriptionRules,
        };
    };

    const drainOpenSubscriptions = async () => {
        debug(`Closing ${openSubscriptions.length} active subscriptions`);
        const closeSubscription = async (subscription, index) => {
            debug('Closing active subscription', index);
            await subscription.close();
        };
        await Promise.all(openSubscriptions.map(closeSubscription));
    };

    const stop = async () => {
        debug('Closing open stream subscriptions and draining...');
        await drainOpenSubscriptions();

        let timer;
        // eslint-disable-next-line no-return-assign
        const checkIfSubscriptionIsEmpty = () => new Promise(resolve => {
            timer = setInterval(async () => {
                debug(`Trying to stop component | ${enqueuedItems} enqueued items remaining`);
                // eslint-disable-next-line no-unused-expressions
                enqueuedItems === 0 && resolve();
            }, 100);
        });
        await checkIfSubscriptionIsEmpty();
        clearInterval(timer);

        sendersByPublication = {};

        debug('Closing topic client factory...');
        await topicClientFactory.stop();
        debug('Closing queue client factory...');
        await queueClientFactory.stop();
        debug('Closing ServiceBusClient connection...');
        await sbClient.close();
    };

    return { start, stop };
};