I have a container (Java 21 / Jakarta EE 10 on WildFly 33) that connects to Azure Service Bus using JMS 2.0 (hence the Service Bus Premium tier). That part works fine.

However, when redeploying (or even restarting) the pod, the @Startup method that activates the listener fails with this error: Customer Consumer got error setting up the service bus: jakarta.jms.JMSRuntimeException: Cannot create a new consumer on subscriber cmsdev-customer-sub$ECC_GIA$D because there is already a subscriber using the same name with an active consumer on the same or different topic. Reference:a55df62a-5bea-43c2-880a-f32509828982, TrackingId:909c3640-65a8-40f5-8440-d53d6c6ba766_B60, SystemTracker:gi::G12:97974857:cmsdev-customer-sub,

Since I set the clientId to the same value every time (to keep the ACID guarantees provided by Service Bus), it makes sense that this fails: the cluster makes sure the new revision is started BEFORE it tears down the previous one.

It is a bit of a Catch-22.

We would really like to keep the zero-downtime deployment offered by ACA (Azure Container Apps).

Any insights or advice would be highly appreciated.

Thanks

Gerry

asked Nov 19, 2024 at 18:12 by Gerry Askefalk

Comment: Can you share your code in the question? (Dasari Kamali, Nov 20, 2024 at 0:07)

1 Answer

I created a sample Java application and was able to successfully listen for messages from an Azure Service Bus topic using JMS, which creates a new subscription with the client ID.

To avoid the conflict, I generate a unique client ID for each container instance, so the old and new instances never share a client ID, even during redeployment.

I added the following line to my Main.java file:

String clientId = "client-" + UUID.randomUUID().toString();
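
A random UUID changes on every start, and since each client ID gets its own subscription (as noted above), restarts may leave unused subscriptions behind. A minimal sketch of one alternative is to derive the ID from a per-replica name when the platform exposes one, and fall back to a UUID otherwise. The `ClientIds` class is hypothetical, and `CONTAINER_APP_REPLICA_NAME` is assumed to be set by Azure Container Apps; verify both against your environment.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical helper, not part of the original answer.
final class ClientIds {
    private ClientIds() {}

    // Builds a per-instance client ID: uses the replica name when the
    // environment provides one, otherwise falls back to a random UUID.
    static String forInstance(Map<String, String> env) {
        // Assumption: the platform sets this variable for each replica.
        String replica = env.get("CONTAINER_APP_REPLICA_NAME");
        if (replica == null || replica.isBlank()) {
            return "client-" + UUID.randomUUID();
        }
        return "client-" + replica;
    }
}
```

It could then be used as `String clientId = ClientIds.forInstance(System.getenv());` in place of the line above.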

Main.java :

import javax.jms.*;
import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactory;
import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactorySettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;

public class Main {

    private static final Logger logger = LoggerFactory.getLogger(Main.class);
    private static final String CONNECTION_STRING = "<TopicConneString>";
    private static final String TOPIC_NAME = "<topicName>";
    private static final String SUBSCRIPTION_NAME = "<subscriptionName>";

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public static void main(String[] args) {
        Main serviceBusApp = new Main();
        serviceBusApp.run();
    }

    public void run() {
        try {
            initializeConnection();
            createSubscriptionIfNeeded(SUBSCRIPTION_NAME);
            sendMessage("Hello from Azure Service Bus!");
            receiveMessages(SUBSCRIPTION_NAME);
        } catch (JMSException e) {
            logger.error("Error during Service Bus operation: {}", e.getMessage());
        } finally {
            closeResources();
        }
    }

    private void initializeConnection() throws JMSException {
        try {
            String clientId = "client-" + UUID.randomUUID().toString();
            ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory(
                    CONNECTION_STRING, new ServiceBusJmsConnectionFactorySettings()
            );
            connection = factory.createConnection();
            connection.setClientID(clientId);
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            connection.start();
            logger.info("Successfully connected to Azure Service Bus with clientId: {}", clientId);
        } catch (JMSException e) {
            logger.error("Error initializing connection: {}", e.getMessage());
            throw e;
        }
    }

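    // Placeholder: this method only logs. The durable subscriber created in
    // receiveMessages() is what creates the subscription for this client ID.
    private void createSubscriptionIfNeeded(String subscriptionName) {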
        try {
            logger.info("Checking if subscription exists...");
            logger.info("Subscription '{}' verified/created.", subscriptionName);
        } catch (Exception e) {
            logger.error("Error creating or verifying subscription: {}", e.getMessage());
        }
    }

    public void sendMessage(String messageContent) {
        try {
            Destination destination = session.createTopic(TOPIC_NAME);
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage(messageContent);
            producer.send(message);
            logger.info("Sent a single message to the topic: {}", TOPIC_NAME);
            producer.close();
        } catch (JMSException e) {
            logger.error("Error sending message: {}", e.getMessage());
        }
    }

    public void receiveMessages(String subscriptionName) {
        try {
            Destination destination = session.createTopic(TOPIC_NAME);
            consumer = session.createDurableSubscriber((Topic) destination, subscriptionName);
            consumer.setMessageListener(message -> {
                try {
                    if (message instanceof TextMessage) {
                        String messageBody = ((TextMessage) message).getText();
                        logger.info("Received: {}", messageBody);
                        message.acknowledge();
                    }
                } catch (JMSException e) {
                    logger.error("Error processing message: {}", e.getMessage());
                }
            });
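            logger.info("Waiting for messages... Press enter to stop.");
            // Poll stdin with a short sleep instead of spinning at full CPU
            // while the listener thread handles incoming messages.
            while (System.in.available() <= 0) {
                Thread.sleep(200);
            }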
        } catch (Exception e) {
            logger.error("Error receiving messages: {}", e.getMessage());
        }
    }

    private void closeResources() {
        try {
            if (consumer != null) consumer.close();
            if (session != null) session.close();
            if (connection != null) connection.close();
            logger.info("Closed connection and resources.");
        } catch (JMSException e) {
            logger.error("Error closing resources: {}", e.getMessage());
        }
    }
}
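
If the client ID has to stay fixed, as in the question, one possible alternative is to retry consumer creation with a growing delay until the previous instance has released the subscription. The sketch below is only an illustration under that assumption: `StartupRetry` and its parameters are hypothetical names, and whether the old revision releases its consumer in time depends on how the platform shuts it down.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of the original answer.
final class StartupRetry {
    private StartupRetry() {}

    // Runs the action until it succeeds or maxAttempts is reached,
    // doubling the delay between attempts (capped at 30 seconds).
    static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last failure
                }
                Thread.sleep(delay);
                delay = Math.min(delay * 2, 30_000L);
            }
        }
    }
}
```

In the listing above, this could wrap the subscriber creation, for example `consumer = StartupRetry.withBackoff(() -> session.createDurableSubscriber((Topic) destination, subscriptionName), 10, 1_000L);`.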

pom.xml :

<dependencies>  
    <dependency>  
        <groupId>javax.jms</groupId>  
        <artifactId>javax.jms-api</artifactId>  
        <version>2.0.1</version>  
    </dependency>  
    <dependency>  
        <groupId>com.azure</groupId>  
        <artifactId>azure-messaging-servicebus</artifactId>  
        <version>7.12.0</version>  
    </dependency>  
    <dependency>  
        <groupId>com.microsoft.azure</groupId>  
        <artifactId>azure-servicebus-jms</artifactId>  
        <version>1.0.2</version>  
    </dependency>  
    <dependency>  
        <groupId>org.slf4j</groupId>  
        <artifactId>slf4j-api</artifactId>  
        <version>1.7.32</version>  
    </dependency>  
    <dependency>  
        <groupId>org.slf4j</groupId>  
        <artifactId>slf4j-simple</artifactId>  
        <version>1.7.32</version>  
        <scope>runtime</scope>  
    </dependency>  
</dependencies>

Dockerfile :

FROM openjdk:21-jdk-slim as build
WORKDIR /app
COPY target/example-1.0-SNAPSHOT.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]

Output :

I was able to send messages to and receive messages from the Azure Service Bus topic. The topic shows the sent and received messages under the newly created subscription that carries the client ID.

Azure Container Apps :

The Azure Container Apps logs also show the sent and received messages.
