Skip to content

Commit b8fdac3

Browse files
committed
Add subscription listener
This consumer hook is called just before creating the receiver link. It allows changing some settings (only the stream-related ones for now). In the case of stream it can be used to look up the last processed offset and attach the stream consumer at this offset.
1 parent 295deea commit b8fdac3

File tree

7 files changed

+243
-9
lines changed

7 files changed

+243
-9
lines changed

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public interface ConsumerBuilder {
7474
*/
7575
StreamOptions stream();
7676

77+
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
78+
7779
/**
7880
* Build the consumer.
7981
*
@@ -164,4 +166,14 @@ enum StreamOffsetSpecification {
164166
/** Very end of the stream (new chunks). */
165167
NEXT
166168
}
169+
170+
interface SubscriptionListener {
171+
172+
void preSubscribe(Context context);
173+
174+
interface Context {
175+
176+
ConsumerBuilder.StreamOptions streamOptions();
177+
}
178+
}
167179
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static com.rabbitmq.client.amqp.Resource.State.*;
21+
import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
2122
import static java.time.Duration.ofSeconds;
23+
import static java.util.Optional.ofNullable;
2224

2325
import com.rabbitmq.client.amqp.AmqpException;
2426
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
2527
import com.rabbitmq.client.amqp.Consumer;
28+
import com.rabbitmq.client.amqp.ConsumerBuilder;
2629
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
27-
import java.util.Collections;
28-
import java.util.List;
29-
import java.util.Map;
30+
import java.util.*;
3031
import java.util.concurrent.*;
3132
import java.util.concurrent.atomic.AtomicBoolean;
3233
import java.util.concurrent.atomic.AtomicLong;
@@ -59,6 +60,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5960
private final String queue;
6061
private final Map<String, Object> filters;
6162
private final Map<String, Object> linkProperties;
63+
private final ConsumerBuilder.SubscriptionListener subscriptionListener;
6264
private final AmqpConnection connection;
6365
private final AtomicReference<PauseStatus> pauseStatus =
6466
new AtomicReference<>(PauseStatus.UNPAUSED);
@@ -89,11 +91,17 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
8991
this.queue = builder.queue();
9092
this.filters = Map.copyOf(builder.filters());
9193
this.linkProperties = Map.copyOf(builder.properties());
94+
this.subscriptionListener =
95+
ofNullable(builder.subscriptionListener()).orElse(NO_OP_SUBSCRIPTION_LISTENER);
9296
this.connection = builder.connection();
9397
this.sessionHandler = this.connection.createSessionHandler();
9498
this.nativeReceiver =
9599
this.createNativeReceiver(
96-
this.sessionHandler.session(), this.address, this.linkProperties, this.filters);
100+
this.sessionHandler.session(),
101+
this.address,
102+
this.linkProperties,
103+
this.filters,
104+
this.subscriptionListener);
97105
this.initStateFromNativeReceiver(this.nativeReceiver);
98106
this.metricsCollector = this.connection.metricsCollector();
99107
this.startReceivingLoop();
@@ -153,8 +161,12 @@ private ClientReceiver createNativeReceiver(
153161
Session nativeSession,
154162
String address,
155163
Map<String, Object> properties,
156-
Map<String, Object> filters) {
164+
Map<String, Object> filters,
165+
SubscriptionListener subscriptionListener) {
157166
try {
167+
filters = new LinkedHashMap<>(filters);
168+
StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters);
169+
subscriptionListener.preSubscribe(() -> streamOptions);
158170
ReceiverOptions receiverOptions =
159171
new ReceiverOptions()
160172
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
@@ -221,7 +233,8 @@ void recoverAfterConnectionFailure() {
221233
this.sessionHandler.sessionNoCheck(),
222234
this.address,
223235
this.linkProperties,
224-
this.filters),
236+
this.filters,
237+
this.subscriptionListener),
225238
e -> {
226239
boolean shouldRetry =
227240
e instanceof AmqpException.AmqpResourceClosedException

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
class AmqpConsumerBuilder implements ConsumerBuilder {
2929

30+
static SubscriptionListener NO_OP_SUBSCRIPTION_LISTENER = ctx -> {};
31+
3032
private final AmqpConnection connection;
3133
private String queue;
3234
private Consumer.MessageHandler messageHandler;
@@ -35,6 +37,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
3537
private final Map<String, Object> filters = new LinkedHashMap<>();
3638
private final Map<String, Object> properties = new LinkedHashMap<>();
3739
private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters);
40+
private SubscriptionListener subscriptionListener = NO_OP_SUBSCRIPTION_LISTENER;
3841

3942
AmqpConsumerBuilder(AmqpConnection connection) {
4043
this.connection = connection;
@@ -79,6 +82,16 @@ public StreamOptions stream() {
7982
return this.streamOptions;
8083
}
8184

85+
@Override
86+
public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener) {
87+
this.subscriptionListener = subscriptionListener;
88+
return this;
89+
}
90+
91+
SubscriptionListener subscriptionListener() {
92+
return this.subscriptionListener;
93+
}
94+
8295
AmqpConnection connection() {
8396
return connection;
8497
}
@@ -186,4 +199,8 @@ private void offsetSpecification(Object value) {
186199
this.filters.put("rabbitmq:stream-offset-spec", value);
187200
}
188201
}
202+
203+
static StreamOptions streamOptions(Map<String, Object> filters) {
204+
return new DefaultStreamOptions(null, filters);
205+
}
189206
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
21+
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
22+
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
23+
import static com.rabbitmq.client.amqp.impl.Assertions.*;
24+
import static com.rabbitmq.client.amqp.impl.Cli.closeConnection;
25+
import static com.rabbitmq.client.amqp.impl.TestUtils.name;
26+
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
27+
import static org.assertj.core.api.Assertions.*;
28+
29+
import com.rabbitmq.client.amqp.*;
30+
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
31+
import java.time.Duration;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.concurrent.atomic.AtomicLong;
34+
import java.util.stream.IntStream;
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
37+
import org.junit.jupiter.api.Test;
38+
import org.junit.jupiter.api.TestInfo;
39+
import org.junit.jupiter.api.extension.ExtendWith;
40+
41+
@ExtendWith(AmqpTestInfrastructureExtension.class)
42+
public class AmqpConsumerTest {
43+
44+
BackOffDelayPolicy backOffDelayPolicy = BackOffDelayPolicy.fixed(Duration.ofMillis(100));
45+
Environment environment;
46+
Connection connection;
47+
String q;
48+
String connectionName;
49+
50+
@BeforeEach
51+
void init(TestInfo info) {
52+
this.q = name(info);
53+
connection.management().queue(this.q).type(STREAM).declare();
54+
this.connectionName = ((AmqpConnection) connection).name();
55+
}
56+
57+
@AfterEach
58+
void tearDown() {
59+
connection.management().queueDeletion().delete(this.q);
60+
}
61+
62+
@Test
63+
void subscriptionListenerShouldBeCalledOnRecovery() {
64+
Sync subscriptionSync = sync();
65+
Sync recoveredSync = sync();
66+
connection
67+
.consumerBuilder()
68+
.queue(this.q)
69+
.subscriptionListener(ctx -> subscriptionSync.down())
70+
.listeners(recoveredListener(recoveredSync))
71+
.messageHandler((ctx, msg) -> {})
72+
.build();
73+
74+
assertThat(subscriptionSync).completes();
75+
assertThat(recoveredSync).hasNotCompleted();
76+
sync().reset();
77+
closeConnection(this.connectionName);
78+
assertThat(recoveredSync).completes();
79+
assertThat(subscriptionSync).completes();
80+
}
81+
82+
@Test
83+
void streamConsumerRestartsWhereItLeftOff() {
84+
Connection publisherConnection = environment.connectionBuilder().build();
85+
Publisher publisher = publisherConnection.publisherBuilder().queue(this.q).build();
86+
int messageCount = 100;
87+
Runnable publish =
88+
() -> {
89+
Sync publishSync = sync(messageCount);
90+
Publisher.Callback callback = ctx -> publishSync.down();
91+
IntStream.range(0, messageCount)
92+
.forEach(
93+
ignored -> {
94+
publisher.publish(publisher.message(), callback);
95+
});
96+
assertThat(publishSync).completes();
97+
};
98+
99+
publish.run();
100+
101+
Sync consumeSync = sync(messageCount);
102+
AtomicLong lastOffsetProcessed = new AtomicLong(-1);
103+
AtomicInteger consumedMessageCount = new AtomicInteger(0);
104+
AtomicInteger subscriptionListenerCallCount = new AtomicInteger(0);
105+
Sync recoveredSync = sync();
106+
ConsumerBuilder.SubscriptionListener subscriptionListener =
107+
ctx -> {
108+
subscriptionListenerCallCount.incrementAndGet();
109+
ctx.streamOptions().offset(lastOffsetProcessed.get() + 1);
110+
};
111+
Consumer.MessageHandler messageHandler =
112+
(ctx, msg) -> {
113+
long offset = (long) msg.annotation("x-stream-offset");
114+
ctx.accept();
115+
lastOffsetProcessed.set(offset);
116+
consumedMessageCount.incrementAndGet();
117+
consumeSync.down();
118+
};
119+
Consumer consumer =
120+
connection
121+
.consumerBuilder()
122+
.listeners(recoveredListener(recoveredSync))
123+
.queue(this.q)
124+
.subscriptionListener(subscriptionListener)
125+
.messageHandler(messageHandler)
126+
.build();
127+
128+
assertThat(subscriptionListenerCallCount).hasValue(1);
129+
assertThat(consumeSync).completes();
130+
131+
closeConnection(this.connectionName);
132+
assertThat(recoveredSync).completes();
133+
assertThat(subscriptionListenerCallCount).hasValue(2);
134+
assertThat(consumedMessageCount).hasValue(messageCount);
135+
136+
long offsetAfterRecovery = lastOffsetProcessed.get();
137+
consumeSync.reset(messageCount);
138+
publish.run();
139+
assertThat(consumeSync).completes();
140+
assertThat(consumedMessageCount).hasValue(messageCount * 2);
141+
assertThat(lastOffsetProcessed).hasValueGreaterThan(offsetAfterRecovery);
142+
143+
consumer.close();
144+
145+
long offsetAfterClosing = lastOffsetProcessed.get();
146+
consumeSync.reset(messageCount);
147+
publish.run();
148+
149+
connection
150+
.consumerBuilder()
151+
.queue(this.q)
152+
.subscriptionListener(subscriptionListener)
153+
.messageHandler(messageHandler)
154+
.build();
155+
156+
assertThat(subscriptionListenerCallCount).hasValue(3);
157+
assertThat(consumeSync).completes();
158+
assertThat(consumedMessageCount).hasValue(messageCount * 3);
159+
assertThat(lastOffsetProcessed).hasValueGreaterThan(offsetAfterClosing);
160+
}
161+
162+
private static Resource.StateListener recoveredListener(Sync sync) {
163+
return context -> {
164+
if (context.previousState() == RECOVERING && context.currentState() == OPEN) {
165+
sync.down();
166+
}
167+
};
168+
}
169+
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20+
import static com.rabbitmq.client.amqp.impl.TestUtils.name;
21+
22+
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
2023
import com.rabbitmq.client.amqp.Connection;
2124
import com.rabbitmq.client.amqp.Environment;
2225
import java.lang.reflect.Field;
@@ -60,9 +63,18 @@ public void beforeEach(ExtensionContext context) throws Exception {
6063

6164
Field connectionField = field(context.getTestClass().get(), "connection");
6265
if (connectionField != null) {
66+
AmqpConnectionBuilder connectionBuilder = (AmqpConnectionBuilder) env.connectionBuilder();
67+
Field backOffDelayPolicyField = field(context.getTestClass().get(), "backOffDelayPolicy");
68+
if (backOffDelayPolicyField != null) {
69+
backOffDelayPolicyField.setAccessible(true);
70+
BackOffDelayPolicy backOffDelayPolicy =
71+
(BackOffDelayPolicy) backOffDelayPolicyField.get(context.getTestInstance().get());
72+
if (backOffDelayPolicy != null) {
73+
connectionBuilder.recovery().backOffDelayPolicy(backOffDelayPolicy);
74+
}
75+
}
76+
Connection connection = connectionBuilder.name(name(context)).build();
6377
connectionField.setAccessible(true);
64-
Connection connection =
65-
((AmqpConnectionBuilder) env.connectionBuilder()).name(TestUtils.name(context)).build();
6678
connectionField.set(context.getTestInstance().get(), connection);
6779
store(context).put("connection", connection);
6880
}
@@ -77,7 +89,7 @@ public void afterEach(ExtensionContext context) {
7789
}
7890

7991
@Override
80-
public void afterAll(ExtensionContext context) throws Exception {
92+
public void afterAll(ExtensionContext context) {
8193
Environment env = store(context).get("environment", Environment.class);
8294
if (env != null) {
8395
env.close();

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,13 @@ SyncAssert completes(Duration timeout) {
9999
}
100100
return this;
101101
}
102+
103+
SyncAssert hasNotCompleted() {
104+
if (actual.hasCompleted()) {
105+
fail("Sync '%s' should not have completed", this.actual.toString());
106+
}
107+
return this;
108+
}
102109
}
103110

104111
static class QueueInfoAssert extends AbstractObjectAssert<QueueInfoAssert, Management.QueueInfo> {

src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,10 @@ void reset() {
525525
this.reset(1);
526526
}
527527

528+
boolean hasCompleted() {
529+
return this.latch.get().getCount() == 0;
530+
}
531+
528532
@Override
529533
public String toString() {
530534
return this.description;

0 commit comments

Comments
 (0)