Skip to content

Commit d4c53d0

Browse files
authored
Merge pull request #63 from rabbitmq/consumer-subscription-listener
Add subscription listener
2 parents 295deea + a74aa8f commit d4c53d0

File tree

9 files changed

+313
-303
lines changed

9 files changed

+313
-303
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,26 @@ include::{test-examples}/Api.java[tag=connection-settings]
2626
<1> Use the `guest` user by default
2727
<2> Use the `admin` user for this connection
2828

29+
=== Subscription Listener
30+
31+
The client provides a `SubscriptionListener` interface callback to add behavior before a subscription is created.
32+
This callback is meant for stream consumers: it can be used to dynamically set the offset the consumer attaches to in the stream.
33+
It is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection).
34+
35+
It is possible to use the callback to get the last processed offset from an external store.
36+
The following code snippet shows how this can be done (note the interaction with the external store is not detailed):
37+
38+
.Using the subscription listener to attach to a stream
39+
[source,java,indent=0]
40+
--------
41+
include::{test-examples}/Api.java[tag=subscription-listener]
42+
--------
43+
<1> Set subscription listener
44+
<2> Get offset from external store
45+
<3> Set offset to use for the subscription
46+
<4> Get the message offset
47+
<5> Store the offset in the external store after processing
48+
2949
=== Metrics Collection
3050

3151
The library provides the {javadoc-url}/com/rabbitmq/client/amqp/metrics/MetricsCollector.html[`MetricsCollector`] abstraction to collect metrics.

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ public interface ConsumerBuilder {
7474
*/
7575
StreamOptions stream();
7676

77+
/**
78+
* Set a listener to customize the subscription before the consumer is created (or recovered).
79+
*
80+
* <p>This callback is available for stream consumers.
81+
*
82+
* @param subscriptionListener subscription listener
83+
* @return this builder instance
84+
* @see SubscriptionListener
85+
*/
86+
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
87+
7788
/**
7889
* Build the consumer.
7990
*
@@ -164,4 +175,41 @@ enum StreamOffsetSpecification {
164175
/** Very end of the stream (new chunks). */
165176
NEXT
166177
}
178+
179+
/**
180+
* Callback to modify a consumer subscription before the link creation.
181+
*
182+
* <p>This allows looking up the last processed offset for a stream consumer and attaching to this
183+
* offset.
184+
*/
185+
interface SubscriptionListener {
186+
187+
/**
188+
* Pre-subscription callback.
189+
*
190+
* <p>It is called before the link is created but also every time it recovers, e.g. after a
191+
* connection failure.
192+
*
193+
* <p>Configuration set with {@link Context#streamOptions()} overrides the one set with {@link
194+
* ConsumerBuilder#stream()}.
195+
*
196+
* @param context subscription context
197+
*/
198+
void preSubscribe(Context context);
199+
200+
/** Subscription context. */
201+
interface Context {
202+
203+
/**
204+
* Stream options, to set the offset to start consuming from.
205+
*
206+
* <p>Only the {@link StreamOptions} are accessible, the {@link StreamOptions#builder()}
207+
* method returns <code>null</code>
208+
*
209+
* @return the stream options
210+
* @see StreamOptions
211+
*/
212+
ConsumerBuilder.StreamOptions streamOptions();
213+
}
214+
}
167215
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static com.rabbitmq.client.amqp.Resource.State.*;
21+
import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
2122
import static java.time.Duration.ofSeconds;
23+
import static java.util.Optional.ofNullable;
2224

2325
import com.rabbitmq.client.amqp.AmqpException;
2426
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
2527
import com.rabbitmq.client.amqp.Consumer;
28+
import com.rabbitmq.client.amqp.ConsumerBuilder;
2629
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
27-
import java.util.Collections;
28-
import java.util.List;
29-
import java.util.Map;
30+
import java.util.*;
3031
import java.util.concurrent.*;
3132
import java.util.concurrent.atomic.AtomicBoolean;
3233
import java.util.concurrent.atomic.AtomicLong;
@@ -59,6 +60,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5960
private final String queue;
6061
private final Map<String, Object> filters;
6162
private final Map<String, Object> linkProperties;
63+
private final ConsumerBuilder.SubscriptionListener subscriptionListener;
6264
private final AmqpConnection connection;
6365
private final AtomicReference<PauseStatus> pauseStatus =
6466
new AtomicReference<>(PauseStatus.UNPAUSED);
@@ -89,11 +91,17 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
8991
this.queue = builder.queue();
9092
this.filters = Map.copyOf(builder.filters());
9193
this.linkProperties = Map.copyOf(builder.properties());
94+
this.subscriptionListener =
95+
ofNullable(builder.subscriptionListener()).orElse(NO_OP_SUBSCRIPTION_LISTENER);
9296
this.connection = builder.connection();
9397
this.sessionHandler = this.connection.createSessionHandler();
9498
this.nativeReceiver =
9599
this.createNativeReceiver(
96-
this.sessionHandler.session(), this.address, this.linkProperties, this.filters);
100+
this.sessionHandler.session(),
101+
this.address,
102+
this.linkProperties,
103+
this.filters,
104+
this.subscriptionListener);
97105
this.initStateFromNativeReceiver(this.nativeReceiver);
98106
this.metricsCollector = this.connection.metricsCollector();
99107
this.startReceivingLoop();
@@ -153,8 +161,12 @@ private ClientReceiver createNativeReceiver(
153161
Session nativeSession,
154162
String address,
155163
Map<String, Object> properties,
156-
Map<String, Object> filters) {
164+
Map<String, Object> filters,
165+
SubscriptionListener subscriptionListener) {
157166
try {
167+
filters = new LinkedHashMap<>(filters);
168+
StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters);
169+
subscriptionListener.preSubscribe(() -> streamOptions);
158170
ReceiverOptions receiverOptions =
159171
new ReceiverOptions()
160172
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
@@ -221,7 +233,8 @@ void recoverAfterConnectionFailure() {
221233
this.sessionHandler.sessionNoCheck(),
222234
this.address,
223235
this.linkProperties,
224-
this.filters),
236+
this.filters,
237+
this.subscriptionListener),
225238
e -> {
226239
boolean shouldRetry =
227240
e instanceof AmqpException.AmqpResourceClosedException

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
class AmqpConsumerBuilder implements ConsumerBuilder {
2929

30+
static SubscriptionListener NO_OP_SUBSCRIPTION_LISTENER = ctx -> {};
31+
3032
private final AmqpConnection connection;
3133
private String queue;
3234
private Consumer.MessageHandler messageHandler;
@@ -35,6 +37,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
3537
private final Map<String, Object> filters = new LinkedHashMap<>();
3638
private final Map<String, Object> properties = new LinkedHashMap<>();
3739
private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters);
40+
private SubscriptionListener subscriptionListener = NO_OP_SUBSCRIPTION_LISTENER;
3841

3942
AmqpConsumerBuilder(AmqpConnection connection) {
4043
this.connection = connection;
@@ -79,6 +82,16 @@ public StreamOptions stream() {
7982
return this.streamOptions;
8083
}
8184

85+
@Override
86+
public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener) {
87+
this.subscriptionListener = subscriptionListener;
88+
return this;
89+
}
90+
91+
SubscriptionListener subscriptionListener() {
92+
return this.subscriptionListener;
93+
}
94+
8295
AmqpConnection connection() {
8396
return connection;
8497
}
@@ -186,4 +199,8 @@ private void offsetSpecification(Object value) {
186199
this.filters.put("rabbitmq:stream-offset-spec", value);
187200
}
188201
}
202+
203+
static StreamOptions streamOptions(Map<String, Object> filters) {
204+
return new DefaultStreamOptions(null, filters);
205+
}
189206
}

0 commit comments

Comments
 (0)