Skip to content

Commit 08dc0a3

Browse files
committed
GH-3334: Add "embedded reaper" into CorrelationMH
Fixes #3334 * Add `expireTimeout` property into `AbstractCorrelatingMessageHandler` to call newly introduced `purgeOrphanedGroups()` API for removing old groups from the store * Add `expireDuration` to perform `purgeOrphanedGroups()` task periodically
1 parent 8d3d871 commit 08dc0a3

File tree

2 files changed

+110
-9
lines changed

2 files changed

+110
-9
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.aggregator;
1818

19+
import java.time.Duration;
1920
import java.util.Collection;
2021
import java.util.Collections;
2122
import java.util.Comparator;
@@ -53,6 +54,7 @@
5354
import org.springframework.integration.support.locks.DefaultLockRegistry;
5455
import org.springframework.integration.support.locks.LockRegistry;
5556
import org.springframework.integration.util.UUIDConverter;
57+
import org.springframework.lang.Nullable;
5658
import org.springframework.messaging.Message;
5759
import org.springframework.messaging.MessageChannel;
5860
import org.springframework.messaging.MessageDeliveryException;
@@ -82,6 +84,11 @@
8284
* Use proper {@link CorrelationStrategy} for cases when same
8385
* {@link org.springframework.integration.store.MessageStore} is used
8486
* for multiple handlers to ensure uniqueness of message groups across handlers.
87+
* <p>
88+
* When the {@link #expireTimeout} is more than 0, the groups which older then this timeout
89+
* is purged from the store on start up (or when {@link #purgeOrphanedGroups()} is called).
90+
* If {@link #expireDuration} is provide, the task is scheduled to perform
91+
* {@link #purgeOrphanedGroups()} periodically.
8592
*
8693
* @author Iwein Fuld
8794
* @author Dave Syer
@@ -132,6 +139,11 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
132139

133140
private List<Advice> forceReleaseAdviceChain;
134141

142+
private long expireTimeout;
143+
144+
@Nullable
145+
private Duration expireDuration;
146+
135147
private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
136148

137149
private EvaluationContext evaluationContext;
@@ -310,6 +322,32 @@ public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend) {
310322
this.releaseLockBeforeSend = releaseLockBeforeSend;
311323
}
312324

325+
/**
326+
* Configure a timeout in milliseconds for purging old orphaned groups from the store.
327+
* Used on startup and if {@link #expireDuration} is provided the task for {@link #purgeOrphanedGroups()}
328+
* is scheduled with that period.
329+
* The {@link #forceReleaseProcessor} is used to process those expired groups according
330+
* the "force complete" options.
331+
* @param expireTimeout the number of milliseconds to determine old orphaned groups in the store to purge.
332+
* @since 5.4
333+
* @see #purgeOrphanedGroups()
334+
*/
335+
public void setExpireTimeout(long expireTimeout) {
336+
Assert.isTrue(expireTimeout > 0, "'expireTimeout' must be more than 0.");
337+
this.expireTimeout = expireTimeout;
338+
}
339+
340+
/**
341+
* Configure a {@link Duration} how often to clean up old orphaned groups from the store.
342+
* @param expireDuration the delay how often to call {@link #purgeOrphanedGroups()}.
343+
* @since 5.4
344+
* @see #purgeOrphanedGroups()
345+
* @see #setExpireTimeout(long)
346+
*/
347+
public void setExpireDuration(@Nullable Duration expireDuration) {
348+
this.expireDuration = expireDuration;
349+
}
350+
313351
@Override
314352
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
315353
this.applicationEventPublisher = applicationEventPublisher;
@@ -896,6 +934,13 @@ public void start() {
896934
if (this.releaseStrategy instanceof Lifecycle) {
897935
((Lifecycle) this.releaseStrategy).start();
898936
}
937+
if (this.expireTimeout > 0) {
938+
purgeOrphanedGroups();
939+
if (this.expireDuration != null) {
940+
getTaskScheduler()
941+
.scheduleWithFixedDelay(this::purgeOrphanedGroups, this.expireDuration);
942+
}
943+
}
899944
}
900945
}
901946

@@ -917,6 +962,17 @@ public boolean isRunning() {
917962
return this.running;
918963
}
919964

965+
/**
966+
* Perform a {@link MessageGroupStore#expireMessageGroups(long)} with the provided {@link #expireTimeout}.
967+
* Can be called externally at any time.
968+
* Internally it is called from the scheduled task with the configured {@link #expireDuration}.
969+
* @since 5.4
970+
*/
971+
public void purgeOrphanedGroups() {
972+
Assert.isTrue(expireTimeout > 0, "'expireTimeout' must be more than 0.");
973+
this.messageStore.expireMessageGroups(this.expireTimeout);
974+
}
975+
920976
protected static class SequenceAwareMessageGroup extends SimpleMessageGroup {
921977

922978
private final SimpleMessageGroup sourceGroup;

spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,11 +17,14 @@
1717
package org.springframework.integration.aggregator;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.atLeast;
21+
import static org.mockito.Mockito.mock;
2022
import static org.mockito.Mockito.never;
2123
import static org.mockito.Mockito.spy;
2224
import static org.mockito.Mockito.verify;
2325

2426
import java.lang.reflect.Method;
27+
import java.time.Duration;
2528
import java.util.ArrayList;
2629
import java.util.Collection;
2730
import java.util.List;
@@ -31,10 +34,11 @@
3134
import java.util.concurrent.Executors;
3235
import java.util.concurrent.TimeUnit;
3336

34-
import org.junit.Ignore;
35-
import org.junit.Test;
37+
import org.junit.jupiter.api.Disabled;
38+
import org.junit.jupiter.api.Test;
3639

3740
import org.springframework.beans.DirectFieldAccessor;
41+
import org.springframework.beans.factory.BeanFactory;
3842
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3943
import org.springframework.integration.channel.QueueChannel;
4044
import org.springframework.integration.store.MessageGroup;
@@ -57,7 +61,7 @@
5761
*/
5862
public class AbstractCorrelatingMessageHandlerTests {
5963

60-
@Test // INT-2751
64+
@Test
6165
public void testReaperDoesntReapAProcessingGroup() throws Exception {
6266
final MessageGroupStore groupStore = new SimpleMessageStore();
6367
final CountDownLatch waitForSendLatch = new CountDownLatch(1);
@@ -147,7 +151,7 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
147151
exec.shutdownNow();
148152
}
149153

150-
@Test // INT-2833
154+
@Test
151155
public void testReaperReapsAnEmptyGroup() {
152156
final MessageGroupStore groupStore = new SimpleMessageStore();
153157
AggregatingMessageHandler handler = new AggregatingMessageHandler(group -> group, groupStore);
@@ -176,7 +180,7 @@ public void testReaperReapsAnEmptyGroup() {
176180
.isEqualTo(0);
177181
}
178182

179-
@Test // INT-2833
183+
@Test
180184
public void testReaperReapsAnEmptyGroupAfterConfiguredDelay() throws Exception {
181185
final MessageGroupStore groupStore = new SimpleMessageStore();
182186
AggregatingMessageHandler handler = new AggregatingMessageHandler(group -> group, groupStore);
@@ -251,7 +255,7 @@ public void testReapWithChangeInSameMillisecond() throws Exception {
251255
assertThat(payload.size()).isEqualTo(1);
252256
}
253257

254-
@Test /* INT-3216 */
258+
@Test
255259
public void testDontReapIfAlreadyComplete() throws Exception {
256260
MessageGroupProcessor mgp = new DefaultAggregatingMessageGroupProcessor();
257261
AggregatingMessageHandler handler = new AggregatingMessageHandler(mgp);
@@ -384,7 +388,7 @@ public void removeMessageGroup(Object groupId) {
384388
}
385389

386390
@Test
387-
@Ignore("Time sensitive: the empty group might be removed before main thread reaches assertion for size")
391+
@Disabled("Time sensitive: the empty group might be removed before main thread reaches assertion for size")
388392
public void testScheduleRemoveAnEmptyGroupAfterConfiguredDelay() throws Exception {
389393
final MessageGroupStore groupStore = new SimpleMessageStore();
390394
AggregatingMessageHandler handler = new AggregatingMessageHandler(group -> group, groupStore);
@@ -430,7 +434,7 @@ public void testScheduleRemoveAnEmptyGroupAfterConfiguredDelay() throws Exceptio
430434
}
431435

432436
@Test
433-
@Ignore("Until 5.2 with new 'owner' feature on groups")
437+
@Disabled("Until 5.2 with new 'owner' feature on groups")
434438
public void testDontReapMessageOfOtherHandler() {
435439
MessageGroupStore groupStore = new SimpleMessageStore();
436440

@@ -484,4 +488,45 @@ public void testNoPopSequenceDetails() {
484488
assertThat(receive.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS)).isTrue();
485489
}
486490

491+
@Test
492+
public void testPurgeOrphanedGroupsOnStartup() throws InterruptedException {
493+
MessageGroupStore groupStore = new SimpleMessageStore();
494+
AggregatingMessageHandler handler = new AggregatingMessageHandler(group -> group, groupStore);
495+
handler.setReleaseStrategy(group -> false);
496+
QueueChannel discardChannel = new QueueChannel();
497+
handler.setDiscardChannel(discardChannel);
498+
handler.setExpireTimeout(1);
499+
handler.setBeanFactory(mock(BeanFactory.class));
500+
handler.afterPropertiesSet();
501+
handler.handleMessageInternal(MessageBuilder.withPayload("test").setCorrelationId("test").build());
502+
Thread.sleep(100);
503+
handler.start();
504+
Message<?> receive = discardChannel.receive(10000);
505+
assertThat(receive).isNotNull();
506+
assertThat(groupStore.getMessageGroupCount()).isEqualTo(0);
507+
}
508+
509+
@Test
510+
public void testPurgeOrphanedGroupsScheduled() {
511+
MessageGroupStore groupStore = spy(new SimpleMessageStore());
512+
AggregatingMessageHandler handler = new AggregatingMessageHandler(group -> group, groupStore);
513+
handler.setReleaseStrategy(group -> false);
514+
QueueChannel discardChannel = new QueueChannel();
515+
handler.setDiscardChannel(discardChannel);
516+
handler.setExpireTimeout(100);
517+
handler.setExpireDuration(Duration.ofMillis(10));
518+
handler.setBeanFactory(mock(BeanFactory.class));
519+
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
520+
taskScheduler.afterPropertiesSet();
521+
handler.setTaskScheduler(taskScheduler);
522+
handler.afterPropertiesSet();
523+
handler.handleMessageInternal(MessageBuilder.withPayload("test").setCorrelationId("test").build());
524+
handler.start();
525+
Message<?> receive = discardChannel.receive(10000);
526+
assertThat(receive).isNotNull();
527+
assertThat(groupStore.getMessageGroupCount()).isEqualTo(0);
528+
verify(groupStore, atLeast(2)).expireMessageGroups(100);
529+
taskScheduler.destroy();
530+
}
531+
487532
}

0 commit comments

Comments
 (0)