Skip to content

Commit 59916b8

Browse files
committed
Fall back to metadata command for RabbitMQ < 3.11
For Environment#streamExists(String). References #370
1 parent a40bd43 commit 59916b8

File tree

4 files changed

+72
-45
lines changed

4 files changed

+72
-45
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -398,10 +398,7 @@ public void initChannel(SocketChannel ch) {
398398
tuneState.getHeartbeat());
399399
this.connectionProperties = open(parameters.virtualHost);
400400
Set<FrameHandlerInfo> supportedCommands = maybeExchangeCommandVersions();
401-
if (supportedCommands.stream()
402-
.filter(i -> i.getKey() == COMMAND_STREAM_STATS)
403-
.findAny()
404-
.isPresent()) {
401+
if (supportedCommands.stream().anyMatch(i -> i.getKey() == COMMAND_STREAM_STATS)) {
405402
this.exchangeCommandVersionsCheck = () -> {};
406403
} else {
407404
this.exchangeCommandVersionsCheck =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 15 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -89,37 +89,6 @@ class StreamEnvironment implements Environment {
8989
private final List<Locator> locators = new CopyOnWriteArrayList<>();
9090
private final ExecutorServiceFactory executorServiceFactory;
9191

92-
StreamEnvironment(
93-
ScheduledExecutorService scheduledExecutorService,
94-
Client.ClientParameters clientParametersPrototype,
95-
List<URI> uris,
96-
BackOffDelayPolicy recoveryBackOffDelayPolicy,
97-
BackOffDelayPolicy topologyBackOffDelayPolicy,
98-
AddressResolver addressResolver,
99-
int maxProducersByConnection,
100-
int maxTrackingConsumersByConnection,
101-
int maxConsumersByConnection,
102-
DefaultTlsConfiguration tlsConfiguration,
103-
ByteBufAllocator byteBufAllocator,
104-
boolean lazyInit,
105-
Function<ClientConnectionType, String> connectionNamingStrategy) {
106-
this(
107-
scheduledExecutorService,
108-
clientParametersPrototype,
109-
uris,
110-
recoveryBackOffDelayPolicy,
111-
topologyBackOffDelayPolicy,
112-
addressResolver,
113-
maxProducersByConnection,
114-
maxTrackingConsumersByConnection,
115-
maxConsumersByConnection,
116-
tlsConfiguration,
117-
byteBufAllocator,
118-
lazyInit,
119-
connectionNamingStrategy,
120-
cp -> new Client(cp));
121-
}
122-
12392
StreamEnvironment(
12493
ScheduledExecutorService scheduledExecutorService,
12594
Client.ClientParameters clientParametersPrototype,
@@ -513,22 +482,31 @@ public StreamStats queryStreamStats(String stream) {
513482
public boolean streamExists(String stream) {
514483
checkNotClosed();
515484
this.maybeInitializeLocator();
516-
StreamStatsResponse response =
485+
short responseCode =
517486
locatorOperation(
518487
Utils.namedFunction(
519-
client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream));
520-
if (response.isOk()) {
488+
client -> {
489+
try {
490+
return client.streamStats(stream).getResponseCode();
491+
} catch (UnsupportedOperationException e) {
492+
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
493+
return metadata.get(stream).getResponseCode();
494+
}
495+
},
496+
"Stream exists for stream '%s'",
497+
stream));
498+
if (responseCode == Constants.RESPONSE_CODE_OK) {
521499
return true;
522-
} else if (response.getResponseCode() == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) {
500+
} else if (responseCode == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) {
523501
return false;
524502
} else {
525503
throw convertCodeToException(
526-
response.getResponseCode(),
504+
responseCode,
527505
stream,
528506
() ->
529507
format(
530508
"Unexpected result when checking if stream '%s' exists: %s.",
531-
stream, formatConstant(response.getResponseCode())));
509+
stream, formatConstant(responseCode)));
532510
}
533511
}
534512

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
6868
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
6969
private CompressionCodecFactory compressionCodecFactory;
7070
private boolean lazyInit = false;
71-
private Function<ClientConnectionType, String> connectionNamingStrategy;
71+
private Function<Client.ClientParameters, Client> clientFactory = Client::new;
7272

7373
public StreamEnvironmentBuilder() {}
7474

@@ -302,6 +302,11 @@ public NettyConfiguration netty() {
302302
return this.netty;
303303
}
304304

305+
StreamEnvironmentBuilder clientFactory(Function<Client.ClientParameters, Client> clientFactory) {
306+
this.clientFactory = clientFactory;
307+
return this;
308+
}
309+
305310
@Override
306311
public Environment build() {
307312
if (this.compressionCodecFactory == null) {
@@ -310,7 +315,8 @@ public Environment build() {
310315
this.clientParameters.compressionCodecFactory(this.compressionCodecFactory);
311316
}
312317
this.id = this.id == null ? "rabbitmq-stream" : this.id;
313-
this.connectionNamingStrategy = Utils.defaultConnectionNamingStrategy(this.id + "-");
318+
Function<ClientConnectionType, String> connectionNamingStrategy =
319+
Utils.defaultConnectionNamingStrategy(this.id + "-");
314320
this.clientParameters.eventLoopGroup(this.netty.eventLoopGroup);
315321
this.clientParameters.byteBufAllocator(this.netty.byteBufAllocator);
316322
this.clientParameters.channelCustomizer(this.netty.channelCustomizer);
@@ -328,7 +334,8 @@ public Environment build() {
328334
tls,
329335
netty.byteBufAllocator,
330336
lazyInit,
331-
connectionNamingStrategy);
337+
connectionNamingStrategy,
338+
this.clientFactory);
332339
}
333340

334341
static final class DefaultTlsConfiguration implements TlsConfiguration {

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@
7272
import java.util.concurrent.CopyOnWriteArrayList;
7373
import java.util.concurrent.CountDownLatch;
7474
import java.util.concurrent.atomic.AtomicBoolean;
75+
import java.util.concurrent.atomic.AtomicInteger;
7576
import java.util.concurrent.atomic.AtomicLong;
77+
import java.util.function.Function;
7678
import java.util.function.Supplier;
7779
import java.util.stream.Collectors;
7880
import java.util.stream.IntStream;
@@ -599,9 +601,52 @@ void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() {
599601
@ValueSource(booleans = {true, false})
600602
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
601603
void streamExists(boolean lazyInit) {
602-
try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
604+
AtomicBoolean metadataCalled = new AtomicBoolean(false);
605+
Function<Client.ClientParameters, Client> clientFactory =
606+
cp ->
607+
new Client(cp) {
608+
@Override
609+
public Map<String, StreamMetadata> metadata(String... streams) {
610+
metadataCalled.set(true);
611+
return super.metadata(streams);
612+
}
613+
};
614+
try (Environment env =
615+
((StreamEnvironmentBuilder) environmentBuilder.lazyInitialization(lazyInit))
616+
.clientFactory(clientFactory)
617+
.build()) {
618+
assertThat(env.streamExists(stream)).isTrue();
619+
assertThat(env.streamExists(UUID.randomUUID().toString())).isFalse();
620+
assertThat(metadataCalled).isFalse();
621+
}
622+
}
623+
624+
@ParameterizedTest
625+
@ValueSource(booleans = {true, false})
626+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
627+
void streamExistsMetadataDataFallback(boolean lazyInit) {
628+
AtomicInteger metadataCallCount = new AtomicInteger(0);
629+
Function<Client.ClientParameters, Client> clientFactory =
630+
cp ->
631+
new Client(cp) {
632+
@Override
633+
StreamStatsResponse streamStats(String stream) {
634+
throw new UnsupportedOperationException();
635+
}
636+
637+
@Override
638+
public Map<String, StreamMetadata> metadata(String... streams) {
639+
metadataCallCount.incrementAndGet();
640+
return super.metadata(streams);
641+
}
642+
};
643+
try (Environment env =
644+
((StreamEnvironmentBuilder) environmentBuilder.lazyInitialization(lazyInit))
645+
.clientFactory(clientFactory)
646+
.build()) {
603647
assertThat(env.streamExists(stream)).isTrue();
604648
assertThat(env.streamExists(UUID.randomUUID().toString())).isFalse();
649+
assertThat(metadataCallCount).hasValue(2);
605650
}
606651
}
607652

0 commit comments

Comments
 (0)