We want to commit JMS read transactions based on a property in the middle of our flow. Our implementation currently uses an executor to achieve this.
@Bean
public Consumer<JmsDefaultListenerContainerSpec> jmsListenerContainerSpec() {
    return containerSpec -> {
        containerSpec.receiveTimeout(20_000L);
        containerSpec.maxConcurrentConsumers(1);
        containerSpec.sessionTransacted(true);
    };
}
@Bean
public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec(
        @Qualifier("jmsTaskExecutor") Executor jmsTaskExecutor) {
    return channels -> channels.executor(jmsTaskExecutor);
}
@Bean(name = "jmsTaskExecutor")
@ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "true")
public Executor jmsTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // setting this to 0 mimics the behavior of Executors.newCachedThreadPool();
    // where no tasks are queued and a new thread is created as needed if none
    // are available in the cache
    taskExecutor.setQueueCapacity(0);
    return taskExecutor;
}
@Bean(name = "jmsTaskExecutor")
@ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "false", matchIfMissing = true)
public Executor synchronousJmsTaskExecutor() {
    // runs each task in the calling thread
    SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
    return taskExecutor;
}
@Bean
public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
    return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
}
@Bean
public IntegrationFlow jmsMessageFlow(
        @Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
        Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec) {
    return IntegrationFlow.from(
                    Jms.messageDrivenChannelAdapter(connectionFactory)
                            .destination("INCOMING_QUEUE")
                            .configureListenerContainer(
                                    jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
                            .errorChannel(genericExceptionChannel)
                            .outputChannel("messageHandlingChannel"))
            // save message in db
            .handle(
                    (payload, headers) -> databaseService.save(payload),
                    spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
            // new thread so that the jms message is acknowledged
            .channel(jmsTxCommitingChannelSpec)
            .enrichHeaders(errorChannelSpec)
            .handle(
                    (payload, headers) -> messageParser.extractMessageMetadata(payload),
                    spec -> spec.id("extractMessageMetadata"))
            .handle(
                    (payload, headers) -> databaseService.update(payload))
            .handle(Jms.outboundAdapter(connectionFactory)
                    .destination(getQueueName())
                    .configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec.id("jmsTemplate")))
            .get();
}
Are there any issues with using a SyncTaskExecutor here whenever the property appmit-jms-reads-early is not set?
From the Spring documentation:
SyncTaskExecutor: This implementation does not run invocations asynchronously. Instead, each invocation takes place in the calling thread. It is primarily used in situations where multi-threading is not necessary, such as in simple test cases.
Asked Feb 4 at 19:13 by VPN236
1 Answer
You are right. There is indeed an issue with that SyncTaskExecutor for your use case.
See its implementation:
@Override
public void execute(Runnable task) {
    Assert.notNull(task, "Runnable must not be null");
    task.run();
}
unlike something like SimpleAsyncTaskExecutor:
protected void doExecute(Runnable task) {
    newThread(task).start();
}
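To make the difference concrete, here is a minimal sketch in plain Java with no Spring on the classpath. The `SYNC` and `ASYNC` executors are hypothetical stand-ins that only mirror the two method bodies quoted above; they are not the Spring classes themselves, and `ExecutorThreadDemo` and `runsOnCallerThread` are names invented for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorThreadDemo {

    // Assumption: stand-in for SyncTaskExecutor, runs the task in the calling thread.
    static final Executor SYNC = Runnable::run;

    // Assumption: stand-in for SimpleAsyncTaskExecutor, starts a new thread per task.
    static final Executor ASYNC = task -> new Thread(task).start();

    /** Returns true if the executor ran the task on the caller's own thread. */
    static boolean runsOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        AtomicReference<Thread> worker = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        executor.execute(() -> {
            worker.set(Thread.currentThread()); // remember which thread ran the task
            done.countDown();
        });

        try {
            done.await(); // wait until the task has finished, whichever thread ran it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.get() == caller;
    }

    public static void main(String[] args) {
        System.out.println("sync on caller thread:  " + runsOnCallerThread(SYNC));
        System.out.println("async on caller thread: " + runsOnCallerThread(ASYNC));
    }
}
```

The synchronous stand-in reports `true` and the asynchronous one reports `false`. In the flow from the question, the "caller" is the JMS listener container thread.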
So the problem with this SyncTaskExecutor logic is that your transaction is not going to be committed at that point in the flow, because you never leave the JMS thread: the rest of the flow runs on that very same thread.
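The consequence for the commit can be simulated without a broker. The sketch below assumes that a transacted listener commits only after its callback returns, and models that commit as a latch released once `execute` hands control back. Everything here (`CommitOrderDemo`, `commitsBeforeDownstreamFinishes`, the 500 ms wait) is hypothetical and for illustration only; no real JMS session is involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CommitOrderDemo {

    /**
     * Simulates a listener thread that hands the rest of the flow to the given
     * executor and "commits" once the hand-off returns. Returns true if the
     * commit happened while the downstream work was still running.
     */
    static boolean commitsBeforeDownstreamFinishes(Executor handoff) {
        CountDownLatch committed = new CountDownLatch(1);
        CountDownLatch downstreamDone = new CountDownLatch(1);
        AtomicBoolean sawCommit = new AtomicBoolean(false);

        // Downstream part of the flow: waits briefly to see whether the commit arrives.
        handoff.execute(() -> {
            try {
                sawCommit.set(committed.await(500, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                downstreamDone.countDown();
            }
        });

        // Simulated commit: only reached once execute() has returned to the listener.
        committed.countDown();

        try {
            downstreamDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawCommit.get();
    }

    public static void main(String[] args) {
        Executor sync = Runnable::run;                        // same thread as the listener
        Executor async = task -> new Thread(task).start();    // separate thread
        System.out.println("sync:  early commit = " + commitsBeforeDownstreamFinishes(sync));
        System.out.println("async: early commit = " + commitsBeforeDownstreamFinishes(async));
    }
}
```

With the synchronous hand-off the downstream work can never observe the commit, because the thread that would perform it is busy running that work. With the asynchronous hand-off the listener returns immediately and the commit is visible while the downstream work is still in progress.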
Source: In a spring integration flow is using a SyncTaskExecutor the same as using a single threaded flow? - Stack Overflow