21stio/nodejs-queue-adapter

View on GitHub
typescript/adapter/activemq/ActiveMqAdapter.ts

Summary

Maintainability
C
1 day
Test Coverage
import {IEncoder} from "../../encoder/IEncoder";
import {ActiveMqConfig} from "./ActiveMqConfig";
import Promise = require('bluebird');
import Amqp10 = require('amqp10');
import {IJob} from "../abstract/IJob";
import {ActiveMqJob} from "./ActiveMqJob";
import {QueueAdapter} from "../abstract/QueueAdapter";
import {IErrorHandler} from "../../handler/error/IErrorHandler";

export class ActiveMqAdapter extends QueueAdapter {

    protected config:ActiveMqConfig;
    protected clientPromises:{[concurrency: number]: Promise} = {};
    protected receiverPromises:{[queueName: string]: Promise} = {};
    protected senderPromises:{[queueName: string]: Promise} = {};

    constructor(errorHandler:IErrorHandler, encoder:IEncoder, config:ActiveMqConfig = new ActiveMqConfig()) {
        super(errorHandler, encoder, config);
    }

    public consume(queueName:string, callback:(job:IJob) => void) {
        var self = this;

        self.getReceiverPromise(queueName).then(function (receiver) {
            receiver.on('message', function (message) {
                var job = new ActiveMqJob(self.errorHandler, message.body, receiver, message);

                callback(job);
            })
        })
    }

    public produce(queueName:string, message:any) {
        var self = this;

        return self.getSenderPromise(queueName).then(function (sender) {
            return sender.send(message);
        })
    }

    protected getClientPromise(concurrency:number) {
        var self = this;

        if (!self.clientPromises[concurrency]) {
            var client = new Amqp10.Client(Amqp10.Policy.Utils.RenewOnSettle(concurrency, concurrency, Amqp10.Policy.ServiceBusQueue));
            self.clientPromises[concurrency] = new Promise(function (resolve, reject) {
                client.connect(self.getConnectionString(self.config))
                    .then(function () {
                        resolve(client)
                    })
                    .error(reject);
            })
        }

        return self.clientPromises[concurrency];
    }

    protected getSenderPromise(queueName:string):Promise {
        var self = this;
        var concurrency = self.config.getConcurrency(queueName);

        if (!self.senderPromises[queueName]) {
            self.senderPromises[queueName] = self.getClientPromise(concurrency).then(function (client) {
                return client.createSender(queueName);
            })
        }

        return self.senderPromises[queueName];
    }

    protected getReceiverPromise(queueName:string):Promise {
        var self = this;
        var concurrency = self.config.getConcurrency(queueName);

        if (!self.receiverPromises[queueName]) {
            self.receiverPromises[queueName] = self.getClientPromise(concurrency).then(function (client) {
                return client.createReceiver(queueName);
            })
        }

        return self.receiverPromises[queueName];
    }

    protected getConnectionString(config:ActiveMqConfig) {
        var connectionString = 'amqp://';

        if (config.username && config.password) {
            connectionString += config.username + ':' + config.password + '@';
        }

        connectionString += config.host;

        if (config.port) {
            connectionString += ':' + config.port;
        }

        return connectionString;
    }

}