admin管理员组

文章数量:1123415

We have a system that successfully send messages across a normal, non-sharded Oracle Advanced Queue (AQ), but when we change to a Sharded Queue, the Java application is unable to consume the messages. The messages are posted from PL/SQL.

Working Setup

Create Queues

begin
  dbms_aqadm.create_queue_table(
    queue_table => 'poc_queue_source.nps_transactions_qt',
    queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
    multiple_consumers => FALSE
  );
  dbms_aqadm.create_queue(
    queue_name => 'poc_queue_source.nps_transactions_queue',
    queue_table => 'poc_queue_source.nps_transactions_qt'
  );
  dbms_aqadm.start_queue(
    queue_name => 'poc_queue_source.nps_transactions_queue'
  );
end;
/

Posting in PL/SQL (in a trigger):

declare
  row_json            varchar2(4000);
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message             SYS.AQ$_JMS_TEXT_MESSAGE := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
begin
  row_json := json_object(
    'transactionId'  value  :NEW.transactionid,
    'srcaccountid'  value  :NEW.srcaccountid,
    'dstaccountid'  value  :NEW.dstaccountid
    -- More stuff here
    null on null
  );
  message.set_text(row_json);
  dbms_aq.enqueue(
    queue_name => 'poc_queue_source.nps_transactions_queue',
    enqueue_options => enqueue_options,
    message_properties => message_properties,
    payload => message,
    msgid => message_handle
  );
end;

The receiving part is a Spring Boot application using Spring JMS. The last part in the JMSConfiguration class is:

@Bean
public DefaultMessageListenerContainer jmsListenerContainer(
    DefaultJmsListenerContainerFactory jmsListenerContainerFactory,
    JmsMessageListenerImpl messageListener,
    @Value("${application.jms.listener.queue-name}") String queueName,
    @Value("${application.jms.listener.idle-receives-per-task-limit}") Integer idleReceivesLimit
) {
    SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
    endpoint.setMessageListener(messageListener);
    endpoint.setDestination(queueName);

    DefaultMessageListenerContainer container = jmsListenerContainerFactory.createListenerContainer(endpoint);
    // Setting idleReceivesPerTaskLimit makes the Listener scale down concurrency when the queue is idle.
    container.setIdleReceivesPerTaskLimit(idleReceivesLimit);
    return container;
}

Here JmsMessageListenerImpl is our own Component that implements the JMS MessageListener interface.

Library use:

<dependency>
    <groupId>com.oracle.database.messaging</groupId>
    <artifactId>aqapi-jakarta</artifactId>
    <version>23.2.1.0</version>
</dependency>

This setup works. The database posts messages to the queue, and the MessageListener receives them.

Failing Setup

But if I change to a Sharded Queue, it no longer receives them.

Sharded Queue setup:

begin
  dbms_aqadm.create_sharded_queue(
    queue_name => 'poc_queue_source.nps_transactions_shdqueue',
    multiple_consumers => FALSE,
    queue_payload_type => DBMS_AQADM.JMS_TYPE
  );
  dbms_aqadm.start_queue(
    queue_name => 'poc_queue_source.nps_transactions_shdqueue'
  );
end;
/

PL/SQL is exactly the same except the queue name is changed:

declare
  row_json            varchar2(4000);
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message             SYS.AQ$_JMS_TEXT_MESSAGE := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
begin
  row_json := json_object(
    'transactionId'  value  :NEW.transactionid,
    'srcaccountid'  value  :NEW.srcaccountid,
    'dstaccountid'  value  :NEW.dstaccountid
    -- More stuff here
    null on null
  );
  message.set_text(row_json);
  dbms_aq.enqueue(
    queue_name => 'poc_queue_source.nps_transactions_queue_shdqueue',
    enqueue_options => enqueue_options,
    message_properties => message_properties,
    payload => message,
    msgid => message_handle
  );
end;

And in the JMS app the queue name of the Destination is updated.

I see messages appear in the Queue table in the database, but the MessageConsumer.receive call within org.springframework.jms.support.destination.JmsDestinationAccessor#receiveFromConsumer receives no Messages.

Is there something wrong with my configuration and setup? I see the queue table structure between Non-Sharded and Sharded Queues are different.

本文标签: javaJMS application unable to consume from Oracle Sharded QueueStack Overflow