Skip to content

Commit 833672d

Browse files
authored
Merge pull request #46 from rabbitmq/cluster-recovery
Improve recovery
2 parents 4074016 + 31aed8d commit 833672d

22 files changed

+852
-238
lines changed

ci/start-cluster.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ wait_for_message rabbitmq0 "completed with"
1717

1818
docker exec rabbitmq0 rabbitmqctl await_online_nodes 3
1919

20-
docker exec rabbitmq0 rabbitmqctl enable_feature_flag khepri_db
21-
docker exec rabbitmq1 rabbitmqctl enable_feature_flag khepri_db
22-
docker exec rabbitmq2 rabbitmqctl enable_feature_flag khepri_db
20+
docker exec rabbitmq0 rabbitmqctl enable_feature_flag --experimental khepri_db
21+
docker exec rabbitmq1 rabbitmqctl enable_feature_flag --experimental khepri_db
22+
docker exec rabbitmq2 rabbitmqctl enable_feature_flag --experimental khepri_db
2323

2424
docker exec rabbitmq0 rabbitmqctl cluster_status

pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@
346346
<argLine>${test-arguments}</argLine>
347347
<systemPropertyVariables>
348348
<net.bytebuddy.experimental>true</net.bytebuddy.experimental>
349-
<rabbitmqctl.bin>DOCKER:rabbitmq</rabbitmqctl.bin>
350349
</systemPropertyVariables>
351350
</configuration>
352351
</plugin>

src/main/java/com/rabbitmq/client/amqp/BackOffDelayPolicy.java

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,85 @@ public interface BackOffDelayPolicy {
2525

2626
Duration delay(int recoveryAttempt);
2727

28+
static BackOffDelayPolicy fixedWithInitialDelay(Duration initialDelay, Duration delay) {
29+
return new FixedWithInitialDelayBackOffPolicy(initialDelay, delay);
30+
}
31+
32+
static BackOffDelayPolicy fixedWithInitialDelay(
33+
Duration initialDelay, Duration delay, Duration timeout) {
34+
return new FixedWithInitialDelayAndTimeoutBackOffPolicy(initialDelay, delay, timeout);
35+
}
36+
2837
static BackOffDelayPolicy fixed(Duration delay) {
29-
return new FixedBackOffDelayPolicy(delay);
38+
return new FixedWithInitialDelayBackOffPolicy(delay, delay);
3039
}
3140

32-
class FixedBackOffDelayPolicy implements BackOffDelayPolicy {
41+
final class FixedWithInitialDelayBackOffPolicy implements BackOffDelayPolicy {
3342

43+
private final Duration initialDelay;
3444
private final Duration delay;
3545

36-
private FixedBackOffDelayPolicy(Duration delay) {
46+
private FixedWithInitialDelayBackOffPolicy(Duration initialDelay, Duration delay) {
47+
this.initialDelay = initialDelay;
3748
this.delay = delay;
3849
}
3950

4051
@Override
4152
public Duration delay(int recoveryAttempt) {
42-
return this.delay;
53+
return recoveryAttempt == 0 ? initialDelay : delay;
54+
}
55+
56+
@Override
57+
public String toString() {
58+
return "FixedWithInitialDelayBackOffPolicy{"
59+
+ "initialDelay="
60+
+ initialDelay
61+
+ ", delay="
62+
+ delay
63+
+ '}';
64+
}
65+
}
66+
67+
final class FixedWithInitialDelayAndTimeoutBackOffPolicy implements BackOffDelayPolicy {
68+
69+
private final int attemptLimitBeforeTimeout;
70+
private final BackOffDelayPolicy delegate;
71+
72+
private FixedWithInitialDelayAndTimeoutBackOffPolicy(
73+
Duration initialDelay, Duration delay, Duration timeout) {
74+
this(fixedWithInitialDelay(initialDelay, delay), timeout);
75+
}
76+
77+
private FixedWithInitialDelayAndTimeoutBackOffPolicy(
78+
BackOffDelayPolicy policy, Duration timeout) {
79+
if (timeout.toMillis() < policy.delay(0).toMillis()) {
80+
throw new IllegalArgumentException("Timeout must be longer than initial delay");
81+
}
82+
this.delegate = policy;
83+
// best effort, assume FixedWithInitialDelay-ish policy
84+
Duration initialDelay = policy.delay(0);
85+
Duration delay = policy.delay(1);
86+
long timeoutWithInitialDelay = timeout.toMillis() - initialDelay.toMillis();
87+
this.attemptLimitBeforeTimeout = (int) (timeoutWithInitialDelay / delay.toMillis()) + 1;
88+
}
89+
90+
@Override
91+
public Duration delay(int recoveryAttempt) {
92+
if (recoveryAttempt >= attemptLimitBeforeTimeout) {
93+
return TIMEOUT;
94+
} else {
95+
return delegate.delay(recoveryAttempt);
96+
}
97+
}
98+
99+
@Override
100+
public String toString() {
101+
return "FixedWithInitialDelayAndTimeoutBackOffPolicy{"
102+
+ "attemptLimitBeforeTimeout="
103+
+ attemptLimitBeforeTimeout
104+
+ ", delegate="
105+
+ delegate
106+
+ '}';
43107
}
44108
}
45109
}

0 commit comments

Comments
 (0)