var amqp = require('amqplib/callback_api');
var debug = require('debug')('pub-sub-amqp');
var constants = require('./constants.json');
var Event = require('./event');
/**
* A wrapper over AMQP, providing methods to publish and subscribe.
* @constructor
* @author tbking <tarun.batra00@gmail.com>
* @param {object} options - Options passed to the AMQP client
* @param {string} options.uri - URI of the AMQP broker to connect
* @param {string} [options.exchange=exchange] - Exchange name
* @param {string} [options.exchangeType=topic] - Exchange type
* @param {string} [options.queue=$pid] - Queue name
* @param {string} [options.durable=false] - Queue durability
* @param {function} [callback] - Callback
*/
function AMQPClient(options, callback) {
this.options = {
uri: options.uri, // URI of the AMQP broker to connect
exchange: options.exchange || 'exchange', // Exchange name. Defaults to 'exchange'
exchangeType: options.exchangeType || 'topic', // Exchange type. Defaults to 'topic'
queue: options.queue || String(process.pid), // Queue name. Defaults to process id
durable: options.durable || false // Queue durability. Defaults to false
};
var self = this;
debug(constants.CONNECTING);
// Connect to AMQP broker
amqp.connect(self.options.uri, function (err, connection) {
if (err) {
debug(err);
if (callback) {
return callback(err);
}
throw err;
}
debug(constants.CONNECTED);
// Create a channel
connection.createChannel(function (error, channel) {
if (error) {
debug(error);
if (callback) {
return callback(error);
}
throw error;
}
// Saves reference of channel for future use
self.channel = channel;
// Create an exchange if it doesn't exists
channel.assertExchange(self.options.exchange, self.options.exchangeType);
// Create a queue if it doesn't exists
channel.assertQueue(self.options.queue, { durable: self.options.durable });
debug(constants.INITIALIZED);
if (callback) {
return callback(null, self);
}
});
});
}
/**
* Publishes message for any client interested to subscribe and listen to.
* @param {string} type - Type of the message
* @param {*} data - Data to be sent
* @param {function} [callback] - Callback
*/
AMQPClient.prototype.emit = function (type, data, callback) {
var self = this;
// If channnel not initialized, throw error
if (!self.channel) {
var error = new Error(constants.NOT_CONNECTED);
debug(error);
if (callback) {
return callback(error);
}
throw error;
}
// Generate message to be published
var msg = JSON.stringify(data);
// Publish the message
self.channel.publish(self.options.exchange, type, new Buffer(msg));
debug('PUBLISHED: [%s] %o', type, data);
if (callback) {
return callback();
}
};
/**
* Registers a listener for messages of a particular type.
* @param {string} type - Type of the message
* @param {messageReceivedCallback} callback - Callback
*/
AMQPClient.prototype.on = function (type, callback) {
var self = this;
// If channnel not initialized, return error
if (!self.channel) {
var notConnectedError = new Error(constants.NOT_CONNECTED);
debug(notConnectedError);
if (callback) {
return callback(notConnectedError);
}
// If no callback, throw the error
throw notConnectedError;
}
// Bind the queue qith the exchange
self.channel.bindQueue(self.options.queue, self.options.exchange, type);
debug('LISTENING: %s', type);
// Listen for messages
self.channel.consume(self.options.queue, function (msg) {
// Create the event
var event = new Event(self, msg);
debug('RECEVIED: [%s] %o', type, event.data);
// Return the event
if (callback) {
return callback(null, event);
}
});
};
module.exports = AMQPClient;
/**
* Callback called when a subscribed message is received
* @callback messageReceivedCallback
* @param {object|null} error - Error
* @param {Event} event - Event object containing message details
* @param {object} event.data - Data received in the message
* @param {object} event.ack - Method to acknowledge the message
* @param {object} event.reject - Method to reject the message
*/