admin管理员组

文章数量:1415698

We want to commit JMS read transactions based on a property in the middle of our flow. Our implementation currently uses an executor to achieve this.

   @Bean
  public Consumer<JmsDefaultListenerContainerSpec> jmsListenerContainerSpec() {
    return containerSpec -> {
      containerSpec.receiveTimeout(20_000L);
      containerSpec.maxConcurrentConsumers(1);
      containerSpec.sessionTransacted(true);
    };
  }
  
  @Bean
  public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec(
      @Qualifier("jmsTaskExecutor") Executor jmsTaskExecutor) {
    return channels -> channels.executor(jmsTaskExecutor);
  }

  @Bean(name = "jmsTaskExecutor")
  @ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "true")
  public Executor jmsTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // setting this to 0 mimics the behavior of Executors.newCachedThreadPool();
    // where no tasks are queued and a new thread is created as needed if none
    // are available in the cache
    taskExecutor.setQueueCapacity(0);
    return taskExecutor;
  }

  @Bean(name = "jmsTaskExecutor")
  @ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "false", matchIfMissing = true)
  public Executor synchronousJmsTaskExecutor() {
    SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
    return taskExecutor;
  }
  
@Bean
  public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
    return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
  }
  
@Bean
  public IntegrationFlow jmsMessageFlow(
      @Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
      Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec)
{
  return IntegrationFlow.from(
            Jms.messageDrivenChannelAdapter(connectionFactory)
                .destination("INCOMING_QUEUE")
                .configureListenerContainer(
                    jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
                .errorChannel(genericExceptionChannel)
                .outputChannel("messageHandlingChannel"))
       // save message in db
       .handle(
            (payload, headers) -> databaseService.save(payload),
            spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
        // new thread so that the jms message is acknowledged
        .channel(jmsTxCommitingChannelSpec)
        .enrichHeaders(errorChannelSpec)
        .handle(
            (payload, headers) -> messageParser.extractMessageMetadata(payload),
            spec -> spec.id("extractMessageMetadata"))
        .handle(
            (payload, headers) -> databaseService.update(payload))            
        .handle(Jms.outboundAdapter(connectionFactory)
                .destination(getQueueName())
                .configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec.id("jmsTemplate")))
        .get();
}

Are there are issues using a SyncTaskExecutor here whenever the property appmit-jms-reads-early is not set.

From Spring Documentation

SyncTaskExecutor: This implementation does not run invocations asynchronously. Instead, each invocation takes place in the calling thread. It is primarily used in situations where multi-threading is not necessary, such as in simple test cases.

We want to commit JMS read transactions based on a property in the middle of our flow. Our implementation currently uses an executor to achieve this.

   @Bean
  public Consumer<JmsDefaultListenerContainerSpec> jmsListenerContainerSpec() {
    return containerSpec -> {
      containerSpec.receiveTimeout(20_000L);
      containerSpec.maxConcurrentConsumers(1);
      containerSpec.sessionTransacted(true);
    };
  }
  
  @Bean
  public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec(
      @Qualifier("jmsTaskExecutor") Executor jmsTaskExecutor) {
    return channels -> channels.executor(jmsTaskExecutor);
  }

  @Bean(name = "jmsTaskExecutor")
  @ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "true")
  public Executor jmsTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // setting this to 0 mimics the behavior of Executors.newCachedThreadPool();
    // where no tasks are queued and a new thread is created as needed if none
    // are available in the cache
    taskExecutor.setQueueCapacity(0);
    return taskExecutor;
  }

  @Bean(name = "jmsTaskExecutor")
  @ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "false", matchIfMissing = true)
  public Executor synchronousJmsTaskExecutor() {
    SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
    return taskExecutor;
  }
  
@Bean
  public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
    return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
  }
  
@Bean
  public IntegrationFlow jmsMessageFlow(
      @Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
      Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec)
{
  return IntegrationFlow.from(
            Jms.messageDrivenChannelAdapter(connectionFactory)
                .destination("INCOMING_QUEUE")
                .configureListenerContainer(
                    jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
                .errorChannel(genericExceptionChannel)
                .outputChannel("messageHandlingChannel"))
       // save message in db
       .handle(
            (payload, headers) -> databaseService.save(payload),
            spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
        // new thread so that the jms message is acknowledged
        .channel(jmsTxCommitingChannelSpec)
        .enrichHeaders(errorChannelSpec)
        .handle(
            (payload, headers) -> messageParser.extractMessageMetadata(payload),
            spec -> spec.id("extractMessageMetadata"))
        .handle(
            (payload, headers) -> databaseService.update(payload))            
        .handle(Jms.outboundAdapter(connectionFactory)
                .destination(getQueueName())
                .configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec.id("jmsTemplate")))
        .get();
}

Are there are issues using a SyncTaskExecutor here whenever the property appmit-jms-reads-early is not set.

From Spring Documentation

SyncTaskExecutor: This implementation does not run invocations asynchronously. Instead, each invocation takes place in the calling thread. It is primarily used in situations where multi-threading is not necessary, such as in simple test cases.

Share Improve this question asked Feb 4 at 19:13 VPN236VPN236 273 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

You are right. There is indeed an issue with that SyncTaskExecutor for your use-case.

See its implementation:

@Override
public void execute(Runnable task) {
    Assert.notNull(task, "Runnable must not be null");
    task.run();
}

unlike something like SimpleAsyncTaskExecutor:

protected void doExecute(Runnable task) {
    newThread(task).start();
}

So, the problem that with this SyncTaskExecutor logic, your transaction is not going to be committed because you just don't leave that JMS thread and perform the rest of the flow exactly in this one.

本文标签: