admin管理员组文章数量:1122832
I'm building a service around Azure Service Bus with Java Language and Spring framework using JMS listeners. I could figure out a way to send a message directly to the DeadLetterQueue, without waiting for the retry. This is useful if you know that the error is kind of fatal and makes no sense to retry. Here is my code to send to DLQ directly:
@JmsListener(
destination = "topic-name",
containerFactory = "topicJmsListenerContainerFactory",
subscription = "subscription-name"
)
public void receive(JmsMessage message, Session session) throws JMSException, JsonProcessingException, IllegalAccessException, NoSuchFieldException {
log.info("Message received from {} : {}", subscriptionName, message.getJMSMessageID());
try {
StatusDto status = service.process(parseMessageToDto(message), message.getJMSCorrelationID());
log.info("Message: {} sent with success. Status: {}", message.getJMSCorrelationID(), mapper.writeValueAsString(status));
} catch (FatalException | JsonProcessingException | ConstraintViolationException exception) {
serviceBusHelper.moveToDeadLetterQueue(message, session, exception);
}
}
Here is my serviceBusHelper that is sending the message directly to DLQ.
private final int deadLetterQueue = ProviderConstants.ACK_TYPE.REJECTED.ordinal();
public void moveToDeadLetterQueue(JmsMessage message, Session session, Exception reason)
throws NoSuchFieldException, IllegalAccessException, JMSException, JsonProcessingException {
message.setReadOnlyProperties(false);
String deadLetterReason = reason.getClass().getSimpleName();
message.setStringProperty("DeadLetterReason", deadLetterReason);
message.setStringProperty("DeadLetterErrorDescription", reason.getMessage());
message.setAcknowledgeCallback(buildDlqAcknowledgeCallback(session));
message.acknowledge();
log.info("Moved message: {}, to dead letter queue due to: {}", message.getJMSCorrelationID(), deadLetterReason);
}
JmsAcknowledgeCallback buildDlqAcknowledgeCallback(Session session) throws NoSuchFieldException, IllegalAccessException {
JmsAcknowledgeCallback callback = new JmsAcknowledgeCallback(getInnerSessionFromAzureServiceBus(session));
callback.setAckType(deadLetterQueue);
return callback;
}
JmsSession getInnerSessionFromAzureServiceBus(Session session) throws NoSuchFieldException, IllegalAccessException {
log.debug("Fetch inner session from session with class: {}", session.getClass().getSimpleName());
Session serviceBusJmsSession = (Session) unProxy(session); // Session is usually a proxy.
Field field = serviceBusJmsSession.getClass().getDeclaredField("innerSession");
field.setAccessible(true);
return (JmsSession) field.get(serviceBusJmsSession);
}
Object unProxy(Object proxy) throws NoSuchFieldException, IllegalAccessException {
if (!Proxy.isProxyClass(proxy.getClass())) {
return proxy;
} else {
InvocationHandler handler = Proxy.getInvocationHandler(proxy);
Field targetField = handler.getClass().getDeclaredField("target");
targetField.setAccessible(true);
return unProxy(targetField.get(handler));
}
}
I had to use reflection to be able to fetch the JmsSession and create an acknowledgeCallback with the ACK_TYPE.REJECTED that is sending directly to DLQ the message.
Finally here is my configuration for the topicJmsListenerContainerFactory:
spring.jms.servicebus.namespace=namespace
spring.jms.servicebus.pricing-tier=standard
spring.jms.servicebus.passwordless-enabled=true
spring.jms.servicebus.enabled=true
spring.jms.cache.producers=false
spring.jms.template.session.transacted=false
spring.jms.template.session.acknowledge-mode=client
spring.jms.listener.session.transacted=false
spring.jms.listener.session.acknowledge-mode=client
spring.jms.servicebus.listener.subscription-durable=true
spring.jms.listener.receive-timeout=60000
The acknowledge-mode=client allows me to acknowledge my message and to use the callback that sends the message to DLQ. However that doesn't allow mes to use a transacted session that seems to be able to update and add a message property.
To summarize: With this configuration and code, I'm able to send a message directly to the DLQ but the DeadLetterReason is not added. Do you know or have an idea how I could add this string property?
Thanks a lot.
I'm building a service around Azure Service Bus with Java Language and Spring framework using JMS listeners. I could figure out a way to send a message directly to the DeadLetterQueue, without waiting for the retry. This is useful if you know that the error is kind of fatal and makes no sense to retry. Here is my code to send to DLQ directly:
@JmsListener(
destination = "topic-name",
containerFactory = "topicJmsListenerContainerFactory",
subscription = "subscription-name"
)
public void receive(JmsMessage message, Session session) throws JMSException, JsonProcessingException, IllegalAccessException, NoSuchFieldException {
log.info("Message received from {} : {}", subscriptionName, message.getJMSMessageID());
try {
StatusDto status = service.process(parseMessageToDto(message), message.getJMSCorrelationID());
log.info("Message: {} sent with success. Status: {}", message.getJMSCorrelationID(), mapper.writeValueAsString(status));
} catch (FatalException | JsonProcessingException | ConstraintViolationException exception) {
serviceBusHelper.moveToDeadLetterQueue(message, session, exception);
}
}
Here is my serviceBusHelper that is sending the message directly to DLQ.
private final int deadLetterQueue = ProviderConstants.ACK_TYPE.REJECTED.ordinal();
public void moveToDeadLetterQueue(JmsMessage message, Session session, Exception reason)
throws NoSuchFieldException, IllegalAccessException, JMSException, JsonProcessingException {
message.setReadOnlyProperties(false);
String deadLetterReason = reason.getClass().getSimpleName();
message.setStringProperty("DeadLetterReason", deadLetterReason);
message.setStringProperty("DeadLetterErrorDescription", reason.getMessage());
message.setAcknowledgeCallback(buildDlqAcknowledgeCallback(session));
message.acknowledge();
log.info("Moved message: {}, to dead letter queue due to: {}", message.getJMSCorrelationID(), deadLetterReason);
}
JmsAcknowledgeCallback buildDlqAcknowledgeCallback(Session session) throws NoSuchFieldException, IllegalAccessException {
JmsAcknowledgeCallback callback = new JmsAcknowledgeCallback(getInnerSessionFromAzureServiceBus(session));
callback.setAckType(deadLetterQueue);
return callback;
}
JmsSession getInnerSessionFromAzureServiceBus(Session session) throws NoSuchFieldException, IllegalAccessException {
log.debug("Fetch inner session from session with class: {}", session.getClass().getSimpleName());
Session serviceBusJmsSession = (Session) unProxy(session); // Session is usually a proxy.
Field field = serviceBusJmsSession.getClass().getDeclaredField("innerSession");
field.setAccessible(true);
return (JmsSession) field.get(serviceBusJmsSession);
}
Object unProxy(Object proxy) throws NoSuchFieldException, IllegalAccessException {
if (!Proxy.isProxyClass(proxy.getClass())) {
return proxy;
} else {
InvocationHandler handler = Proxy.getInvocationHandler(proxy);
Field targetField = handler.getClass().getDeclaredField("target");
targetField.setAccessible(true);
return unProxy(targetField.get(handler));
}
}
I had to use reflection to be able to fetch the JmsSession and create an acknowledgeCallback with the ACK_TYPE.REJECTED that is sending directly to DLQ the message.
Finally here is my configuration for the topicJmsListenerContainerFactory:
spring.jms.servicebus.namespace=namespace
spring.jms.servicebus.pricing-tier=standard
spring.jms.servicebus.passwordless-enabled=true
spring.jms.servicebus.enabled=true
spring.jms.cache.producers=false
spring.jms.template.session.transacted=false
spring.jms.template.session.acknowledge-mode=client
spring.jms.listener.session.transacted=false
spring.jms.listener.session.acknowledge-mode=client
spring.jms.servicebus.listener.subscription-durable=true
spring.jms.listener.receive-timeout=60000
The acknowledge-mode=client allows me to acknowledge my message and to use the callback that sends the message to DLQ. However that doesn't allow mes to use a transacted session that seems to be able to update and add a message property.
To summarize: With this configuration and code, I'm able to send a message directly to the DLQ but the DeadLetterReason is not added. Do you know or have an idea how I could add this string property?
Thanks a lot.
Share Improve this question edited Nov 25, 2024 at 7:32 Blackarrow asked Nov 22, 2024 at 8:07 BlackarrowBlackarrow 431 silver badge4 bronze badges 1- Can you share your GitHub repo? – Dasari Kamali Commented Nov 22, 2024 at 9:43
1 Answer
Reset to default 0I tried a Spring Boot application to move a message to the Dead Letter Queue
using JMS and successfully received it.
- The below code connects to the Azure Service Bus topic to receive a message, moves the message to the Dead Letter Queue to show a failure, and uses the
PEEK_LOCK
receive mode to lock and process messages.
ServiceBusReceiver.java :
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
@Service
public class ServiceBusReceiver {
@Value("${azure.servicebus.connection-string}")
private String connectionString;
@Value("${azure.servicebus.topic-name}")
private String topicName;
@Value("${azure.servicebus.subscription-name}")
private String subscriptionName;
private final JmsTemplate jmsTemplate;
public ServiceBusReceiver(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@JmsListener(destination = "${azure.servicebus.topic-name}", containerFactory = "jmsListenerContainerFactory")
public void receiveMessages() {
try (ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.receiver()
.topicName(topicName)
.subscriptionName(subscriptionName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.buildClient()) {
receiverClient.receiveMessages(1).forEach(message -> {
try {
System.out.println("Simulating failure for message: " + message.getMessageId());
receiverClient.deadLetter(message);
System.out.println("Message " + message.getMessageId() + " moved to Dead Letter Queue.");
} catch (Exception e) {
System.out.println("Failed to process message " + message.getMessageId() + ": " + e.getMessage());
}
});
}
}
}
ServiceBusReceiverController.java :
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ServiceBusReceiverController {
@Autowired
private ServiceBusReceiver serviceBusReceiver;
@GetMapping("/receive-messages")
public String receiveMessages() {
serviceBusReceiver.receiveMessages();
return "Message moved to Dead Letter queue.";
}
}
pom.xml :
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.5.4</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.12.0</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>azure-spring-boot-starter-servicebus-jms</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.properties :
azure.servicebus.connection-string=<TopicConneString>
azure.servicebus.topic-name=<topicName>
azure.servicebus.subscription-name=<subscriptionName>
I ran the above Spring Boot application and sent a message to the Azure Service Bus Topic as shown below.
I sent the request below in the browser, as shown.
Then, I received the message in the Dead Letter Queue
of the Azure Service Bus Topic, as shown below.
Spring Boot Output :
本文标签:
版权声明:本文标题:In Azure Service Bus how to send a message to dead letter queue with a "DeadLetterReason" with JMS? - Stack Ov 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1736305239a1932516.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论