admin管理员组文章数量:1336289
I have a (Java 21/EE10 on WildFly 33) container that connects to Azure Service Bus using JMS 2.0 (Hence ESB Premium Tier). All is well.
However when redeploying (or even restarting) the POD the @Startup function that activates the listening fails with this: Customer Consumer got error setting up the service bus: jakarta.jms.JMSRuntimeException: Cannot create a new consumer on subscriber cmsdev-customer-sub$ECC_GIA$D because there is already a subscriber using the same name with an active consumer on the same or different topic. Reference:a55df62a-5bea-43c2-880a-f32509828982, TrackingId:909c3640-65a8-40f5-8440-d53d6c6ba766_B60, SystemTracker:gi::G12:97974857:cmsdev-customer-sub,
As i do set a clientId (to keep the ACIDity provided by The Service Bus) to the same it feels correct that it fails as the cluster makes sure the new Revision is started BEFORE it tears down the previous one.
It is a little of Catch 22.
We really would like to keep the Zero-down deployment offered by ACA.
Anyone with insights and willing to give advice would be highly appreciated.
Thanks
Gerry
I have a (Java 21/EE10 on WildFly 33) container that connects to Azure Service Bus using JMS 2.0 (Hence ESB Premium Tier). All is well.
However when redeploying (or even restarting) the POD the @Startup function that activates the listening fails with this: Customer Consumer got error setting up the service bus: jakarta.jms.JMSRuntimeException: Cannot create a new consumer on subscriber cmsdev-customer-sub$ECC_GIA$D because there is already a subscriber using the same name with an active consumer on the same or different topic. Reference:a55df62a-5bea-43c2-880a-f32509828982, TrackingId:909c3640-65a8-40f5-8440-d53d6c6ba766_B60, SystemTracker:gi::G12:97974857:cmsdev-customer-sub,
As i do set a clientId (to keep the ACIDity provided by The Service Bus) to the same it feels correct that it fails as the cluster makes sure the new Revision is started BEFORE it tears down the previous one.
It is a little of Catch 22.
We really would like to keep the Zero-down deployment offered by ACA.
Anyone with insights and willing to give advice would be highly appreciated.
Thanks
Gerry
Share Improve this question asked Nov 19, 2024 at 18:12 Gerry AskefalkGerry Askefalk 535 bronze badges 1- Can you share your code in the question? – Dasari Kamali Commented Nov 20, 2024 at 0:07
1 Answer
Reset to default 0I created a sample Java application and was able to successfully listen for messages from an Azure Service Bus topic using JMS, which creates a new subscription with the client ID.
To avoid issues, I created a unique client ID for each instance so that each container instance will have its own client ID, even during redeployment.
I’ve added the below line to my Main.java
file
String clientId = "client-" + UUID.randomUUID().toString();
Main.java :
import javax.jms.*;
import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactory;
import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactorySettings;
import .slf4j.Logger;
import .slf4j.LoggerFactory;
import java.util.UUID;
public class Main {
private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final String CONNECTION_STRING = "<TopicConneString>";
private static final String TOPIC_NAME = "<topicName>";
private static final String SUBSCRIPTION_NAME = "<subscriptionName>";
private Connection connection;
private Session session;
private MessageConsumer consumer;
public static void main(String[] args) {
Main serviceBusApp = new Main();
serviceBusApp.run();
}
public void run() {
try {
initializeConnection();
createSubscriptionIfNeeded(SUBSCRIPTION_NAME);
sendMessage("Hello from Azure Service Bus!");
receiveMessages(SUBSCRIPTION_NAME);
} catch (JMSException e) {
logger.error("Error during Service Bus operation: {}", e.getMessage());
} finally {
closeResources();
}
}
private void initializeConnection() throws JMSException {
try {
String clientId = "client-" + UUID.randomUUID().toString();
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory(
CONNECTION_STRING, new ServiceBusJmsConnectionFactorySettings()
);
connection = factory.createConnection();
connection.setClientID(clientId);
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
logger.info("Successfully connected to Azure Service Bus with clientId: {}", clientId);
} catch (JMSException e) {
logger.error("Error initializing connection: {}", e.getMessage());
throw e;
}
}
private void createSubscriptionIfNeeded(String subscriptionName) {
try {
logger.info("Checking if subscription exists...");
logger.info("Subscription '{}' verified/created.", subscriptionName);
} catch (Exception e) {
logger.error("Error creating or verifying subscription: {}", e.getMessage());
}
}
public void sendMessage(String messageContent) {
try {
Destination destination = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(messageContent);
producer.send(message);
logger.info("Sent a single message to the topic: {}", TOPIC_NAME);
producer.close();
} catch (JMSException e) {
logger.error("Error sending message: {}", e.getMessage());
}
}
public void receiveMessages(String subscriptionName) {
try {
Destination destination = session.createTopic(TOPIC_NAME);
consumer = session.createDurableSubscriber((Topic) destination, subscriptionName);
consumer.setMessageListener(message -> {
try {
if (message instanceof TextMessage) {
String messageBody = ((TextMessage) message).getText();
logger.info("Received: {}", messageBody);
message.acknowledge();
}
} catch (JMSException e) {
logger.error("Error processing message: {}", e.getMessage());
}
});
logger.info("Waiting for messages... Press enter to stop.");
while (true) {
if (System.in.available() > 0) break;
}
} catch (Exception e) {
logger.error("Error receiving messages: {}", e.getMessage());
}
}
private void closeResources() {
try {
if (consumer != null) consumer.close();
if (session != null) session.close();
if (connection != null) connection.close();
logger.info("Closed connection and resources.");
} catch (JMSException e) {
logger.error("Error closing resources: {}", e.getMessage());
}
}
}
pom.xml :
<dependencies>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.12.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus-jms</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.32</version>
<scope>runtime</scope>
</dependency>
</dependencies>
Dockerfile :
FROM openjdk:21-jdk-slim as build
WORKDIR /app
COPY target/example-1.0-SNAPSHOT.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
Output :
I sent and received messages to and from the Azure Service Bus Topic.
The messages sent and received in the Azure Service Bus topic for the newly created subscription with the client ID are shown below.
Azure Container Apps :
I got the following logs of sent and received messages in the Azure Container Apps as shown below.
本文标签:
版权声明:本文标题:jms - Azure Service Bus and Azure Container Apps - Redeploy fails as there alread is a client using tha sub against that topic - 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1742407340a2469092.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论