admin管理员组文章数量:1343369
I have been looking everywhere for a headers exchange
example using RabbitMQ in Node.js. If someone could point me in the right direction, that would be great. Here's what I have so far:
publisher method (create a publisher)
RabbitMQ.prototype.publisher = function(exchange, type) {
console.log('New publisher, exchange: '+exchange+', type: '+type);
amqp.then(function(conn) {
conn.createConfirmChannel().then(function(ch) {
publishers[exchange] = {};
publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
publishers[exchange].ch = ch;
});
},function(err){
console.error("[AMQP]", err.message);
return setTimeout(function(){
self.connect(URI);
}, 1000);
}).then(null, console.log);
};
publish method
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
try {
publishers[exchange].assert.then(function(){
publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
publishers[exchange].ch.connection.close();
}
});
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
};
consumer method (create a consumer)
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
amqp.then(function(conn) {
conn.createChannel().then(function(ch) {
var ok = ch.assertExchange(exchange, type, {durable: true});
ok.then(function() {
ch.assertQueue('', {exclusive: true});
});
ok = ok.then(function(qok) {
var queue = qok.queue;
ch.bindQueue(queue,exchange,routingKey)
});
ok = ok.then(function(queue) {
ch.consume(queue, function(msg){
cb(msg,ch);
}, {noAck: false});
});
ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
});
}).then(null, console.warn);
};
The above example works fine with topics
, but I'm not sure how to make the transition to headers
. I am pretty sure I need to change my binding approach, but haven't been able to find any examples on how exactly to acplish this.
Any help would be greatly appreciated!
I have been looking everywhere for a headers exchange
example using RabbitMQ in Node.js. If someone could point me in the right direction, that would be great. Here's what I have so far:
publisher method (create a publisher)
RabbitMQ.prototype.publisher = function(exchange, type) {
console.log('New publisher, exchange: '+exchange+', type: '+type);
amqp.then(function(conn) {
conn.createConfirmChannel().then(function(ch) {
publishers[exchange] = {};
publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
publishers[exchange].ch = ch;
});
},function(err){
console.error("[AMQP]", err.message);
return setTimeout(function(){
self.connect(URI);
}, 1000);
}).then(null, console.log);
};
publish method
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
try {
publishers[exchange].assert.then(function(){
publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
publishers[exchange].ch.connection.close();
}
});
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
};
consumer method (create a consumer)
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
amqp.then(function(conn) {
conn.createChannel().then(function(ch) {
var ok = ch.assertExchange(exchange, type, {durable: true});
ok.then(function() {
ch.assertQueue('', {exclusive: true});
});
ok = ok.then(function(qok) {
var queue = qok.queue;
ch.bindQueue(queue,exchange,routingKey)
});
ok = ok.then(function(queue) {
ch.consume(queue, function(msg){
cb(msg,ch);
}, {noAck: false});
});
ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
});
}).then(null, console.warn);
};
The above example works fine with topics
, but I'm not sure how to make the transition to headers
. I am pretty sure I need to change my binding approach, but haven't been able to find any examples on how exactly to acplish this.
Any help would be greatly appreciated!
Share Improve this question asked Dec 10, 2015 at 18:35 user1828780user1828780 5471 gold badge8 silver badges23 bronze badges1 Answer
Reset to default 11I stumbled across this question looking for the same answers for amqplib. Unfortunately, like you I found all available documentation lacking. After looking over the source, reading over the protocol a bit, and trying out a few binations, this finally did it for me.
...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...
...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...
The full working code is below. The auth info below is faked, so you'll have to use your own. I'm also using ES6, nodejs version 6.5, and amqplib. There may be an issue with giving your headers x-
prefixes and/or using reserved words as header names, but I'm not too sure (I'd have to see the RabbitMQ source).
emit.js:
#!/usr/bin/env node
const XCHANGE = 'headers-exchange';
const Q = require('q');
const Broker = require('amqplib');
let scope = 'anonymous';
process.on('uncaughtException', (exception) => {
console.error(`"::ERROR:: Uncaught exception ${exception}`);
});
process.argv.slice(2).forEach((arg) =>
{
scope = arg;
console.info('[*] Scope now set to ' + scope);
});
Q.spawn(function*()
{
let conn = yield Broker.connect('amqp://root:root@localhost');
let chan = yield conn.createChannel();
chan.assertExchange(XCHANGE, 'headers', { durable: false });
for(let count=0;; count=++count%3)
{
let output = (new Date()).toString();
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
console.log(`[x] Published item "${output}" to <${XCHANGE} : ${JSON.stringify(opts)}>`);
yield Q.delay(500);
}
});
receive.js:
#!/usr/bin/env node
const Q = require('q');
const Broker = require('amqplib');
const uuid = require('node-uuid');
const Rx = require('rx');
Rx.Node = require('rx-node');
const XCHANGE = 'headers-exchange';
const WORKER_ID = uuid.v4();
const WORKER_SHORT_ID = WORKER_ID.substr(0, 4);
Q.spawn(function*() {
let conn = yield Broker.connect('amqp://root:root@localhost');
let chan = yield conn.createChannel();
chan.assertExchange(XCHANGE, 'headers', { durable: false });
let q = yield chan.assertQueue('', { exclusive: true });
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
console.info('[*] Binding with ' + JSON.stringify(opts));
console.log(`[*] Subscriber ${WORKER_ID} (${WORKER_SHORT_ID}) is online!`);
chan.consume(q.queue, (msg) =>
{
console.info(`[x](${WORKER_SHORT_ID}) Received pub "${msg.content.toString()}"`);
chan.ack(msg);
});
});
本文标签: javascriptHeaders exchange example using RabbitMQ in NodejsStack Overflow
版权声明:本文标题:javascript - Headers exchange example using RabbitMQ in Node.js - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1743702207a2524512.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论