I have a Spring Boot RabbitMQ app. The exchange used is a TopicExchange.
public static final String RPC_REQ_QUEUE = "req.queue";
public static final String RPC_RES_QUEUE = "res.queue";
public static final String RPC_EXCHANGE = "rpc_exchange";
@Bean
public Queue reqQueue() { return new Queue(RPC_REQ_QUEUE); }
@Bean
public Queue resQueue() { return new Queue(RPC_RES_QUEUE); }
@Bean
public TopicExchange exchange() { return new TopicExchange(RPC_EXCHANGE); }
@Bean
public Binding requestBinding(TopicExchange exchange, Queue reqQueue) {
    return BindingBuilder.bind(reqQueue).to(exchange)
            .with(RPC_REQ_QUEUE);
}
@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
    return BindingBuilder.bind(resQueue).to(exchange)
            .with(RPC_RES_QUEUE);
}
The client
@Profile("client")
@Component
public class MyConsumerRequestor {
    .....
    public void send() {
        System.out.println(" [x] Requesting fib(" + start + ")");
        Integer response = (Integer) template.convertSendAndReceive(
                exchange.getName(), RPC_REQ_QUEUE,
                String.valueOf(start++),
                message -> {
                    message.getMessageProperties().setReplyTo(RPC_RES_QUEUE);
                    message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                    return message;
                });
        System.out.println(" [.] Got '" + response + "'");
    }
}
The server
@Profile("server")
@Component
public class MyProducerBackend {
    @RabbitListener(queues = RPC_REQ_QUEUE, concurrency = "2")
    public void fibonacci(Message message) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        int n = Integer.parseInt(body);
        int result = fib(n);
        System.out.println(" [.] Returning " + result);
        // Send the response to RPC_RES_QUEUE, copying the correlation ID from the request
        rabbitTemplate.convertAndSend(exchange.getName(), RPC_RES_QUEUE, String.valueOf(result),
                msg -> {
                    msg.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
                    return msg;
                });
    }
}
Observations:
The client sends messages, and the server receives them and returns results. But the correlation IDs seen on the server are 1, 2, 3, ... and not the ones set on the client side.
The application is started with both the client and server profiles.
The RabbitMQ example uses a direct exchange and doesn't demonstrate correlation IDs or concurrent/async handling.
Issue:
[x] Requesting fib(1)
[.] Got 'null'
Why do the correlation IDs not match? Do I need to split the beans in the config between the client and the server?
Note: I am using a fixed reply queue.
Edit:
Source code : source code git
Asked Nov 19, 2024 at 16:33 by Kris Swat, edited Nov 22, 2024 at 20:05
1 Answer
You showed this in your question:
@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
    return BindingBuilder.bind(resQueue).to(exchange)
            .with(RPC_RES_QUEUE);
}
But that part is missing from your code.
When I added this to your Config:
@Bean
public Binding responseBinding(TopicExchange rpcExchange, Queue replyQueue) {
    return BindingBuilder.bind(replyQueue).to(rpcExchange).with(RPC_REPLY_QUEUE);
}
it started working.
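Why the missing binding ends in a null response can be pictured with a small plain-Java model. This is only a sketch under stated assumptions, not RabbitMQ or Spring code: the class BindingSketch and its methods are hypothetical, and binding keys are matched exactly, which mirrors a topic exchange only when the keys contain no * or # wildcards (as with req.queue and res.queue here). A message published with a routing key that no binding matches reaches no queue, so a client waiting on the reply queue would eventually time out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of an exchange; NOT the RabbitMQ implementation.
// Binding keys are matched exactly, which is how a topic exchange behaves
// when the binding key contains no '*' or '#' wildcards.
class BindingSketch {
    private final Map<String, List<String>> queuesByBindingKey = new HashMap<>();
    private final Map<String, List<String>> messagesByQueue = new HashMap<>();

    // Binds a queue to this exchange under the given binding key.
    void bind(String queue, String bindingKey) {
        queuesByBindingKey.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
        messagesByQueue.computeIfAbsent(queue, q -> new ArrayList<>());
    }

    // Delivers the body to every matching queue.
    // Returns true if at least one queue received the message.
    boolean publish(String routingKey, String body) {
        List<String> queues = queuesByBindingKey.getOrDefault(routingKey, List.of());
        for (String queue : queues) {
            messagesByQueue.get(queue).add(body);
        }
        return !queues.isEmpty();
    }

    // Returns the messages currently held by the queue (empty if unknown).
    List<String> messagesIn(String queue) {
        return messagesByQueue.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        BindingSketch exchange = new BindingSketch();
        exchange.bind("req.queue", "req.queue");
        // No binding for "res.queue" yet: the reply reaches no queue.
        System.out.println(exchange.publish("res.queue", "1")); // prints false
        // After adding the response binding, the same reply is routed.
        exchange.bind("res.queue", "res.queue");
        System.out.println(exchange.publish("res.queue", "1")); // prints true
    }
}
```

In this model the first publish corresponds to the configuration without responseBinding, and the second to the configuration after the binding bean is added.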
The correlationId discrepancy arises because the RabbitTemplate replaces it internally for its own request/reply logic.
You do not need to manage it yourself at all. See how @RabbitListener can handle replies: https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven/reply.html
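The 1, 2, 3 ... values seen on the server are consistent with a template that tags each outgoing request with its own counter and later matches replies against that tag. The following plain-Java model sketches that idea; it is an illustration under assumptions, not Spring AMQP's actual code, and the class CorrelationSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of request/reply correlation; NOT Spring AMQP's code.
class CorrelationSketch {
    private final AtomicInteger tagProvider = new AtomicInteger();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a pending request and returns the correlation ID put on the wire.
    // In this model the caller's own ID is ignored in favor of the counter.
    String send(String callerCorrelationId) {
        String wireId = String.valueOf(tagProvider.incrementAndGet());
        pending.put(wireId, new CompletableFuture<>());
        return wireId;
    }

    // Called when a reply arrives; replies with an unknown ID are ignored.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.get(correlationId);
        return future != null && future.complete(body);
    }

    // Waits for the reply to the given request; returns null on timeout.
    String receive(String wireId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(wireId);
        if (future == null) {
            return null;
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // no reply arrived in time
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pending.remove(wireId);
        }
    }

    public static void main(String[] args) {
        CorrelationSketch template = new CorrelationSketch();
        String first = template.send("caller-chosen-id");
        System.out.println(first); // prints 1
        template.onReply(first, "1");
        System.out.println(template.receive(first, 100)); // prints 1
        String second = template.send("another-caller-id");
        // No reply is ever delivered for the second request.
        System.out.println(template.receive(second, 50)); // prints null
    }
}
```

The last call shows the symptom from the question: when no reply reaches the requester before the timeout, the receive side yields null regardless of which correlation ID the server echoed.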
So, this is the change you should make on the server side:
@RabbitListener(queues = RPC_REQUEST_QUEUE, concurrency = "2")
@SendTo(RPC_EXCHANGE + '/' + RPC_REPLY_QUEUE)
public int processRequest(Message message) {
    String body = new String(message.getBody(), StandardCharsets.UTF_8);
    int number = Integer.parseInt(body);
    System.out.println(message);
    System.out.println("[Server] Received request: " + number);
    int result = fibonacci(number);
    System.out.println("[Server] Computed result: " + result);
    // The return value is sent as the reply; no manual correlation handling is needed
    return result;
}
And this is on the client:
public int sendRpcRequest() {
    Integer response = (Integer) rabbitTemplate.convertSendAndReceive(
            RPC_EXCHANGE,
            "rpc.request.key",
            String.valueOf(number++));
    if (response != null) {
        System.out.println("[Client] Received response: " + response);
    } else {
        System.err.println("[Client] Response is null!");
    }
    return response == null ? -1 : response;
}
As you can see, there is no correlation headache, and the @RabbitListener is as simple as possible.