Skip to content

Commit 31a23bf

Browse files
committed
Retry stored offset lookup in consumer update callback
Fixes #691
1 parent def5523 commit 31a23bf

File tree

9 files changed

+352
-97
lines changed

9 files changed

+352
-97
lines changed

src/main/java/com/rabbitmq/stream/impl/AsyncRetry.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ private AsyncRetry(
5353
this.completableFuture.completeExceptionally(new CancellationException());
5454
return;
5555
}
56+
if (this.completableFuture.isCancelled()) {
57+
LOGGER.debug("Task '{}' cancelled", description);
58+
return;
59+
}
5660
try {
5761
LOGGER.debug(
5862
"Running task '{}' (virtual threads: {})",
@@ -64,9 +68,10 @@ private AsyncRetry(
6468
} catch (Exception e) {
6569
int attemptCount = attempts.getAndIncrement();
6670
LOGGER.debug(
67-
"Attempt {} for task '{}' failed, checking retry policy",
71+
"Attempt {} for task '{}' failed with '{}', checking retry policy",
6872
attemptCount,
69-
description);
73+
description,
74+
e.getMessage());
7075
if (retry.test(e)) {
7176
if (delayPolicy.delay(attemptCount).equals(BackOffDelayPolicy.TIMEOUT)) {
7277
LOGGER.debug(

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS;
1718
import static com.rabbitmq.stream.impl.Utils.*;
1819
import static java.lang.String.format;
1920
import static java.util.stream.Collectors.toList;
@@ -412,7 +413,7 @@ private static class SubscriptionTracker {
412413
private volatile boolean hasReceivedSomething = false;
413414
private volatile byte subscriptionIdInClient;
414415
private volatile ClientSubscriptionsManager manager;
415-
private volatile AtomicReference<SubscriptionState> state =
416+
private final AtomicReference<SubscriptionState> state =
416417
new AtomicReference<>(SubscriptionState.OPENING);
417418
private final ConsumerFlowStrategy flowStrategy;
418419
private final Lock subscriptionTrackerLock = new ReentrantLock();
@@ -507,7 +508,8 @@ SubscriptionState state() {
507508

508509
String label() {
509510
return String.format(
510-
"[id=%d, stream=%s, consumer=%d]", this.id, this.stream, this.consumer.id());
511+
"[id=%d, stream=%s, name=%s, consumer=%d]",
512+
this.id, this.stream, this.offsetTrackingReference, this.consumer.id());
511513
}
512514
}
513515

@@ -962,8 +964,27 @@ private void recoverSubscription(List<BrokerWrapper> candidates, SubscriptionTra
962964
recoveryBackOffDelayPolicy(),
963965
"Candidate lookup to consume from '%s' (subscription recovery)",
964966
tracker.stream);
967+
} catch (StreamException e) {
968+
LOGGER.warn(
969+
"Stream error while re-assigning subscription from stream {} (name {})",
970+
tracker.stream,
971+
tracker.offsetTrackingReference,
972+
e);
973+
if (e.getCode() == RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS) {
974+
LOGGER.debug("Subscription ID already existing, retrying");
975+
} else {
976+
LOGGER.debug(
977+
"Not re-assigning consumer '{}' because of '{}'", tracker.label(), e.getMessage());
978+
reassignmentCompleted = true;
979+
}
965980
} catch (Exception e) {
966-
LOGGER.warn("Error while re-assigning subscription from stream {}", tracker.stream, e);
981+
LOGGER.warn(
982+
"Error while re-assigning subscription from stream {} (name {})",
983+
tracker.stream,
984+
tracker.offsetTrackingReference,
985+
e);
986+
LOGGER.debug(
987+
"Not re-assigning consumer '{}' because of '{}'", tracker.label(), e.getMessage());
967988
reassignmentCompleted = true;
968989
}
969990
}
@@ -1002,11 +1023,13 @@ void add(
10021023
List<SubscriptionTracker> previousSubscriptions = this.subscriptionTrackers;
10031024

10041025
LOGGER.debug(
1005-
"Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}",
1026+
"Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}, "
1027+
+ "subscription ID is {}",
10061028
subscriptionTracker.stream,
10071029
offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification,
10081030
subscriptionTracker.offsetTrackingReference,
1009-
subscriptionTracker.subscriptionProperties);
1031+
subscriptionTracker.subscriptionProperties,
1032+
subscriptionId);
10101033
try {
10111034
// updating data structures before subscribing
10121035
// (to make sure they are up-to-date in case message would arrive super fast)
@@ -1063,12 +1086,11 @@ void add(
10631086
subscriptionContext.offsetSpecification());
10641087

10651088
checkNotClosed();
1066-
byte subId = subscriptionId;
10671089
Client.Response subscribeResponse =
10681090
Utils.callAndMaybeRetry(
10691091
() ->
10701092
client.subscribe(
1071-
subId,
1093+
subscriptionId,
10721094
subscriptionTracker.stream,
10731095
subscriptionContext.offsetSpecification(),
10741096
subscriptionTracker.flowStrategy.initialCredits(),
@@ -1084,6 +1106,19 @@ void add(
10841106
+ " failed with code "
10851107
+ formatConstant(subscribeResponse.getResponseCode());
10861108
LOGGER.debug(message);
1109+
if (subscribeResponse.getResponseCode()
1110+
== RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS) {
1111+
if (LOGGER.isDebugEnabled()) {
1112+
SubscriptionTracker initialTracker = previousSubscriptions.get(subscriptionId);
1113+
LOGGER.debug("Subscription ID already exists");
1114+
LOGGER.debug(
1115+
"Initial tracker with sub ID {}: consumer {}, stream {}, name {}",
1116+
subscriptionId,
1117+
initialTracker.consumer.id(),
1118+
initialTracker.stream,
1119+
initialTracker.offsetTrackingReference);
1120+
}
1121+
}
10871122
throw convertCodeToException(
10881123
subscribeResponse.getResponseCode(), subscriptionTracker.stream, () -> message);
10891124
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import java.util.Collection;
18+
import java.util.List;
19+
import java.util.concurrent.*;
20+
21+
final class DelegatingExecutorService implements ExecutorService {
22+
23+
private final ExecutorService delegate;
24+
private final int id;
25+
26+
DelegatingExecutorService(int id, ExecutorService delegate) {
27+
this.id = id;
28+
this.delegate = delegate;
29+
}
30+
31+
@Override
32+
public void shutdown() {
33+
this.delegate.shutdown();
34+
}
35+
36+
@Override
37+
public List<Runnable> shutdownNow() {
38+
return this.delegate.shutdownNow();
39+
}
40+
41+
@Override
42+
public boolean isShutdown() {
43+
return this.delegate.isShutdown();
44+
}
45+
46+
@Override
47+
public boolean isTerminated() {
48+
return this.delegate.isTerminated();
49+
}
50+
51+
@Override
52+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
53+
return this.delegate.awaitTermination(timeout, unit);
54+
}
55+
56+
@Override
57+
public <T> Future<T> submit(Callable<T> task) {
58+
return this.delegate.submit(task);
59+
}
60+
61+
@Override
62+
public <T> Future<T> submit(Runnable task, T result) {
63+
return this.delegate.submit(task, result);
64+
}
65+
66+
@Override
67+
public Future<?> submit(Runnable task) {
68+
return this.delegate.submit(task);
69+
}
70+
71+
@Override
72+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
73+
throws InterruptedException {
74+
return this.delegate.invokeAll(tasks);
75+
}
76+
77+
@Override
78+
public <T> List<Future<T>> invokeAll(
79+
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
80+
throws InterruptedException {
81+
return this.delegate.invokeAll(tasks, timeout, unit);
82+
}
83+
84+
@Override
85+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
86+
throws InterruptedException, ExecutionException {
87+
return this.delegate.invokeAny(tasks);
88+
}
89+
90+
@Override
91+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
92+
throws InterruptedException, ExecutionException, TimeoutException {
93+
return this.delegate.invokeAny(tasks, timeout, unit);
94+
}
95+
96+
@Override
97+
public void execute(Runnable command) {
98+
this.delegate.execute(command);
99+
}
100+
101+
@Override
102+
public String toString() {
103+
return "DelegatingExecutorService{" + "id=" + id + '}';
104+
}
105+
}

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,13 @@ public void handle(Client client, int frameSize, ChannelHandlerContext ctx, Byte
251251
}
252252

253253
abstract int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message);
254+
255+
protected void logMissingOutstandingRequest(int correlationId) {
256+
LOGGER.warn(
257+
"Could not find outstanding request with correlation ID {} ({})",
258+
correlationId,
259+
this.getClass().getSimpleName());
260+
}
254261
}
255262

256263
private static class ConfirmFrameHandler extends BaseFrameHandler {
@@ -670,7 +677,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
670677
OutstandingRequest<QueryPublisherSequenceResponse> outstandingRequest =
671678
remove(client.outstandingRequests, correlationId, QueryPublisherSequenceResponse.class);
672679
if (outstandingRequest == null) {
673-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
680+
logMissingOutstandingRequest(correlationId);
674681
} else {
675682
QueryPublisherSequenceResponse response =
676683
new QueryPublisherSequenceResponse(responseCode, sequence);
@@ -695,7 +702,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
695702
OutstandingRequest<QueryOffsetResponse> outstandingRequest =
696703
remove(client.outstandingRequests, correlationId, QueryOffsetResponse.class);
697704
if (outstandingRequest == null) {
698-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
705+
logMissingOutstandingRequest(correlationId);
699706
} else {
700707
QueryOffsetResponse response = new QueryOffsetResponse(responseCode, offset);
701708
outstandingRequest.response().set(response);
@@ -744,7 +751,13 @@ private static class PeerPropertiesFrameHandler extends BaseFrameHandler {
744751

745752
@Override
746753
int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
754+
LOGGER.debug(
755+
"Handling peer properties response for connection {}", client.clientConnectionName());
747756
int correlationId = message.readInt();
757+
LOGGER.debug(
758+
"Handling peer properties response for connection {}, correlation ID is {}",
759+
client.clientConnectionName(),
760+
correlationId);
748761
int read = 4;
749762

750763
short responseCode = message.readShort();
@@ -773,7 +786,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
773786
OutstandingRequest<Map<String, String>> outstandingRequest =
774787
remove(client.outstandingRequests, correlationId, new ParameterizedTypeReference<>() {});
775788
if (outstandingRequest == null) {
776-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
789+
logMissingOutstandingRequest(correlationId);
777790
} else {
778791
outstandingRequest.response().set(Collections.unmodifiableMap(serverProperties));
779792
outstandingRequest.countDown();
@@ -811,7 +824,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
811824
OutstandingRequest<OpenResponse> outstandingRequest =
812825
remove(client.outstandingRequests, correlationId, OpenResponse.class);
813826
if (outstandingRequest == null) {
814-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
827+
logMissingOutstandingRequest(correlationId);
815828
} else {
816829
outstandingRequest.response().set(new OpenResponse(responseCode, connectionProperties));
817830
outstandingRequest.countDown();
@@ -900,7 +913,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
900913
OutstandingRequest<SaslAuthenticateResponse> outstandingRequest =
901914
remove(client.outstandingRequests, correlationId, SaslAuthenticateResponse.class);
902915
if (outstandingRequest == null) {
903-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
916+
logMissingOutstandingRequest(correlationId);
904917
} else {
905918
outstandingRequest.response().set(response);
906919
outstandingRequest.countDown();
@@ -943,7 +956,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
943956
correlationId,
944957
new ParameterizedTypeReference<List<String>>() {});
945958
if (outstandingRequest == null) {
946-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
959+
logMissingOutstandingRequest(correlationId);
947960
} else {
948961
outstandingRequest.response().set(mechanisms);
949962
outstandingRequest.countDown();
@@ -1005,7 +1018,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
10051018
correlationId,
10061019
new ParameterizedTypeReference<Map<String, StreamMetadata>>() {});
10071020
if (outstandingRequest == null) {
1008-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
1021+
logMissingOutstandingRequest(correlationId);
10091022
} else {
10101023
outstandingRequest.response().set(results);
10111024
outstandingRequest.countDown();
@@ -1026,7 +1039,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
10261039
OutstandingRequest<Response> outstandingRequest =
10271040
remove(client.outstandingRequests, correlationId, Response.class);
10281041
if (outstandingRequest == null) {
1029-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
1042+
logMissingOutstandingRequest(correlationId);
10301043
} else {
10311044
Response response = new Response(responseCode);
10321045
outstandingRequest.response().set(response);
@@ -1063,12 +1076,9 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
10631076
}
10641077

10651078
OutstandingRequest<List<String>> outstandingRequest =
1066-
remove(
1067-
client.outstandingRequests,
1068-
correlationId,
1069-
new ParameterizedTypeReference<List<String>>() {});
1079+
remove(client.outstandingRequests, correlationId, new ParameterizedTypeReference<>() {});
10701080
if (outstandingRequest == null) {
1071-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
1081+
logMissingOutstandingRequest(correlationId);
10721082
} else {
10731083
outstandingRequest.response().set(streams);
10741084
outstandingRequest.countDown();
@@ -1110,7 +1120,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11101120
correlationId,
11111121
new ParameterizedTypeReference<List<String>>() {});
11121122
if (outstandingRequest == null) {
1113-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
1123+
logMissingOutstandingRequest(correlationId);
11141124
} else {
11151125
outstandingRequest.response().set(streams);
11161126
outstandingRequest.countDown();
@@ -1155,7 +1165,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11551165
correlationId,
11561166
new ParameterizedTypeReference<List<FrameHandlerInfo>>() {});
11571167
if (outstandingRequest == null) {
1158-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
1168+
logMissingOutstandingRequest(correlationId);
11591169
} else {
11601170
outstandingRequest.response().set(commandVersions);
11611171
outstandingRequest.countDown();
@@ -1189,7 +1199,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11891199
OutstandingRequest<StreamStatsResponse> outstandingRequest =
11901200
remove(client.outstandingRequests, correlationId, StreamStatsResponse.class);
11911201
if (outstandingRequest == null) {
1192-
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
1202+
logMissingOutstandingRequest(correlationId);
11931203
} else {
11941204
outstandingRequest.response().set(new StreamStatsResponse(responseCode, info));
11951205
outstandingRequest.countDown();

0 commit comments

Comments
 (0)