Skip to content

Commit 492f0bf

Browse files
GH-3334: Add "embedded reaper" into CorrelationMH (#3342)
* GH-3334: Add "embedded reaper" into CorrelationMH Fixes #3334 * Add `expireTimeout` property into `AbstractCorrelatingMessageHandler` to call newly introduced `purgeOrphanedGroups()` API for removing old groups from the store * Add `expireDuration` to perform `purgeOrphanedGroups()` task periodically * * Add Java DSL and XML support for `expireTimeout` and `expireDuration` options * Document the new feature * * Fix language in docs Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent 6a865b5 commit 492f0bf

File tree

11 files changed

+214
-47
lines changed

11 files changed

+214
-47
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.aggregator;
1818

19+
import java.time.Duration;
1920
import java.util.Collection;
2021
import java.util.Collections;
2122
import java.util.Comparator;
@@ -53,6 +54,7 @@
5354
import org.springframework.integration.support.locks.DefaultLockRegistry;
5455
import org.springframework.integration.support.locks.LockRegistry;
5556
import org.springframework.integration.util.UUIDConverter;
57+
import org.springframework.lang.Nullable;
5658
import org.springframework.messaging.Message;
5759
import org.springframework.messaging.MessageChannel;
5860
import org.springframework.messaging.MessageDeliveryException;
@@ -82,6 +84,11 @@
8284
* Use proper {@link CorrelationStrategy} for cases when same
8385
* {@link org.springframework.integration.store.MessageStore} is used
8486
* for multiple handlers to ensure uniqueness of message groups across handlers.
87+
* <p>
88+
* When the {@link #expireTimeout} is greater than 0, groups which are older than this timeout
89+
* are purged from the store on start up (or when {@link #purgeOrphanedGroups()} is called).
90+
* If {@link #expireDuration} is provided, the task is scheduled to perform
91+
* {@link #purgeOrphanedGroups()} periodically.
8592
*
8693
* @author Iwein Fuld
8794
* @author Dave Syer
@@ -132,6 +139,11 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
132139

133140
private List<Advice> forceReleaseAdviceChain;
134141

142+
private long expireTimeout;
143+
144+
@Nullable
145+
private Duration expireDuration;
146+
135147
private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
136148

137149
private EvaluationContext evaluationContext;
@@ -310,6 +322,45 @@ public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend) {
310322
this.releaseLockBeforeSend = releaseLockBeforeSend;
311323
}
312324

325+
/**
326+
* Configure a timeout in milliseconds for purging old orphaned groups from the store.
327+
* Used on startup and when an {@link #expireDuration} is provided, the task for running
328+
* {@link #purgeOrphanedGroups()} is scheduled with that period.
329+
* The {@link #forceReleaseProcessor} is used to process those expired groups according
330+
* the "force complete" options. A group can be orphaned if a persistent message group
331+
* store is used and no new messages arrive for that group after a restart.
332+
* @param expireTimeout the number of milliseconds to determine old orphaned groups in the store to purge.
333+
* @since 5.4
334+
* @see #purgeOrphanedGroups()
335+
*/
336+
public void setExpireTimeout(long expireTimeout) {
337+
Assert.isTrue(expireTimeout > 0, "'expireTimeout' must be more than 0.");
338+
this.expireTimeout = expireTimeout;
339+
}
340+
341+
/**
342+
* Configure a {@link Duration} (in millis) how often to clean up old orphaned groups from the store.
343+
* @param expireDuration the delay how often to call {@link #purgeOrphanedGroups()}.
344+
* @since 5.4
345+
* @see #purgeOrphanedGroups()
346+
* @see #setExpireDuration(Duration)
347+
* @see #setExpireTimeout(long)
348+
*/
349+
public void setExpireDurationMillis(long expireDuration) {
350+
setExpireDuration(Duration.ofMillis(expireDuration));
351+
}
352+
353+
/**
354+
* Configure a {@link Duration} how often to clean up old orphaned groups from the store.
355+
* @param expireDuration the delay how often to call {@link #purgeOrphanedGroups()}.
356+
* @since 5.4
357+
* @see #purgeOrphanedGroups()
358+
* @see #setExpireTimeout(long)
359+
*/
360+
public void setExpireDuration(@Nullable Duration expireDuration) {
361+
this.expireDuration = expireDuration;
362+
}
363+
313364
@Override
314365
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
315366
this.applicationEventPublisher = applicationEventPublisher;
@@ -896,6 +947,13 @@ public void start() {
896947
if (this.releaseStrategy instanceof Lifecycle) {
897948
((Lifecycle) this.releaseStrategy).start();
898949
}
950+
if (this.expireTimeout > 0) {
951+
purgeOrphanedGroups();
952+
if (this.expireDuration != null) {
953+
getTaskScheduler()
954+
.scheduleWithFixedDelay(this::purgeOrphanedGroups, this.expireDuration);
955+
}
956+
}
899957
}
900958
}
901959

@@ -917,6 +975,17 @@ public boolean isRunning() {
917975
return this.running;
918976
}
919977

978+
/**
979+
* Perform a {@link MessageGroupStore#expireMessageGroups(long)} with the provided {@link #expireTimeout}.
980+
* Can be called externally at any time.
981+
* Internally it is called from the scheduled task with the configured {@link #expireDuration}.
982+
* @since 5.4
983+
*/
984+
public void purgeOrphanedGroups() {
985+
Assert.isTrue(this.expireTimeout > 0, "'expireTimeout' must be more than 0.");
986+
this.messageStore.expireMessageGroups(this.expireTimeout);
987+
}
988+
920989
protected static class SequenceAwareMessageGroup extends SimpleMessageGroup {
921990

922991
private final SimpleMessageGroup sourceGroup;

spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.config;
1818

19+
import java.time.Duration;
1920
import java.util.List;
2021
import java.util.Map;
2122
import java.util.function.Function;
@@ -62,13 +63,6 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
6263

6364
private String outputChannelName;
6465

65-
@SuppressWarnings("deprecation")
66-
private org.springframework.integration.support.management.AbstractMessageHandlerMetrics metrics;
67-
68-
private Boolean statsEnabled;
69-
70-
private Boolean countsEnabled;
71-
7266
private LockRegistry lockRegistry;
7367

7468
private MessageGroupStore messageStore;
@@ -97,6 +91,10 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
9791

9892
private Boolean releaseLockBeforeSend;
9993

94+
private Long expireTimeout;
95+
96+
private Long expireDuration;
97+
10098
private Function<MessageGroup, Map<String, Object>> headersFunction;
10199

102100
public void setProcessorBean(Object processorBean) {
@@ -120,25 +118,6 @@ public void setOutputChannelName(String outputChannelName) {
120118
this.outputChannelName = outputChannelName;
121119
}
122120

123-
/**
124-
* Deprecated.
125-
* @param metrics the metrics.
126-
* @deprecated in favor of Micrometer metrics.
127-
*/
128-
@Deprecated
129-
@SuppressWarnings("deprecation")
130-
public void setMetrics(org.springframework.integration.support.management.AbstractMessageHandlerMetrics metrics) {
131-
this.metrics = metrics;
132-
}
133-
134-
public void setStatsEnabled(Boolean statsEnabled) {
135-
this.statsEnabled = statsEnabled;
136-
}
137-
138-
public void setCountsEnabled(Boolean countsEnabled) {
139-
this.countsEnabled = countsEnabled;
140-
}
141-
142121
public void setLockRegistry(LockRegistry lockRegistry) {
143122
this.lockRegistry = lockRegistry;
144123
}
@@ -199,7 +178,14 @@ public void setHeadersFunction(Function<MessageGroup, Map<String, Object>> heade
199178
this.headersFunction = headersFunction;
200179
}
201180

202-
@SuppressWarnings("deprecation")
181+
public void setExpireTimeout(Long expireTimeout) {
182+
this.expireTimeout = expireTimeout;
183+
}
184+
185+
public void setExpireDurationMillis(Long expireDuration) {
186+
this.expireDuration = expireDuration;
187+
}
188+
203189
@Override
204190
protected AggregatingMessageHandler createHandler() {
205191
MessageGroupProcessor outputProcessor;
@@ -229,9 +215,6 @@ protected AggregatingMessageHandler createHandler() {
229215
.acceptIfNotNull(this.expireGroupsUponCompletion, aggregator::setExpireGroupsUponCompletion)
230216
.acceptIfNotNull(this.sendTimeout, aggregator::setSendTimeout)
231217
.acceptIfNotNull(this.outputChannelName, aggregator::setOutputChannelName)
232-
.acceptIfNotNull(this.metrics, aggregator::configureMetrics)
233-
.acceptIfNotNull(this.statsEnabled, aggregator::setStatsEnabled)
234-
.acceptIfNotNull(this.countsEnabled, aggregator::setCountsEnabled)
235218
.acceptIfNotNull(this.lockRegistry, aggregator::setLockRegistry)
236219
.acceptIfNotNull(this.messageStore, aggregator::setMessageStore)
237220
.acceptIfNotNull(this.correlationStrategy, aggregator::setCorrelationStrategy)
@@ -245,7 +228,10 @@ protected AggregatingMessageHandler createHandler() {
245228
.acceptIfNotNull(this.minimumTimeoutForEmptyGroups, aggregator::setMinimumTimeoutForEmptyGroups)
246229
.acceptIfNotNull(this.expireGroupsUponTimeout, aggregator::setExpireGroupsUponTimeout)
247230
.acceptIfNotNull(this.popSequence, aggregator::setPopSequence)
248-
.acceptIfNotNull(this.releaseLockBeforeSend, aggregator::setReleaseLockBeforeSend);
231+
.acceptIfNotNull(this.releaseLockBeforeSend, aggregator::setReleaseLockBeforeSend)
232+
.acceptIfNotNull(this.expireDuration,
233+
(duration) -> aggregator.setExpireDuration(Duration.ofMillis(duration)))
234+
.acceptIfNotNull(this.expireTimeout, aggregator::setExpireTimeout);
249235

250236
return aggregator;
251237
}

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -101,6 +101,9 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad
101101

102102
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT);
103103
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, RELEASE_LOCK);
104+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "expire-timeout");
105+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "expire-duration",
106+
"expireDurationMillis");
104107
}
105108

106109
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.time.Duration;
1920
import java.util.Arrays;
2021
import java.util.LinkedList;
2122
import java.util.List;
@@ -329,4 +330,28 @@ public S popSequence(boolean popSequence) {
329330
return _this();
330331
}
331332

333+
/**
334+
* Configure a timeout for old groups in the store to purge.
335+
* @param expireTimeout the timeout in milliseconds to use.
336+
* @return the endpoint spec.
337+
* @since 5.4
338+
* @see AbstractCorrelatingMessageHandler#setExpireTimeout(long)
339+
*/
340+
public S setExpireTimeout(long expireTimeout) {
341+
this.handler.setExpireTimeout(expireTimeout);
342+
return _this();
343+
}
344+
345+
/**
346+
* Configure a {@link Duration} how often to run a scheduled purge task.
347+
* @param expireDuration the duration for scheduled purge task.
348+
* @return the endpoint spec.
349+
* @since 5.4
350+
* @see AbstractCorrelatingMessageHandler#setExpireDuration(Duration)
351+
*/
352+
public S setExpireDuration(Duration expireDuration) {
353+
this.handler.setExpireDuration(expireDuration);
354+
return _this();
355+
}
356+
332357
}

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4037,6 +4037,22 @@
40374037
<xsd:union memberTypes="xsd:boolean xsd:string"/>
40384038
</xsd:simpleType>
40394039
</xsd:attribute>
4040+
<xsd:attribute name="expire-timeout" type="xsd:string">
4041+
<xsd:annotation>
4042+
<xsd:documentation>
4043+
The timeout for old groups in the store to purge on startup.
4044+
If 'expire-duration' is also provided, the purge task is scheduled
4045+
periodically to purge old groups with this expiration timeout.
4046+
</xsd:documentation>
4047+
</xsd:annotation>
4048+
</xsd:attribute>
4049+
<xsd:attribute name="expire-duration" type="xsd:string">
4050+
<xsd:annotation>
4051+
<xsd:documentation>
4052+
The 'Duration' (in milliseconds) how often to run purge old groups scheduled task.
4053+
</xsd:documentation>
4054+
</xsd:annotation>
4055+
</xsd:attribute>
40404056
</xsd:extension>
40414057
</xsd:complexContent>
40424058
</xsd:complexType>

0 commit comments

Comments
 (0)