admin管理员组

文章数量:1343369

I have been looking everywhere for a headers exchange example using RabbitMQ in Node.js. If someone could point me in the right direction, that would be great. Here's what I have so far:

publisher method (create a publisher)

/**
 * Creates a publisher for `exchange`: opens a confirm channel on the shared
 * `amqp` connection promise, asserts the exchange, and stores both the channel
 * and the assert promise in `publishers[exchange]` for `publish` to use.
 *
 * @param {string} exchange - Name of the exchange to assert and publish to.
 * @param {string} type - Exchange type passed to `assertExchange`.
 */
RabbitMQ.prototype.publisher = function(exchange, type) {
 console.log('New publisher, exchange: '+exchange+', type: '+type);
 amqp.then(function(conn) {
    // Return the inner chain so that a failure to create the channel or to
    // assert the exchange reaches the trailing rejection handler instead of
    // being silently dropped.
    return conn.createConfirmChannel().then(function(ch) {
        publishers[exchange] = {};
        publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
        publishers[exchange].ch = ch;
        return publishers[exchange].assert;
    });
 },function(err){
    // Connection failure: log it and retry after one second.
    // NOTE(review): `self` and `URI` are not defined in this snippet --
    // presumably they come from the enclosing module; confirm.
    console.error("[AMQP]", err.message);
    return setTimeout(function(){
        self.connect(URI);
    }, 1000);
 }).then(null, console.log);
};

publish method

/**
 * Publishes `content` to `exchange` once the exchange assertion stored by
 * `publisher` has completed. Messages that cannot be published are pushed onto
 * `offlinePubQueue`.
 *
 * @param {string} exchange - Exchange name; must have been registered via `publisher`.
 * @param {string} routingKey - Routing key for the message.
 * @param {string} content - Message body, converted with `Buffer.from`.
 * @param {Object} [headers] - Message headers sent in the publish options.
 */
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
 // Park the message for a later retry. The headers are appended as a fourth
 // element so a retried message does not lose them.
 function parkForRetry() {
    offlinePubQueue.push([exchange, routingKey, content, headers]);
 }

 try {
    publishers[exchange].assert.then(function(){
        // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
        publishers[exchange].ch.publish(exchange, routingKey, Buffer.from(content), { persistent: true, headers: headers }, function(err, ok) {
            if (err) {
                console.error("[AMQP] publish", err);
                parkForRetry();
                publishers[exchange].ch.connection.close();
            }
        });
    }).then(null, function(e) {
        // A rejected assert, or a throw inside the callback above, happens
        // asynchronously and is invisible to the surrounding try/catch.
        console.error("[AMQP] publish", e.message);
        parkForRetry();
    });
 } catch (e) {
    // Synchronous failure, e.g. no publisher registered for this exchange.
    console.error("[AMQP] publish", e.message);
    parkForRetry();
 }
};

consumer method (create a consumer)

/**
 * Creates a consumer: opens a channel, asserts the exchange, declares an
 * exclusive server-named queue, binds it to the exchange and starts consuming.
 *
 * For a headers exchange, pass the matching criteria in `bindArgs`
 * (for example `{ 'asd': 'request', 'efg': 'test', 'x-match': 'all' }`) and an
 * empty string as `routingKey`.
 *
 * @param {string} exchange - Exchange to assert and bind to.
 * @param {string} type - Exchange type passed to `assertExchange`.
 * @param {string} routingKey - Binding pattern passed to `bindQueue`.
 * @param {function(Object, Object)} cb - Called with `(msg, channel)` for each delivery.
 * @param {Object} [bindArgs] - Optional binding arguments forwarded to `bindQueue`.
 */
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb, bindArgs) {
 amqp.then(function(conn) {
  // Every step returns its promise so that the next step receives the real
  // result and any failure reaches the trailing console.warn handler.
  return conn.createChannel().then(function(ch) {
    return ch.assertExchange(exchange, type, {durable: true})
      .then(function() {
        return ch.assertQueue('', {exclusive: true});
      })
      .then(function(qok) {
        var queue = qok.queue;
        // Hand the queue name on to the consume step once the bind is done.
        return ch.bindQueue(queue, exchange, routingKey, bindArgs).then(function() {
          return queue;
        });
      })
      .then(function(queue) {
        return ch.consume(queue, function(msg){
            cb(msg,ch);
        }, {noAck: false});
      })
      .then(function() {
        console.log(' [*] Waiting for logs. To exit press CTRL+C.');
      });
  });
 }).then(null, console.warn);
};

The above example works fine with topics, but I'm not sure how to make the transition to headers. I am pretty sure I need to change my binding approach, but haven't been able to find any examples on how exactly to accomplish this.

Any help would be greatly appreciated!

I have been looking everywhere for a headers exchange example using RabbitMQ in Node.js. If someone could point me in the right direction, that would be great. Here's what I have so far:

publisher method (create a publisher)

/**
 * Creates a publisher for `exchange`: opens a confirm channel on the shared
 * `amqp` connection promise, asserts the exchange, and stores both the channel
 * and the assert promise in `publishers[exchange]` for `publish` to use.
 *
 * @param {string} exchange - Name of the exchange to assert and publish to.
 * @param {string} type - Exchange type passed to `assertExchange`.
 */
RabbitMQ.prototype.publisher = function(exchange, type) {
 console.log('New publisher, exchange: '+exchange+', type: '+type);
 amqp.then(function(conn) {
    // Return the inner chain so that a failure to create the channel or to
    // assert the exchange reaches the trailing rejection handler instead of
    // being silently dropped.
    return conn.createConfirmChannel().then(function(ch) {
        publishers[exchange] = {};
        publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
        publishers[exchange].ch = ch;
        return publishers[exchange].assert;
    });
 },function(err){
    // Connection failure: log it and retry after one second.
    // NOTE(review): `self` and `URI` are not defined in this snippet --
    // presumably they come from the enclosing module; confirm.
    console.error("[AMQP]", err.message);
    return setTimeout(function(){
        self.connect(URI);
    }, 1000);
 }).then(null, console.log);
};

publish method

/**
 * Publishes `content` to `exchange` once the exchange assertion stored by
 * `publisher` has completed. Messages that cannot be published are pushed onto
 * `offlinePubQueue`.
 *
 * @param {string} exchange - Exchange name; must have been registered via `publisher`.
 * @param {string} routingKey - Routing key for the message.
 * @param {string} content - Message body, converted with `Buffer.from`.
 * @param {Object} [headers] - Message headers sent in the publish options.
 */
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
 // Park the message for a later retry. The headers are appended as a fourth
 // element so a retried message does not lose them.
 function parkForRetry() {
    offlinePubQueue.push([exchange, routingKey, content, headers]);
 }

 try {
    publishers[exchange].assert.then(function(){
        // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
        publishers[exchange].ch.publish(exchange, routingKey, Buffer.from(content), { persistent: true, headers: headers }, function(err, ok) {
            if (err) {
                console.error("[AMQP] publish", err);
                parkForRetry();
                publishers[exchange].ch.connection.close();
            }
        });
    }).then(null, function(e) {
        // A rejected assert, or a throw inside the callback above, happens
        // asynchronously and is invisible to the surrounding try/catch.
        console.error("[AMQP] publish", e.message);
        parkForRetry();
    });
 } catch (e) {
    // Synchronous failure, e.g. no publisher registered for this exchange.
    console.error("[AMQP] publish", e.message);
    parkForRetry();
 }
};

consumer method (create a consumer)

/**
 * Creates a consumer: opens a channel, asserts the exchange, declares an
 * exclusive server-named queue, binds it to the exchange and starts consuming.
 *
 * For a headers exchange, pass the matching criteria in `bindArgs`
 * (for example `{ 'asd': 'request', 'efg': 'test', 'x-match': 'all' }`) and an
 * empty string as `routingKey`.
 *
 * @param {string} exchange - Exchange to assert and bind to.
 * @param {string} type - Exchange type passed to `assertExchange`.
 * @param {string} routingKey - Binding pattern passed to `bindQueue`.
 * @param {function(Object, Object)} cb - Called with `(msg, channel)` for each delivery.
 * @param {Object} [bindArgs] - Optional binding arguments forwarded to `bindQueue`.
 */
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb, bindArgs) {
 amqp.then(function(conn) {
  // Every step returns its promise so that the next step receives the real
  // result and any failure reaches the trailing console.warn handler.
  return conn.createChannel().then(function(ch) {
    return ch.assertExchange(exchange, type, {durable: true})
      .then(function() {
        return ch.assertQueue('', {exclusive: true});
      })
      .then(function(qok) {
        var queue = qok.queue;
        // Hand the queue name on to the consume step once the bind is done.
        return ch.bindQueue(queue, exchange, routingKey, bindArgs).then(function() {
          return queue;
        });
      })
      .then(function(queue) {
        return ch.consume(queue, function(msg){
            cb(msg,ch);
        }, {noAck: false});
      })
      .then(function() {
        console.log(' [*] Waiting for logs. To exit press CTRL+C.');
      });
  });
 }).then(null, console.warn);
};

The above example works fine with topics, but I'm not sure how to make the transition to headers. I am pretty sure I need to change my binding approach, but haven't been able to find any examples on how exactly to accomplish this.

Any help would be greatly appreciated!

Share Improve this question asked Dec 10, 2015 at 18:35 by user1828780 (547 reputation; 1 gold badge, 8 silver badges, 23 bronze badges)
Add a comment  | 

1 Answer 1

Reset to default 11

I stumbled across this question looking for the same answers for amqplib. Unfortunately, like you I found all available documentation lacking. After looking over the source, reading over the protocol a bit, and trying out a few combinations, this finally did it for me.

...
// Publisher side: the header values travel under the `headers` key of the
// publish options; the routing key is left empty.
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...

...
// Consumer side: the header values to match are passed directly as the
// binding arguments (fourth argument of bindQueue), together with 'x-match'.
// Per the RabbitMQ headers-exchange documentation, 'x-match': 'all' requires
// every listed header to match.
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...

The full working code is below. The auth info below is faked, so you'll have to use your own. I'm also using ES6, nodejs version 6.5, and amqplib. There may be an issue with giving your headers x- prefixes and/or using reserved words as header names, but I'm not too sure (I'd have to see the RabbitMQ source).

emit.js:

#!/usr/bin/env node

const XCHANGE = 'headers-exchange';

const Q      = require('q');
const Broker = require('amqplib');

let scope = 'anonymous';

// Logs any uncaught exception to stderr. The handler only logs; it does not
// terminate the process itself.
process.on('uncaughtException', (exception) => {
    // The original message began with a stray, unbalanced double quote.
    console.error(`::ERROR:: Uncaught exception ${exception}`);
});

// Each command-line argument overwrites `scope` in turn (so the last one
// wins), and every assignment is reported on the console.
for (const cliArg of process.argv.slice(2))
{
    scope = cliArg;
    console.info('[*] Scope now set to ' + scope);
}

// Publisher entry point: connects to the broker, declares the headers
// exchange, then publishes the current timestamp every 500 ms with a fixed
// set of message headers.
Q.spawn(function*()
{
    const conn = yield Broker.connect('amqp://root:root@localhost');
    const chan = yield conn.createChannel();

    // Wait for the declaration so that a failure is raised inside this
    // generator instead of being left as an unobserved promise rejection.
    yield chan.assertExchange(XCHANGE, 'headers', { durable: false });

    // Runs until the process is stopped (the original loop counter was unused).
    for(;;)
    {
        const output = (new Date()).toString();
        // The headers the exchange matches on go under the `headers` option.
        const opts = { headers: { 'asd': 'request', 'efg': 'test' }};
        chan.publish(XCHANGE, '', Buffer.from(output), opts);
        console.log(`[x] Published item "${output}" to <${XCHANGE} : ${JSON.stringify(opts)}>`);

        yield Q.delay(500);
    }
});

receive.js:

#!/usr/bin/env node

const Q      = require('q');
const Broker = require('amqplib');
const uuid   = require('node-uuid');
const Rx     = require('rx');

Rx.Node = require('rx-node');

// Name of the exchange this subscriber binds to.
const XCHANGE = 'headers-exchange';
// Per-process identifier, plus its first four characters for compact log lines.
const WORKER_ID = uuid.v4();
const WORKER_SHORT_ID = WORKER_ID.slice(0, 4);

// Subscriber entry point: connects to the broker, declares the headers
// exchange, binds an exclusive server-named queue with header-matching
// arguments, then logs and acknowledges every message received.
Q.spawn(function*() {
    const conn = yield Broker.connect('amqp://root:root@localhost');
    const chan = yield conn.createChannel();

    // Each broker call is yielded so that a failure is raised inside this
    // generator instead of being left as an unobserved promise rejection.
    yield chan.assertExchange(XCHANGE, 'headers', { durable: false });

    const q = yield chan.assertQueue('', { exclusive: true });
    // Header values to match are given directly as the binding arguments.
    const opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };

    yield chan.bindQueue(q.queue, XCHANGE, '', opts);
    console.info('[*] Binding with ' + JSON.stringify(opts));

    console.log(`[*] Subscriber ${WORKER_ID} (${WORKER_SHORT_ID}) is online!`);

    yield chan.consume(q.queue, (msg) =>
    {
        // amqplib invokes the callback with null when the broker cancels the
        // consumer; there is nothing to read or acknowledge in that case.
        if (msg === null)
        {
            console.warn(`[!](${WORKER_SHORT_ID}) Consumer was cancelled by the broker`);
            return;
        }

        console.info(`[x](${WORKER_SHORT_ID}) Received pub "${msg.content.toString()}"`);
        chan.ack(msg);
    });
});

本文标签: javascriptHeaders exchange example using RabbitMQ in NodejsStack Overflow