Skip to content

GH-3334: Add "embedded reaper" into CorrelationMH #3342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.aggregator;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
Expand Down Expand Up @@ -82,6 +84,11 @@
* Use proper {@link CorrelationStrategy} for cases when same
* {@link org.springframework.integration.store.MessageStore} is used
* for multiple handlers to ensure uniqueness of message groups across handlers.
* <p>
* When the {@link #expireTimeout} is greater than 0, groups which are older than this timeout
* are purged from the store on start up (or when {@link #purgeOrphanedGroups()} is called).
* If {@link #expireDuration} is provided, the task is scheduled to perform
* {@link #purgeOrphanedGroups()} periodically.
*
* @author Iwein Fuld
* @author Dave Syer
Expand Down Expand Up @@ -132,6 +139,11 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP

private List<Advice> forceReleaseAdviceChain;

private long expireTimeout;

@Nullable
private Duration expireDuration;

private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();

private EvaluationContext evaluationContext;
Expand Down Expand Up @@ -310,6 +322,45 @@ public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend) {
this.releaseLockBeforeSend = releaseLockBeforeSend;
}

/**
* Configure a timeout in milliseconds for purging old orphaned groups from the store.
* Used on startup and when an {@link #expireDuration} is provided, the task for running
* {@link #purgeOrphanedGroups()} is scheduled with that period.
* The {@link #forceReleaseProcessor} is used to process those expired groups according
* the "force complete" options. A group can be orphaned if a persistent message group
* store is used and no new messages arrive for that group after a restart.
* @param expireTimeout the number of milliseconds to determine old orphaned groups in the store to purge.
* @since 5.4
* @see #purgeOrphanedGroups()
*/
public void setExpireTimeout(long expireTimeout) {
Assert.isTrue(expireTimeout > 0, "'expireTimeout' must be more than 0.");
this.expireTimeout = expireTimeout;
}

/**
* Configure a {@link Duration} (in millis) how often to clean up old orphaned groups from the store.
* @param expireDuration the delay how often to call {@link #purgeOrphanedGroups()}.
* @since 5.4
* @see #purgeOrphanedGroups()
* @see #setExpireDuration(Duration)
* @see #setExpireTimeout(long)
*/
public void setExpireDurationMillis(long expireDuration) {
setExpireDuration(Duration.ofMillis(expireDuration));
}

/**
* Configure a {@link Duration} how often to clean up old orphaned groups from the store.
* @param expireDuration the delay how often to call {@link #purgeOrphanedGroups()}.
* @since 5.4
* @see #purgeOrphanedGroups()
* @see #setExpireTimeout(long)
*/
public void setExpireDuration(@Nullable Duration expireDuration) {
this.expireDuration = expireDuration;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -896,6 +947,13 @@ public void start() {
if (this.releaseStrategy instanceof Lifecycle) {
((Lifecycle) this.releaseStrategy).start();
}
if (this.expireTimeout > 0) {
purgeOrphanedGroups();
if (this.expireDuration != null) {
getTaskScheduler()
.scheduleWithFixedDelay(this::purgeOrphanedGroups, this.expireDuration);
}
}
}
}

Expand All @@ -917,6 +975,17 @@ public boolean isRunning() {
return this.running;
}

/**
* Perform a {@link MessageGroupStore#expireMessageGroups(long)} with the provided {@link #expireTimeout}.
* Can be called externally at any time.
* Internally it is called from the scheduled task with the configured {@link #expireDuration}.
* @since 5.4
*/
public void purgeOrphanedGroups() {
Assert.isTrue(this.expireTimeout > 0, "'expireTimeout' must be more than 0.");
this.messageStore.expireMessageGroups(this.expireTimeout);
}

protected static class SequenceAwareMessageGroup extends SimpleMessageGroup {

private final SimpleMessageGroup sourceGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.config;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -62,13 +63,6 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe

private String outputChannelName;

@SuppressWarnings("deprecation")
private org.springframework.integration.support.management.AbstractMessageHandlerMetrics metrics;

private Boolean statsEnabled;

private Boolean countsEnabled;

private LockRegistry lockRegistry;

private MessageGroupStore messageStore;
Expand Down Expand Up @@ -97,6 +91,10 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe

private Boolean releaseLockBeforeSend;

private Long expireTimeout;

private Long expireDuration;

private Function<MessageGroup, Map<String, Object>> headersFunction;

public void setProcessorBean(Object processorBean) {
Expand All @@ -120,25 +118,6 @@ public void setOutputChannelName(String outputChannelName) {
this.outputChannelName = outputChannelName;
}

/**
* Deprecated.
* @param metrics the metrics.
* @deprecated in favor of Micrometer metrics.
*/
@Deprecated
@SuppressWarnings("deprecation")
public void setMetrics(org.springframework.integration.support.management.AbstractMessageHandlerMetrics metrics) {
this.metrics = metrics;
}

public void setStatsEnabled(Boolean statsEnabled) {
this.statsEnabled = statsEnabled;
}

public void setCountsEnabled(Boolean countsEnabled) {
this.countsEnabled = countsEnabled;
}

public void setLockRegistry(LockRegistry lockRegistry) {
this.lockRegistry = lockRegistry;
}
Expand Down Expand Up @@ -199,7 +178,14 @@ public void setHeadersFunction(Function<MessageGroup, Map<String, Object>> heade
this.headersFunction = headersFunction;
}

@SuppressWarnings("deprecation")
public void setExpireTimeout(Long expireTimeout) {
this.expireTimeout = expireTimeout;
}

public void setExpireDurationMillis(Long expireDuration) {
this.expireDuration = expireDuration;
}

@Override
protected AggregatingMessageHandler createHandler() {
MessageGroupProcessor outputProcessor;
Expand Down Expand Up @@ -229,9 +215,6 @@ protected AggregatingMessageHandler createHandler() {
.acceptIfNotNull(this.expireGroupsUponCompletion, aggregator::setExpireGroupsUponCompletion)
.acceptIfNotNull(this.sendTimeout, aggregator::setSendTimeout)
.acceptIfNotNull(this.outputChannelName, aggregator::setOutputChannelName)
.acceptIfNotNull(this.metrics, aggregator::configureMetrics)
.acceptIfNotNull(this.statsEnabled, aggregator::setStatsEnabled)
.acceptIfNotNull(this.countsEnabled, aggregator::setCountsEnabled)
.acceptIfNotNull(this.lockRegistry, aggregator::setLockRegistry)
.acceptIfNotNull(this.messageStore, aggregator::setMessageStore)
.acceptIfNotNull(this.correlationStrategy, aggregator::setCorrelationStrategy)
Expand All @@ -245,7 +228,10 @@ protected AggregatingMessageHandler createHandler() {
.acceptIfNotNull(this.minimumTimeoutForEmptyGroups, aggregator::setMinimumTimeoutForEmptyGroups)
.acceptIfNotNull(this.expireGroupsUponTimeout, aggregator::setExpireGroupsUponTimeout)
.acceptIfNotNull(this.popSequence, aggregator::setPopSequence)
.acceptIfNotNull(this.releaseLockBeforeSend, aggregator::setReleaseLockBeforeSend);
.acceptIfNotNull(this.releaseLockBeforeSend, aggregator::setReleaseLockBeforeSend)
.acceptIfNotNull(this.expireDuration,
(duration) -> aggregator.setExpireDuration(Duration.ofMillis(duration)))
.acceptIfNotNull(this.expireTimeout, aggregator::setExpireTimeout);

return aggregator;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -101,6 +101,9 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad

IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, RELEASE_LOCK);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "expire-timeout");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "expire-duration",
"expireDurationMillis");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.dsl;

import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -329,4 +330,28 @@ public S popSequence(boolean popSequence) {
return _this();
}

/**
* Configure a timeout for old groups in the store to purge.
* @param expireTimeout the timeout in milliseconds to use.
* @return the endpoint spec.
* @since 5.4
* @see AbstractCorrelatingMessageHandler#setExpireTimeout(long)
*/
public S setExpireTimeout(long expireTimeout) {
this.handler.setExpireTimeout(expireTimeout);
return _this();
}

/**
* Configure a {@link Duration} how often to run a scheduled purge task.
* @param expireDuration the duration for scheduled purge task.
* @return the endpoint spec.
* @since 5.4
* @see AbstractCorrelatingMessageHandler#setExpireDuration(Duration)
*/
public S setExpireDuration(Duration expireDuration) {
this.handler.setExpireDuration(expireDuration);
return _this();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4037,6 +4037,22 @@
<xsd:union memberTypes="xsd:boolean xsd:string"/>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="expire-timeout" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
The timeout for old groups in the store to purge on startup.
If 'expire-duration' is also provided, the purge task is scheduled
periodically to purge old groups with this expiration timeout.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="expire-duration" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
The 'Duration' (in milliseconds) how often to run purge old groups scheduled task.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down
Loading