admin管理员组

文章数量:1123781

We are using Spring Integration 6.3.3 Aggregator pattern, with JDBCMessageStore, and expiry timeout. We want to also send the partial group on expiry, which we consider as an error condition. We want to log/alert when this error happens.

What is the best way for the downstream component in the IntegrationFlow, when it receives the aggregated messages, to know if the messages are a complete group or partial?

Here's a simplified example:

    @Autowired
    PetGroupHandler petGroupHandler;

    @Bean
    public JdbcMessageStore jdbcMessageStore(
            @Qualifier("mydbDataSource") DataSource dataSource) {
        JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
        messageStore.setRegion("petstore-pubsub");
        return messageStore;
    }

    @Bean
    IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
        return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
                .filter(petOfInterestFilter, "shouldProcess")
                .aggregate(aggregatorSpec -> aggregatorSpec
                        .messageStore(jdbcMessageStore)
                        .outputProcessor(petOutputProcessor)
                        .expireGroupsUponCompletion(true) 
                        .groupTimeout(300 * 1000) // 5 minutes
                        .sendPartialResultOnExpiry(true) // send partial group
                        .correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
                        .releaseStrategy(group -> group.size()>=2))
                .handle(petGroupHandler, "handle")
                .get();

When the PetGroupHandler in the above example receives a list of Pet, is there a way to know if the list of Pet is a complete group or partial, so the handler can do something different for complete vs partial group?

We tried implementing an OutputProcessor that injects a boolean header, indicating if the group is complete. But this does not work because it looks like the group is not set to complete in AbstractCorrelatingMessageHandler.java, right after the release strategy is called and before the OutputProcessor is called in the completeGroup() method.

            if (this.releaseStrategy.canRelease(messageGroup)) {
                Collection<Message<?>> completedMessages = null;
                try {
                    noOutput = false;
                    completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
                }
                finally {
                    // Possible clean (implementation dependency) up
                    // even if there was an exception processing messages
                    afterRelease(messageGroup, completedMessages);
                }
                if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
                    removeEmptyGroupAfterTimeout(groupIdUuid, this.minimumTimeoutForEmptyGroups);
                }
            }

.3.x/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java#L570-L584

In the above example the release strategy is quite simple. You could argue I could just check the list size. But in our real application, the release strategy is more complex. I would like to avoid our handler knowing anything about the release strategy.

I kinda wish the .sendPartialResultOnExpiry() has a channel or a handler parameter, to which the partial group is sent. But there isn't a way to specify that.

Any suggestion on how to do this is appreciated! Thanks!

We are using Spring Integration 6.3.3 Aggregator pattern, with JDBCMessageStore, and expiry timeout. We want to also send the partial group on expiry, which we consider as an error condition. We want to log/alert when this error happens.

What is the best way for the downstream component in the IntegrationFlow, when it receives the aggregated messages, to know if the messages are a complete group or partial?

Here's a simplified example:

    @Autowired
    PetGroupHandler petGroupHandler;

    @Bean
    public JdbcMessageStore jdbcMessageStore(
            @Qualifier("mydbDataSource") DataSource dataSource) {
        JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
        messageStore.setRegion("petstore-pubsub");
        return messageStore;
    }

    @Bean
    IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
        return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
                .filter(petOfInterestFilter, "shouldProcess")
                .aggregate(aggregatorSpec -> aggregatorSpec
                        .messageStore(jdbcMessageStore)
                        .outputProcessor(petOutputProcessor)
                        .expireGroupsUponCompletion(true) 
                        .groupTimeout(300 * 1000) // 5 minutes
                        .sendPartialResultOnExpiry(true) // send partial group
                        .correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
                        .releaseStrategy(group -> group.size()>=2))
                .handle(petGroupHandler, "handle")
                .get();

When the PetGroupHandler in the above example receives a list of Pet, is there a way to know if the list of Pet is a complete group or partial, so the handler can do something different for complete vs partial group?

We tried implementing an OutputProcessor that injects a boolean header, indicating if the group is complete. But this does not work because it looks like the group is not set to complete in AbstractCorrelatingMessageHandler.java, right after the release strategy is called and before the OutputProcessor is called in the completeGroup() method.

            if (this.releaseStrategy.canRelease(messageGroup)) {
                Collection<Message<?>> completedMessages = null;
                try {
                    noOutput = false;
                    completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
                }
                finally {
                    // Possible clean (implementation dependency) up
                    // even if there was an exception processing messages
                    afterRelease(messageGroup, completedMessages);
                }
                if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
                    removeEmptyGroupAfterTimeout(groupIdUuid, this.minimumTimeoutForEmptyGroups);
                }
            }

https://github.com/spring-projects/spring-integration/blob/6.3.x/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java#L570-L584

In the above example the release strategy is quite simple. You could argue I could just check the list size. But in our real application, the release strategy is more complex. I would like to avoid our handler knowing anything about the release strategy.

I kinda wish the .sendPartialResultOnExpiry() has a channel or a handler parameter, to which the partial group is sent. But there isn't a way to specify that.

Any suggestion on how to do this is appreciated! Thanks!

Share Improve this question asked yesterday Shinta SmithShinta Smith 4987 silver badges20 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

A bit further in the AbstractCorrelatingMessageHandler there is a logic:

protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock) {
    this.logger.debug(() -> "Expiring MessageGroup with correlationKey[" + correlationKey + "]");
    if (this.sendPartialResultOnExpiry) {
        this.logger.debug(() -> "Prematurely releasing partially complete group with key ["
                + correlationKey + "] to: " + getOutputChannel());
        completeGroup(correlationKey, group, lock);
    }
    else {
        this.logger.debug(() -> "Discarding messages of partially complete group with key ["
                + correlationKey + "] to: "
                + (this.discardChannelName != null ? this.discardChannelName : this.discardChannel));
        if (this.releaseLockBeforeSend) {
            lock.unlock();
        }
        group.getMessages()
                .forEach(this::discardMessage);
    }
    if (this.applicationEventPublisher != null) {
        this.applicationEventPublisher.publishEvent(
                new MessageGroupExpiredEvent(this, correlationKey, group.size(),
                        new Date(group.getLastModified()), new Date(), !this.sendPartialResultOnExpiry));
    }
}

So, if sendPartialResultOnExpiry(false) (default), all those messages in partially complete group are going to be discarded. See docs for more info: https://docs.spring.io/spring-integration/reference/aggregator.html#aggregator-xml.

Pay attention to bullet #8:

Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing MessageGroup is expired (see MessageGroupStore.expireMessageGroups(long)). One way of expiring a MessageGroup is by configuring a MessageGroupStoreReaper. However, you can alternatively expire MessageGroup by calling MessageGroupStore.expireMessageGroups(timeout). You can accomplish that through a Control Bus operation or, if you have a reference to the MessageGroupStore instance, by invoking expireMessageGroups(timeout). Otherwise, by itself, this attribute does nothing. It serves only as an indicator of whether to discard or send to the output or reply channel any messages that are still in the MessageGroup that is about to be expired. Optional (the default is false). NOTE: This attribute might more properly be called send-partial-result-on-timeout, because the group may not actually expire if expire-groups-upon-timeout is set to false.

本文标签: