Skip to content

Commit e16b5b4

Browse files
committed
Retry stream consumer recovery if no local member
A stream can take a few seconds to restart after a node restart, so its local member may not be available, even though the connection has recovered. This commit introduces some retry logic when this happens. Consumers should use their own session in this, as the broker closes the session when this errors comes up.
1 parent 5dace13 commit e16b5b4

File tree

6 files changed

+90
-34
lines changed

6 files changed

+90
-34
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static com.rabbitmq.client.amqp.Resource.State.*;
21+
import static java.time.Duration.ofSeconds;
2122

2223
import com.rabbitmq.client.amqp.AmqpException;
24+
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
2325
import com.rabbitmq.client.amqp.Consumer;
2426
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
27+
import java.util.List;
2528
import java.util.Map;
2629
import java.util.concurrent.*;
2730
import java.util.concurrent.atomic.AtomicBoolean;
@@ -211,8 +214,25 @@ private void startReceivingLoop() {
211214

212215
void recoverAfterConnectionFailure() {
213216
this.nativeReceiver =
214-
createNativeReceiver(
215-
this.sessionHandler.sessionNoCheck(), this.address, this.linkProperties, this.filters);
217+
RetryUtils.callAndMaybeRetry(
218+
() ->
219+
createNativeReceiver(
220+
this.sessionHandler.sessionNoCheck(),
221+
this.address,
222+
this.linkProperties,
223+
this.filters),
224+
e -> {
225+
boolean shouldRetry =
226+
e instanceof AmqpException.AmqpResourceClosedException
227+
&& e.getMessage().contains("stream queue")
228+
&& e.getMessage()
229+
.contains("does not have a running replica on the local node");
230+
LOGGER.debug("Retrying receiver creation on consumer recovery: {}", shouldRetry);
231+
return shouldRetry;
232+
},
233+
List.of(ofSeconds(1), ofSeconds(2), ofSeconds(3), BackOffDelayPolicy.TIMEOUT),
234+
"Create AMQP receiver to address '%s'",
235+
this.address);
216236
this.initStateFromNativeReceiver(this.nativeReceiver);
217237
this.pauseStatus.set(PauseStatus.UNPAUSED);
218238
this.unsettledMessageCount.set(0);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,11 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
7676
return this.sender.send(nativeMessage.durable(true));
7777
} catch (ClientIllegalStateException e) {
7878
// the link is closed
79+
LOGGER.debug("Error while publishing: '{}'. Closing publisher.", e.getMessage());
7980
this.close(ExceptionUtils.convert(e));
8081
throw ExceptionUtils.convert(e);
8182
} catch (ClientException e) {
83+
LOGGER.debug("Error while publishing: '{}'.", e.getMessage());
8284
throw ExceptionUtils.convert(e);
8385
}
8486
};
@@ -109,6 +111,7 @@ public void publish(Message message, Callback callback) {
109111
() -> {
110112
Status status;
111113
try {
114+
// FIXME set a timeout for publishing settlement
112115
tracker.settlementFuture().get();
113116
status =
114117
tracker.remoteState() == DeliveryState.accepted() ? Status.ACCEPTED : Status.FAILED;

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ SyncAssert completes() {
9090
SyncAssert completes(Duration timeout) {
9191
boolean completed = actual.await(timeout);
9292
if (!completed) {
93-
fail("Sync timed out after %d ms", timeout.toMillis());
93+
fail("Sync '%s' timed out after %d ms", this.actual.toString(), timeout.toMillis());
9494
}
9595
return this;
9696
}

src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@
3737
import java.util.concurrent.atomic.AtomicBoolean;
3838
import java.util.concurrent.atomic.AtomicInteger;
3939
import java.util.concurrent.atomic.AtomicReference;
40-
import java.util.function.Consumer;
40+
import java.util.function.BiConsumer;
4141
import java.util.function.Function;
4242
import java.util.function.UnaryOperator;
4343
import java.util.stream.IntStream;
4444
import java.util.stream.Stream;
45+
import org.jetbrains.annotations.NotNull;
4546
import org.junit.jupiter.api.*;
4647
import org.slf4j.Logger;
4748
import org.slf4j.LoggerFactory;
@@ -108,26 +109,7 @@ void clusterRestart() {
108109
Management.QueueType.QUORUM,
109110
Management.QueueType.CLASSIC,
110111
Management.QueueType.CLASSIC);
111-
AtomicInteger classicQueueCount = new AtomicInteger(0);
112-
List<QueueConfiguration> queueConfigurations =
113-
queueTypes.stream()
114-
.flatMap(
115-
(Function<Management.QueueType, Stream<QueueConfiguration>>)
116-
type ->
117-
IntStream.range(0, queueCount)
118-
.mapToObj(
119-
ignored -> {
120-
boolean exclusive =
121-
type == Management.QueueType.CLASSIC
122-
&& classicQueueCount.incrementAndGet() > queueCount;
123-
String prefix =
124-
type.name().toLowerCase() + (exclusive ? "-ex-" : "-");
125-
String n = name(prefix);
126-
UnaryOperator<Management.QueueSpecification> c =
127-
s -> s.type(type).exclusive(exclusive);
128-
return new QueueConfiguration(n, type, exclusive, c);
129-
}))
130-
.collect(toList());
112+
List<QueueConfiguration> queueConfigurations = queueConfigurations(queueTypes, queueCount);
131113
List<String> queueNames = queueConfigurations.stream().map(c -> c.name).collect(toList());
132114
List<PublisherState> publisherStates = Collections.emptyList();
133115
List<ConsumerState> consumerStates = Collections.emptyList();
@@ -152,10 +134,12 @@ void clusterRestart() {
152134
.destinationQueue(conf.name)
153135
.bind();
154136
} else {
137+
boolean isolate = conf.type == Management.QueueType.STREAM;
155138
c =
156139
connection(
157140
b ->
158141
b.name(cName)
142+
.isolateResources(isolate)
159143
.affinity()
160144
.queue(conf.name)
161145
.operation(CONSUME)
@@ -210,6 +194,7 @@ void clusterRestart() {
210194
waitAtMostNoException(TIMEOUT, () -> management.queueInfo(n));
211195
});
212196
LOGGER.info("Retrieved info for each queue.");
197+
213198
queueConfigurations.forEach(
214199
c -> {
215200
if (c.type == Management.QueueType.QUORUM || c.type == Management.QueueType.STREAM) {
@@ -258,10 +243,16 @@ void clusterRestart() {
258243
p -> System.out.printf(" queue %s, is on member? %s%n", p.queue, p.isOnMember()));
259244
} catch (Throwable e) {
260245
LOGGER.info("Test failed with {}", e.getMessage(), e);
261-
Consumer<AmqpConnection> log = c -> LOGGER.info("Connection {}: {}", c.name(), c.state());
262-
log.accept(this.connection);
263-
publisherStates.forEach(s -> log.accept(s.connection));
264-
consumerStates.forEach(s -> log.accept(s.connection));
246+
BiConsumer<AmqpConnection, ResourceBase> log =
247+
(c, r) -> {
248+
LOGGER.info("Connection {}: {}", c.name(), c.state());
249+
if (r != null) {
250+
LOGGER.info("Resource: {}", r.state());
251+
}
252+
};
253+
log.accept(this.connection, null);
254+
publisherStates.forEach(s -> log.accept(s.connection, s.publisher));
255+
consumerStates.forEach(s -> log.accept(s.connection, s.consumer));
265256
throw e;
266257
} finally {
267258
publisherStates.forEach(PublisherState::close);
@@ -272,6 +263,30 @@ void clusterRestart() {
272263
}
273264
}
274265

266+
@NotNull
267+
private List<QueueConfiguration> queueConfigurations(
268+
List<Management.QueueType> queueTypes, int queueCount) {
269+
AtomicInteger classicQueueCount = new AtomicInteger(0);
270+
return queueTypes.stream()
271+
.flatMap(
272+
(Function<Management.QueueType, Stream<QueueConfiguration>>)
273+
type ->
274+
IntStream.range(0, queueCount)
275+
.mapToObj(
276+
ignored -> {
277+
boolean exclusive =
278+
type == Management.QueueType.CLASSIC
279+
&& classicQueueCount.incrementAndGet() > queueCount;
280+
String prefix =
281+
type.name().toLowerCase() + (exclusive ? "-ex-" : "-");
282+
String n = name(prefix);
283+
UnaryOperator<Management.QueueSpecification> c =
284+
s -> s.type(type).exclusive(exclusive);
285+
return new QueueConfiguration(n, type, exclusive, c);
286+
}))
287+
.collect(toList());
288+
}
289+
275290
String name(String prefix) {
276291
return prefix + TestUtils.name(this.testInfo);
277292
}
@@ -328,7 +343,9 @@ void start() {
328343
}
329344

330345
Sync waitForNewMessages(int messageCount) {
331-
TestUtils.Sync sync = TestUtils.sync(messageCount, () -> this.postAccepted.set(() -> {}));
346+
TestUtils.Sync sync =
347+
TestUtils.sync(
348+
messageCount, () -> this.postAccepted.set(() -> {}), "Publisher to '%s'", this.queue);
332349
this.postAccepted.set(sync::down);
333350
return sync;
334351
}
@@ -383,7 +400,9 @@ private ConsumerState(String queue, AmqpConnection connection) {
383400
}
384401

385402
TestUtils.Sync waitForNewMessages(int messageCount) {
386-
TestUtils.Sync sync = TestUtils.sync(messageCount, () -> this.postHandle.set(() -> {}));
403+
TestUtils.Sync sync =
404+
TestUtils.sync(
405+
messageCount, () -> this.postHandle.set(() -> {}), "Consumer from '%s'", this.queue);
387406
this.postHandle.set(sync::down);
388407
return sync;
389408
}

src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -458,11 +458,11 @@ static Sync sync() {
458458
}
459459

460460
static Sync sync(int count) {
461-
return new Sync(count, () -> {});
461+
return new Sync(count, () -> {}, null);
462462
}
463463

464-
static Sync sync(int count, Runnable doneCallback) {
465-
return new Sync(count, doneCallback);
464+
static Sync sync(int count, Runnable doneCallback, String format, Object... args) {
465+
return new Sync(count, doneCallback, format, args);
466466
}
467467

468468
private static class CloseableResourceWrapper<T>
@@ -488,11 +488,17 @@ public void close() {
488488

489489
static class Sync {
490490

491+
private final String description;
491492
private final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
492493
private final AtomicReference<Runnable> doneCallback = new AtomicReference<>();
493494

494-
private Sync(int count, Runnable doneCallback) {
495+
private Sync(int count, Runnable doneCallback, String description, Object... args) {
495496
this.latch.set(new CountDownLatch(count));
497+
if (description == null) {
498+
this.description = "N/A";
499+
} else {
500+
this.description = String.format(description, args);
501+
}
496502
this.doneCallback.set(doneCallback);
497503
}
498504

@@ -518,5 +524,10 @@ void reset(int count) {
518524
void reset() {
519525
this.reset(1);
520526
}
527+
528+
@Override
529+
public String toString() {
530+
return this.description;
531+
}
521532
}
522533
}

src/test/resources/logback-test.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
<logger name="com.rabbitmq.client.amqp.impl.EntityRecovery" level="warn" />
2020
<logger name="com.rabbitmq.client.amqp.impl.EventLoop" level="warn" />
2121
<logger name="com.rabbitmq.client.amqp.impl.AmqpManagement" level="warn" />
22+
<logger name="com.rabbitmq.client.amqp.impl.AmqpPublisher" level="warn" />
23+
<logger name="com.rabbitmq.client.amqp.impl.AmqpConsumer" level="warn" />
24+
<logger name="com.rabbitmq.client.amqp.impl.RetryUtils" level="warn" />
2225

2326
<root level="warn">
2427
<appender-ref ref="STDOUT" />

0 commit comments

Comments
 (0)