Skip to content

Commit 65198f0

Browse files
committed
* Add Java DSL and XML support for expireTimeout and expireDuration options
* Document the new feature
1 parent 08dc0a3 commit 65198f0

File tree

10 files changed

+110
-44
lines changed

10 files changed

+110
-44
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@
8585
* {@link org.springframework.integration.store.MessageStore} is used
8686
* for multiple handlers to ensure uniqueness of message groups across handlers.
8787
* <p>
88-
* When the {@link #expireTimeout} is more than 0, the groups which older then this timeout
89-
* is purged from the store on start up (or when {@link #purgeOrphanedGroups()} is called).
90-
* If {@link #expireDuration} is provide, the task is scheduled to perform
88+
* When the {@link #expireTimeout} is greater than 0, groups which are older than this timeout
89+
* are purged from the store on start up (or when {@link #purgeOrphanedGroups()} is called).
90+
* If {@link #expireDuration} is provided, the task is scheduled to perform
9191
* {@link #purgeOrphanedGroups()} periodically.
9292
*
9393
* @author Iwein Fuld
@@ -324,10 +324,11 @@ public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend) {
324324

325325
/**
326326
* Configure a timeout in milliseconds for purging old orphaned groups from the store.
327-
* Used on startup and if {@link #expireDuration} is provided the task for {@link #purgeOrphanedGroups()}
328-
* is scheduled with that period.
327+
* Used on startup and when an {@link #expireDuration} is provided, the task for running
328+
* {@link #purgeOrphanedGroups()} is scheduled with that period.
329329
* The {@link #forceReleaseProcessor} is used to process those expired groups according
330-
* the "force complete" options.
330+
* the "force complete" options. A group can be orphaned if a persistent message group
331+
* store is used and no new messages arrive for that group after a restart.
331332
* @param expireTimeout the number of milliseconds to determine old orphaned groups in the store to purge.
332333
* @since 5.4
333334
* @see #purgeOrphanedGroups()
@@ -337,6 +338,18 @@ public void setExpireTimeout(long expireTimeout) {
337338
this.expireTimeout = expireTimeout;
338339
}
339340

341+
/**
342+
* Configure a {@link Duration} (ini millis) how often to clean up old orphaned groups from the store.
343+
* @param expireDuration the delay how often to call {@link #purgeOrphanedGroups()}.
344+
* @since 5.4
345+
* @see #purgeOrphanedGroups()
346+
* @see #setExpireDuration(Duration)
347+
* @see #setExpireTimeout(long)
348+
*/
349+
public void setExpireDurationMillis(long expireDuration) {
350+
setExpireDuration(Duration.ofMillis(expireDuration));
351+
}
352+
340353
/**
341354
* Configure a {@link Duration} how often to clean up old orphaned groups from the store.
342355
* @param expireDuration the delay how often to call {@link #purgeOrphanedGroups()}.
@@ -969,7 +982,7 @@ public boolean isRunning() {
969982
* @since 5.4
970983
*/
971984
public void purgeOrphanedGroups() {
972-
Assert.isTrue(expireTimeout > 0, "'expireTimeout' must be more than 0.");
985+
Assert.isTrue(this.expireTimeout > 0, "'expireTimeout' must be more than 0.");
973986
this.messageStore.expireMessageGroups(this.expireTimeout);
974987
}
975988

spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.config;
1818

19+
import java.time.Duration;
1920
import java.util.List;
2021
import java.util.Map;
2122
import java.util.function.Function;
@@ -62,13 +63,6 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
6263

6364
private String outputChannelName;
6465

65-
@SuppressWarnings("deprecation")
66-
private org.springframework.integration.support.management.AbstractMessageHandlerMetrics metrics;
67-
68-
private Boolean statsEnabled;
69-
70-
private Boolean countsEnabled;
71-
7266
private LockRegistry lockRegistry;
7367

7468
private MessageGroupStore messageStore;
@@ -97,6 +91,10 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
9791

9892
private Boolean releaseLockBeforeSend;
9993

94+
private Long expireTimeout;
95+
96+
private Long expireDuration;
97+
10098
private Function<MessageGroup, Map<String, Object>> headersFunction;
10199

102100
public void setProcessorBean(Object processorBean) {
@@ -120,25 +118,6 @@ public void setOutputChannelName(String outputChannelName) {
120118
this.outputChannelName = outputChannelName;
121119
}
122120

123-
/**
124-
* Deprecated.
125-
* @param metrics the metrics.
126-
* @deprecated in favor of Micrometer metrics.
127-
*/
128-
@Deprecated
129-
@SuppressWarnings("deprecation")
130-
public void setMetrics(org.springframework.integration.support.management.AbstractMessageHandlerMetrics metrics) {
131-
this.metrics = metrics;
132-
}
133-
134-
public void setStatsEnabled(Boolean statsEnabled) {
135-
this.statsEnabled = statsEnabled;
136-
}
137-
138-
public void setCountsEnabled(Boolean countsEnabled) {
139-
this.countsEnabled = countsEnabled;
140-
}
141-
142121
public void setLockRegistry(LockRegistry lockRegistry) {
143122
this.lockRegistry = lockRegistry;
144123
}
@@ -199,7 +178,14 @@ public void setHeadersFunction(Function<MessageGroup, Map<String, Object>> heade
199178
this.headersFunction = headersFunction;
200179
}
201180

202-
@SuppressWarnings("deprecation")
181+
public void setExpireTimeout(Long expireTimeout) {
182+
this.expireTimeout = expireTimeout;
183+
}
184+
185+
public void setExpireDurationMillis(Long expireDuration) {
186+
this.expireDuration = expireDuration;
187+
}
188+
203189
@Override
204190
protected AggregatingMessageHandler createHandler() {
205191
MessageGroupProcessor outputProcessor;
@@ -229,9 +215,6 @@ protected AggregatingMessageHandler createHandler() {
229215
.acceptIfNotNull(this.expireGroupsUponCompletion, aggregator::setExpireGroupsUponCompletion)
230216
.acceptIfNotNull(this.sendTimeout, aggregator::setSendTimeout)
231217
.acceptIfNotNull(this.outputChannelName, aggregator::setOutputChannelName)
232-
.acceptIfNotNull(this.metrics, aggregator::configureMetrics)
233-
.acceptIfNotNull(this.statsEnabled, aggregator::setStatsEnabled)
234-
.acceptIfNotNull(this.countsEnabled, aggregator::setCountsEnabled)
235218
.acceptIfNotNull(this.lockRegistry, aggregator::setLockRegistry)
236219
.acceptIfNotNull(this.messageStore, aggregator::setMessageStore)
237220
.acceptIfNotNull(this.correlationStrategy, aggregator::setCorrelationStrategy)
@@ -245,7 +228,10 @@ protected AggregatingMessageHandler createHandler() {
245228
.acceptIfNotNull(this.minimumTimeoutForEmptyGroups, aggregator::setMinimumTimeoutForEmptyGroups)
246229
.acceptIfNotNull(this.expireGroupsUponTimeout, aggregator::setExpireGroupsUponTimeout)
247230
.acceptIfNotNull(this.popSequence, aggregator::setPopSequence)
248-
.acceptIfNotNull(this.releaseLockBeforeSend, aggregator::setReleaseLockBeforeSend);
231+
.acceptIfNotNull(this.releaseLockBeforeSend, aggregator::setReleaseLockBeforeSend)
232+
.acceptIfNotNull(this.expireDuration,
233+
(duration) -> aggregator.setExpireDuration(Duration.ofMillis(duration)))
234+
.acceptIfNotNull(this.expireTimeout, aggregator::setExpireTimeout);
249235

250236
return aggregator;
251237
}

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -101,6 +101,9 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad
101101

102102
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT);
103103
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, RELEASE_LOCK);
104+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "expire-timeout");
105+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "expire-duration",
106+
"expireDurationMillis");
104107
}
105108

106109
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.time.Duration;
1920
import java.util.Arrays;
2021
import java.util.LinkedList;
2122
import java.util.List;
@@ -329,4 +330,28 @@ public S popSequence(boolean popSequence) {
329330
return _this();
330331
}
331332

333+
/**
334+
* Configure a timeout for old groups in the store to purge.
335+
* @param expireTimeout the timeout in milliseconds to use.
336+
* @return the endpoint spec.
337+
* @since 5.4
338+
* @see AbstractCorrelatingMessageHandler#setExpireTimeout(long)
339+
*/
340+
public S setExpireTimeout(long expireTimeout) {
341+
this.handler.setExpireTimeout(expireTimeout);
342+
return _this();
343+
}
344+
345+
/**
346+
* Configure a {@link Duration} how often to run a scheduled purge task.
347+
* @param expireDuration the duration for scheduled purge task.
348+
* @return the endpoint spec.
349+
* @since 5.4
350+
* @see AbstractCorrelatingMessageHandler#setExpireDuration(Duration)
351+
*/
352+
public S setExpireDuration(Duration expireDuration) {
353+
this.handler.setExpireDuration(expireDuration);
354+
return _this();
355+
}
356+
332357
}

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4037,6 +4037,22 @@
40374037
<xsd:union memberTypes="xsd:boolean xsd:string"/>
40384038
</xsd:simpleType>
40394039
</xsd:attribute>
4040+
<xsd:attribute name="expire-timeout" type="xsd:string">
4041+
<xsd:annotation>
4042+
<xsd:documentation>
4043+
The timeout for old groups in the store to purge on startup.
4044+
If 'expire-duration' is also provided, the purge task is scheduled
4045+
periodically to purge old groups with this expiration timeout.
4046+
</xsd:documentation>
4047+
</xsd:annotation>
4048+
</xsd:attribute>
4049+
<xsd:attribute name="expire-duration" type="xsd:string">
4050+
<xsd:annotation>
4051+
<xsd:documentation>
4052+
The 'Duration' (in milliseconds) how often to run purge old groups scheduled task.
4053+
</xsd:documentation>
4054+
</xsd:annotation>
4055+
</xsd:attribute>
40404056
</xsd:extension>
40414057
</xsd:complexContent>
40424058
</xsd:complexType>

spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests-context.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@
5656
scheduler="scheduler"
5757
message-store="store"
5858
pop-sequence="false"
59-
order="5">
59+
order="5"
60+
expire-duration="10000"
61+
expire-timeout="250">
6062
<expire-transactional/>
6163
</aggregator>
6264

spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,13 +19,13 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.fail;
2121

22+
import java.time.Duration;
2223
import java.util.ArrayList;
2324
import java.util.Collection;
2425
import java.util.List;
2526
import java.util.concurrent.atomic.AtomicReference;
2627

27-
import org.junit.Test;
28-
import org.junit.runner.RunWith;
28+
import org.junit.jupiter.api.Test;
2929

3030
import org.springframework.beans.DirectFieldAccessor;
3131
import org.springframework.beans.factory.BeanCreationException;
@@ -54,6 +54,7 @@
5454
import org.springframework.messaging.MessageChannel;
5555
import org.springframework.messaging.PollableChannel;
5656
import org.springframework.messaging.SubscribableChannel;
57+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5758
import org.springframework.test.context.junit4.SpringRunner;
5859

5960
/**
@@ -65,7 +66,7 @@
6566
* @author Gunnar Hillert
6667
* @author Gary Russell
6768
*/
68-
@RunWith(SpringRunner.class)
69+
@SpringJUnitConfig
6970
public class AggregatorParserTests {
7071

7172
@Autowired
@@ -194,6 +195,9 @@ public void testPropertyAssignment() {
194195
assertThat(TestUtils.getPropertyValue(consumer, "order")).isEqualTo(5);
195196
assertThat(TestUtils.getPropertyValue(consumer, "forceReleaseAdviceChain")).isNotNull();
196197
assertThat(TestUtils.getPropertyValue(consumer, "popSequence", Boolean.class)).isFalse();
198+
assertThat(TestUtils.getPropertyValue(consumer, "expireTimeout")).isEqualTo(250L);
199+
assertThat(TestUtils.getPropertyValue(consumer, "expireDuration", Duration.class))
200+
.isEqualTo(Duration.ofSeconds(10));
197201
}
198202

199203
@Test

src/reference/asciidoc/aggregator.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ Only this sub-element or `<expire-transactional/>` is allowed.
513513
A transaction `Advice` can also be configured here by using the Spring `tx` namespace.
514514
====
515515
516+
[[aggregator-expiring-groups]]
516517
[IMPORTANT]
517518
.Expiring Groups
518519
=====
@@ -535,6 +536,17 @@ Timed-out groups are either discarded or a partial release occurs (based on `sen
535536
536537
Since version 5.0, empty groups are also scheduled for removal after `empty-group-min-timeout`.
537538
If `expireGroupsUponCompletion == false` and `minimumTimeoutForEmptyGroups > 0`, the task to remove the group is scheduled when normal or partial sequences release happens.
539+
540+
Starting version 5.4, the aggregator (and resequencer) can be configured for so-called "embedded reaper".
541+
The `expireTimeout` (if greater than `0`) indicates how old groups in the store should be purged.
542+
The `purgeOrphanedGroups()` method is called on start up and together with the provided `expireDuration` periodically within a scheduled task.
543+
This method is also can be called externally at any time.
544+
The expiration logic is fully delegated into a `forceComplete(MessageGroup)` functionality according the provided expiration options mentioned above.
545+
Such a periodic purge functionality is useful when a message store is needed to be cleaned up from those old groups which are not going to be released any more with regular message arrival logic.
546+
In most cases this happens after an application start up.
547+
The functionality is similar to the `MessageGroupStoreReaper` with a scheduled task, but provides a convenient way to deal with old groups withing specific components.
548+
The `MessageGroupStore` is still have to be provided exclusively for the current correlation endpoint.
549+
Otherwise one aggregator may purge groups from another.
538550
=====
539551
540552
We generally recommend using a `ref` attribute if a custom aggregator handler implementation may be referenced in other `<aggregator>` definitions.

src/reference/asciidoc/resequencer.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,6 @@ Starting with version 5.0, empty groups are also scheduled for removal after the
121121
The default is 'false'.
122122
====
123123

124+
Also see <<./aggregator.adoc#aggregator-expiring-groups, Aggregator Expiring Groups>> for more information.
125+
124126
NOTE: Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for it.

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,6 @@ See <<./ip.adoc#ip-collaborating-adapters,Collaborating Channel Adapters>> and <
5050

5151
The one-way messaging gateway (the `void` method return type) now sets a `nullChannel` explicitly into the `replyChannel` header to ignore any possible downstream replies.
5252
See <<./gateway.adoc#gateway-default-reply-channel,Setting the Default Reply Channel>> for more information.
53+
54+
The aggregator (and resequencer) has now an "embedded reaper" support functionality provided via an `expireTimeout` and `expireDuration` options.
55+
See <<./aggregator.adoc#aggregator-expiring-groups, Aggregator Expiring Groups>> for more information.

0 commit comments

Comments
 (0)