We are using the Aggregator pattern in Spring Integration 6.3.3 with a JdbcMessageStore and a group timeout. We also want to send the partial group on expiry, which we treat as an error condition, and we want to log/alert when that happens.
What is the best way for the downstream component in the IntegrationFlow, when it receives the aggregated messages, to know whether they form a complete group or a partial one?
Here's a simplified example:
@Autowired
PetGroupHandler petGroupHandler;

@Bean
public JdbcMessageStore jdbcMessageStore(
        @Qualifier("mydbDataSource") DataSource dataSource) {
    JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
    messageStore.setRegion("petstore-pubsub");
    return messageStore;
}

@Bean
IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
    return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
            .filter(petOfInterestFilter, "shouldProcess")
            .aggregate(aggregatorSpec -> aggregatorSpec
                    .messageStore(jdbcMessageStore)
                    .outputProcessor(petOutputProcessor)
                    .expireGroupsUponCompletion(true)
                    .groupTimeout(300 * 1000) // 5 minutes
                    .sendPartialResultOnExpiry(true) // send partial group
                    .correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
                    .releaseStrategy(group -> group.size() >= 2))
            .handle(petGroupHandler, "handle")
            .get();
}
When the PetGroupHandler in the above example receives a list of Pet, is there a way to know whether that list is a complete group or a partial one, so the handler can treat the two cases differently?
We tried implementing an OutputProcessor that injects a boolean header indicating whether the group is complete. This does not work: in AbstractCorrelatingMessageHandler.java the group does not appear to be marked complete between the release strategy call and the point where the OutputProcessor is called inside the completeGroup() method.
if (this.releaseStrategy.canRelease(messageGroup)) {
    Collection<Message<?>> completedMessages = null;
    try {
        noOutput = false;
        completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
    }
    finally {
        // Possible clean (implementation dependency) up
        // even if there was an exception processing messages
        afterRelease(messageGroup, completedMessages);
    }
    if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
        removeEmptyGroupAfterTimeout(groupIdUuid, this.minimumTimeoutForEmptyGroups);
    }
}
https://github.com/spring-projects/spring-integration/blob/6.3.x/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java#L570-L584
In the above example the release strategy is quite simple. You could argue I could just check the list size. But in our real application, the release strategy is more complex. I would like to avoid our handler knowing anything about the release strategy.
I kinda wish .sendPartialResultOnExpiry() took a channel or handler parameter to which the partial group would be sent, but there isn't a way to specify that.
Any suggestion on how to do this is appreciated! Thanks!
Asked by Shinta Smith
1 Answer
A bit further down in AbstractCorrelatingMessageHandler there is this logic:
protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock) {
    this.logger.debug(() -> "Expiring MessageGroup with correlationKey[" + correlationKey + "]");
    if (this.sendPartialResultOnExpiry) {
        this.logger.debug(() -> "Prematurely releasing partially complete group with key ["
                + correlationKey + "] to: " + getOutputChannel());
        completeGroup(correlationKey, group, lock);
    }
    else {
        this.logger.debug(() -> "Discarding messages of partially complete group with key ["
                + correlationKey + "] to: "
                + (this.discardChannelName != null ? this.discardChannelName : this.discardChannel));
        if (this.releaseLockBeforeSend) {
            lock.unlock();
        }
        group.getMessages()
                .forEach(this::discardMessage);
    }
    if (this.applicationEventPublisher != null) {
        this.applicationEventPublisher.publishEvent(
                new MessageGroupExpiredEvent(this, correlationKey, group.size(),
                        new Date(group.getLastModified()), new Date(), !this.sendPartialResultOnExpiry));
    }
}
So, with sendPartialResultOnExpiry(false) (the default), all messages in the partially complete group are discarded one by one to the discard channel instead of being aggregated.
See docs for more info: https://docs.spring.io/spring-integration/reference/aggregator.html#aggregator-xml.
Pay attention to bullet #8:
Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing MessageGroup is expired (see MessageGroupStore.expireMessageGroups(long)). One way of expiring a MessageGroup is by configuring a MessageGroupStoreReaper. However, you can alternatively expire MessageGroup by calling MessageGroupStore.expireMessageGroups(timeout). You can accomplish that through a Control Bus operation or, if you have a reference to the MessageGroupStore instance, by invoking expireMessageGroups(timeout). Otherwise, by itself, this attribute does nothing. It serves only as an indicator of whether to discard or send to the output or reply channel any messages that are still in the MessageGroup that is about to be expired. Optional (the default is false). NOTE: This attribute might more properly be called send-partial-result-on-timeout, because the group may not actually expire if expire-groups-upon-timeout is set to false.
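One way to keep the handler unaware of the release strategy might be to re-evaluate that strategy at the moment the output is built and stamp the result on the outgoing message. The sketch below models only that idea in plain Java so it runs without Spring on the classpath: TaggedGroup, TaggingProcessor, and the "groupComplete" header name are all hypothetical stand-ins. In a real flow the same logic would presumably live in a custom MessageGroupProcessor that calls ReleaseStrategy.canRelease(group) and sets a header on the message it returns.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for an aggregated output message: payload plus headers.
final class TaggedGroup<T> {

    final List<T> payload;
    final Map<String, Object> headers;

    TaggedGroup(List<T> payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }

    // Downstream code reads the header and never sees the release rule itself.
    boolean isComplete() {
        return Boolean.TRUE.equals(headers.get(TaggingProcessor.GROUP_COMPLETE));
    }
}

// Hypothetical stand-in for an output processor that shares the release rule.
final class TaggingProcessor<T> {

    // Assumed header name; choose whatever suits the application.
    static final String GROUP_COMPLETE = "groupComplete";

    private final Predicate<List<T>> releaseRule;

    TaggingProcessor(Predicate<List<T>> releaseRule) {
        this.releaseRule = releaseRule;
    }

    // Re-evaluate the release rule at output time: a group released normally
    // satisfies it, while a group forced out by a timeout usually does not.
    TaggedGroup<T> process(List<T> group) {
        boolean complete = releaseRule.test(group);
        return new TaggedGroup<>(List.copyOf(group),
                Map.<String, Object>of(GROUP_COMPLETE, complete));
    }
}
```

With the rule from the question (group size of at least 2), a two-element list is tagged complete and a one-element list is tagged partial. This only works if the release rule is safe to call twice, so a strategy with side effects or time-dependent logic would need a different approach.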