I'm building a service around Azure Service Bus in Java with the Spring framework and JMS listeners. I found a way to send a message directly to the dead-letter queue (DLQ) without waiting for the retries to be exhausted. This is useful when you know the error is fatal and retrying makes no sense. Here is my code that sends a message to the DLQ directly:

@JmsListener(
        destination = "topic-name",
        containerFactory = "topicJmsListenerContainerFactory",
        subscription = "subscription-name"
)
public void receive(JmsMessage message, Session session) throws JMSException, JsonProcessingException, IllegalAccessException, NoSuchFieldException {
    log.info("Message received from {} : {}", subscriptionName, message.getJMSMessageID());
    try {
        StatusDto status = service.process(parseMessageToDto(message), message.getJMSCorrelationID());
        log.info("Message: {} sent with success. Status: {}", message.getJMSCorrelationID(), mapper.writeValueAsString(status));
    } catch (FatalException | JsonProcessingException | ConstraintViolationException exception) {
        serviceBusHelper.moveToDeadLetterQueue(message, session, exception);
    }
}
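The routing decision made by the catch clause above (fatal errors skip the retries) can be isolated into a small helper that is easy to unit test. The following is a minimal sketch using only the JDK; the class name DeadLetterPolicy and its methods are hypothetical, and the exception types passed to it are placeholders for FatalException and the others in the listener. It also shows a null-safe way to derive the two property values, since Exception.getMessage() may return null.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: decides whether a failure should skip the retries. */
final class DeadLetterPolicy {

    // Exception types treated as fatal; which types belong here is an assumption.
    private final List<Class<?>> fatalTypes;

    DeadLetterPolicy(Class<?>... fatalTypes) {
        this.fatalTypes = List.of(fatalTypes);
    }

    /** True when the error is an instance of one of the fatal types (subclasses included). */
    boolean isFatal(Throwable error) {
        for (Class<?> type : fatalTypes) {
            if (type.isInstance(error)) {
                return true;
            }
        }
        return false;
    }

    /** Derives the two dead-letter property values; a missing message becomes an empty string. */
    static Map<String, String> describe(Throwable error) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("DeadLetterReason", error.getClass().getSimpleName());
        String message = error.getMessage();
        props.put("DeadLetterErrorDescription", message == null ? "" : message);
        return props;
    }
}
```

With such a helper the listener could catch Exception once, ask isFatal, and rethrow anything that should go through the normal retry path.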

Here is my serviceBusHelper, which sends the message directly to the DLQ.

private final int deadLetterQueue = ProviderConstants.ACK_TYPE.REJECTED.ordinal();

public void moveToDeadLetterQueue(JmsMessage message, Session session, Exception reason)
        throws NoSuchFieldException, IllegalAccessException, JMSException, JsonProcessingException {
    message.setReadOnlyProperties(false);
    String deadLetterReason = reason.getClass().getSimpleName();
    message.setStringProperty("DeadLetterReason", deadLetterReason);
    message.setStringProperty("DeadLetterErrorDescription", reason.getMessage());
    message.setAcknowledgeCallback(buildDlqAcknowledgeCallback(session));
    message.acknowledge();
    log.info("Moved message: {}, to dead letter queue due to: {}", message.getJMSCorrelationID(), deadLetterReason);
}

JmsAcknowledgeCallback buildDlqAcknowledgeCallback(Session session) throws NoSuchFieldException, IllegalAccessException {
    JmsAcknowledgeCallback callback = new JmsAcknowledgeCallback(getInnerSessionFromAzureServiceBus(session));
    callback.setAckType(deadLetterQueue);
    return callback;
}

JmsSession getInnerSessionFromAzureServiceBus(Session session) throws NoSuchFieldException, IllegalAccessException {
    log.debug("Fetch inner session from session with class: {}", session.getClass().getSimpleName());
    Session serviceBusJmsSession = (Session) unProxy(session); // Session is usually a proxy.
    Field field = serviceBusJmsSession.getClass().getDeclaredField("innerSession");
    field.setAccessible(true);
    return (JmsSession) field.get(serviceBusJmsSession);
}

Object unProxy(Object proxy) throws NoSuchFieldException, IllegalAccessException {
    if (!Proxy.isProxyClass(proxy.getClass())) {
        return proxy;
    } else {
        InvocationHandler handler = Proxy.getInvocationHandler(proxy);
        Field targetField = handler.getClass().getDeclaredField("target");
        targetField.setAccessible(true);
        return unProxy(targetField.get(handler));
    }
}

I had to use reflection to fetch the JmsSession and create an acknowledge callback with ACK_TYPE.REJECTED, which sends the message directly to the DLQ.
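The proxy-unwrapping step can be tried in isolation with plain JDK dynamic proxies. The sketch below is not the Azure or Spring code: DelegatingHandler and wrap are hypothetical stand-ins, and it assumes, as the unProxy method above does, that the invocation handler keeps its delegate in a field named target. It uses a loop instead of recursion, which behaves the same way.

```java
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

final class UnproxyDemo {

    /** Hypothetical handler that stores its delegate in a field named "target". */
    static final class DelegatingHandler implements InvocationHandler {
        private final Object target;

        DelegatingHandler(Object target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            return method.invoke(target, args);
        }
    }

    /** Wraps a Runnable in a JDK dynamic proxy backed by DelegatingHandler. */
    static Runnable wrap(Runnable delegate) {
        return (Runnable) Proxy.newProxyInstance(
                UnproxyDemo.class.getClassLoader(),
                new Class<?>[] {Runnable.class},
                new DelegatingHandler(delegate));
    }

    /** Peels off proxy layers until a non-proxy object remains. */
    static Object unProxy(Object candidate) throws ReflectiveOperationException {
        Object current = candidate;
        while (Proxy.isProxyClass(current.getClass())) {
            InvocationHandler handler = Proxy.getInvocationHandler(current);
            // Throws NoSuchFieldException if the handler has no "target" field.
            Field targetField = handler.getClass().getDeclaredField("target");
            targetField.setAccessible(true);
            current = targetField.get(handler);
        }
        return current;
    }

    public static void main(String[] args) throws Exception {
        Runnable real = new Runnable() {
            @Override
            public void run() { }
        };
        Runnable proxied = wrap(wrap(real)); // two proxy layers
        System.out.println(unProxy(proxied) == real); // prints true
    }
}
```

Because the lookup depends on a private field name, it can break when the library that creates the proxy changes its internals, so it is worth failing loudly (as the NoSuchFieldException does here) rather than silently.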

Finally here is my configuration for the topicJmsListenerContainerFactory:

spring.jms.servicebus.namespace=namespace
spring.jms.servicebus.pricing-tier=standard
spring.jms.servicebus.passwordless-enabled=true
spring.jms.servicebus.enabled=true
spring.jms.cache.producers=false
spring.jms.template.session.transacted=false
spring.jms.template.session.acknowledge-mode=client
spring.jms.listener.session.transacted=false
spring.jms.listener.session.acknowledge-mode=client
spring.jms.servicebus.listener.subscription-durable=true
spring.jms.listener.receive-timeout=60000

The acknowledge-mode=client setting lets me acknowledge the message myself and use the callback that sends it to the DLQ. However, it prevents me from using a transacted session, which seems to be what would allow me to update or add a message property.
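The two settings are mutually exclusive because in JMS the acknowledgement mode is ignored once a session is transacted. A small startup check can make that constraint explicit. This is only a sketch using java.util.Properties: the class ListenerAckConfigCheck is hypothetical, and the fallback values ("false" and "auto") used when a key is missing are assumptions for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class ListenerAckConfigCheck {

    /** True when the listener session is non-transacted and uses client acknowledgement. */
    static boolean allowsManualSettlement(Properties props) {
        // Fallback values for missing keys are assumptions made for this sketch.
        boolean transacted = Boolean.parseBoolean(
                props.getProperty("spring.jms.listener.session.transacted", "false").trim());
        String ackMode = props.getProperty(
                "spring.jms.listener.session.acknowledge-mode", "auto").trim();
        return !transacted && "client".equalsIgnoreCase(ackMode);
    }

    /** Parses properties-file text such as the configuration shown above. */
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = parse(
                "spring.jms.listener.session.transacted=false\n"
              + "spring.jms.listener.session.acknowledge-mode=client\n");
        System.out.println(allowsManualSettlement(props)); // prints true
    }
}
```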

To summarize: with this configuration and code, I can send a message directly to the DLQ, but the DeadLetterReason is not added. Does anyone know how I could add this string property?

Thanks a lot.

Asked Nov 22, 2024 at 8:07 by Blackarrow; edited Nov 25, 2024 at 7:32.

  • Can you share your GitHub repo? – Dasari Kamali, commented Nov 22, 2024 at 9:43

1 Answer

I tried a Spring Boot application that moves a message to the dead-letter queue, and the message arrived there successfully.

  • The code below connects to the Azure Service Bus topic subscription, receives a message in PEEK_LOCK mode so that it stays locked while it is processed, and then moves it to the dead-letter queue to simulate a failure.

ServiceBusReceiver.java :

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

@Service
public class ServiceBusReceiver {

    @Value("${azure.servicebus.connection-string}")
    private String connectionString;

    @Value("${azure.servicebus.topic-name}")
    private String topicName;

    @Value("${azure.servicebus.subscription-name}")
    private String subscriptionName;

    private final JmsTemplate jmsTemplate;

    public ServiceBusReceiver(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    @JmsListener(destination = "${azure.servicebus.topic-name}", containerFactory = "jmsListenerContainerFactory")
    public void receiveMessages() {
        try (ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .receiver()
                .topicName(topicName)
                .subscriptionName(subscriptionName)
                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                .buildClient()) {

            receiverClient.receiveMessages(1).forEach(message -> {
                try {
                    System.out.println("Simulating failure for message: " + message.getMessageId());
                    receiverClient.deadLetter(message);
                    System.out.println("Message " + message.getMessageId() + " moved to Dead Letter Queue.");
                } catch (Exception e) {
                    System.out.println("Failed to process message " + message.getMessageId() + ": " + e.getMessage());
                }
            });
        }
    }
}
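The receiver above calls deadLetter(message) without any metadata. The azure-messaging-servicebus SDK also offers an overload that takes a DeadLetterOptions object, which carries the dead-letter reason and error description asked about in the question. The following is a minimal sketch under that assumption; the class DeadLetterWithReason and its method names are hypothetical, and it applies to the SDK receiver used in this answer, not to the JMS listener from the question.

```java
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.models.DeadLetterOptions;

final class DeadLetterWithReason {

    /** Builds options carrying a reason and description derived from the failure. */
    static DeadLetterOptions optionsFor(Exception failure) {
        // getMessage() may be null, so fall back to an empty description.
        String description = failure.getMessage() == null ? "" : failure.getMessage();
        return new DeadLetterOptions()
                .setDeadLetterReason(failure.getClass().getSimpleName())
                .setDeadLetterErrorDescription(description);
    }

    /** Dead-letters a peek-locked message together with the reason metadata. */
    static void deadLetter(ServiceBusReceiverClient receiverClient,
                           ServiceBusReceivedMessage message,
                           Exception failure) {
        receiverClient.deadLetter(message, optionsFor(failure));
    }
}
```

Inside the forEach loop above, the call receiverClient.deadLetter(message) could then be replaced with DeadLetterWithReason.deadLetter(receiverClient, message, e) for whatever exception e triggered the failure.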

ServiceBusReceiverController.java :

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ServiceBusReceiverController {
    @Autowired
    private ServiceBusReceiver serviceBusReceiver;
    @GetMapping("/receive-messages")
    public String receiveMessages() {
        serviceBusReceiver.receiveMessages();
        return "Message moved to Dead Letter queue.";
    }
}

pom.xml :

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
        <version>2.5.4</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-servicebus</artifactId>
        <version>7.12.0</version>
    </dependency>
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.azure.spring</groupId>
        <artifactId>azure-spring-boot-starter-servicebus-jms</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.properties :

azure.servicebus.connection-string=<TopicConnectionString>
azure.servicebus.topic-name=<topicName>
azure.servicebus.subscription-name=<subscriptionName>

I ran the Spring Boot application above and sent a message to the Azure Service Bus topic. I then called the controller endpoint from the browser, and the message arrived in the dead-letter queue of the topic subscription.
