Skip to content

Commit a74aa8f

Browse files
committed
Document subscription listener
1 parent e9d7954 commit a74aa8f

File tree

3 files changed

+37
-294
lines changed

3 files changed

+37
-294
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,26 @@ include::{test-examples}/Api.java[tag=connection-settings]
2626
<1> Use the `guest` user by default
2727
<2> Use the `admin` user for this connection
2828

29+
=== Subscription Listener
30+
31+
The client provides a `SubscriptionListener` interface callback to add behavior before a subscription is created.
32+
This callback is meant for stream consumers: it can be used to dynamically set the offset the consumer attaches to in the stream.
33+
It is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection).
34+
35+
It is possible to use the callback to get the last processed offset from an external store.
36+
The following code snippet shows how this can be done (note the interaction with the external store is not detailed):
37+
38+
.Using the subscription listener to attach to a stream
39+
[source,java,indent=0]
40+
--------
41+
include::{test-examples}/Api.java[tag=subscription-listener]
42+
--------
43+
<1> Set subscription listener
44+
<2> Get offset from external store
45+
<3> Set offset to use for the subscription
46+
<4> Get the message offset
47+
<5> Store the offset in the external store after processing
48+
2949
=== Metrics Collection
3050

3151
The library provides the {javadoc-url}/com/rabbitmq/client/amqp/metrics/MetricsCollector.html[`MetricsCollector`] abstraction to collect metrics.

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ interface SubscriptionListener {
190190
* <p>It is called before the link is created but also every time it recovers, e.g. after a
191191
* connection failure.
192192
*
193+
* <p>Configuration set with {@link Context#streamOptions()} overrides the one set with {@link
194+
* ConsumerBuilder#stream()}.
195+
*
193196
* @param context subscription context
194197
*/
195198
void preSubscribe(Context context);

src/test/java/com/rabbitmq/client/amqp/docs/Api.java

Lines changed: 14 additions & 294 deletions
Original file line numberDiff line numberDiff line change
@@ -26,29 +26,8 @@
2626
import io.micrometer.prometheusmetrics.PrometheusConfig;
2727
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
2828

29-
import java.nio.charset.StandardCharsets;
30-
import java.time.Duration;
31-
32-
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
33-
import static com.rabbitmq.client.amqp.Publisher.Status.ACCEPTED;
34-
3529
class Api {
3630

37-
void environment() {
38-
// tag::environment-creation[]
39-
Environment environment = new AmqpEnvironmentBuilder()
40-
.build();
41-
// end::environment-creation[]
42-
}
43-
44-
void connection() {
45-
Environment environment = null;
46-
// tag::connection-creation[]
47-
Connection connection = environment.connectionBuilder()
48-
.build();
49-
// end::connection-creation[]
50-
}
51-
5231
void connectionSettings() {
5332
// tag::connection-settings[]
5433
Environment environment = new AmqpEnvironmentBuilder()
@@ -62,290 +41,31 @@ void connectionSettings() {
6241
// end::connection-settings[]
6342
}
6443

65-
66-
void publishing() {
67-
Connection connection = null;
68-
// tag::publisher-creation[]
69-
Publisher publisher = connection.publisherBuilder()
70-
.exchange("foo").key("bar")
71-
.build();
72-
// end::publisher-creation[]
73-
74-
75-
// tag::message-creation[]
76-
Message message = publisher
77-
.message("hello".getBytes(StandardCharsets.UTF_8))
78-
.messageId(1L);
79-
// end::message-creation[]
80-
81-
// tag::message-publishing[]
82-
publisher.publish(message, context -> {
83-
if (context.status() == ACCEPTED) {
84-
// the broker accepted (confirmed) the message
85-
} else {
86-
// deal with possible failure
87-
}
88-
});
89-
// end::message-publishing[]
90-
}
91-
92-
void targetAddressFormatExchangeKey() {
44+
void subscriptionListener() {
9345
Connection connection = null;
94-
// tag::target-address-exchange-key[]
95-
Publisher publisher = connection.publisherBuilder()
96-
.exchange("foo").key("bar") // <1>
97-
.build();
98-
// end::target-address-exchange-key[]
99-
}
100-
101-
void targetAddressFormatExchange() {
102-
Connection connection = null;
103-
// tag::target-address-exchange[]
104-
Publisher publisher = connection.publisherBuilder()
105-
.exchange("foo") // <1>
106-
.build();
107-
// end::target-address-exchange[]
108-
}
109-
110-
void targetAddressFormatQueue() {
111-
Connection connection = null;
112-
// tag::target-address-queue[]
113-
Publisher publisher = connection.publisherBuilder()
114-
.queue("some-queue") // <1>
115-
.build();
116-
// end::target-address-queue[]
117-
}
118-
119-
void targetAddressNull() {
120-
Connection connection = null;
121-
// tag::target-address-null[]
122-
Publisher publisher = connection.publisherBuilder()
123-
.build(); // <1>
124-
125-
Message message1 = publisher.message()
126-
.toAddress().exchange("foo").key("bar") // <2>
127-
.message();
128-
129-
Message message2 = publisher.message()
130-
.toAddress().exchange("foo") // <3>
131-
.message();
132-
133-
Message message3 = publisher.message()
134-
.toAddress().queue("my-queue") // <4>
135-
.message();
136-
// end::target-address-null[]
137-
}
138-
139-
void consuming() {
140-
Connection connection = null;
141-
// tag::consumer-consume[]
142-
Consumer consumer = connection.consumerBuilder()
143-
.queue("some-queue")
144-
.messageHandler((context, message) -> {
145-
byte[] body = message.body(); // <1>
146-
// ... <2>
147-
context.accept(); // <3>
148-
})
149-
.build();
150-
// end::consumer-consume[]
151-
152-
// tag::consumer-graceful-shutdown[]
153-
consumer.pause(); // <1>
154-
long unsettledMessageCount = consumer.unsettledMessageCount(); // <2>
155-
consumer.close(); // <3>
156-
// end::consumer-graceful-shutdown[]
157-
158-
// tag::consumer-abrupt-shutdown[]
159-
consumer.close(); // <1>
160-
// end::consumer-abrupt-shutdown[]
161-
}
162-
163-
void consumingStream() {
164-
Connection connection = null;
165-
// tag::consumer-consume-stream[]
166-
Consumer consumer = connection.consumerBuilder()
167-
.queue("some-stream")
168-
.stream() // <1>
169-
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST) // <2>
170-
.builder() // <3>
171-
.messageHandler((context, message) -> {
172-
// message processing
173-
})
174-
.build();
175-
// end::consumer-consume-stream[]
176-
}
177-
178-
void consumingStreamFiltering() {
179-
Connection connection = null;
180-
// tag::consumer-consume-stream-filtering[]
181-
Consumer consumer = connection.consumerBuilder()
46+
// tag::subscription-listener[]
47+
connection.consumerBuilder()
18248
.queue("some-stream")
183-
.stream() // <1>
184-
.filterValues("invoices", "orders") // <2>
185-
.filterMatchUnfiltered(true) // <3>
186-
.builder() // <4>
187-
.messageHandler((context, message) -> {
188-
// message processing
189-
})
190-
.build();
191-
// end::consumer-consume-stream-filtering[]
192-
}
193-
194-
void management() {
195-
Connection connection = null;
196-
// tag::management[]
197-
Management management = connection.management();
198-
// end::management[]
199-
}
200-
201-
void exchanges() {
202-
Management management = null;
203-
// tag::fanout-exchange[]
204-
management.exchange()
205-
.name("my-exchange")
206-
.type(FANOUT)
207-
.declare();
208-
// end::fanout-exchange[]
209-
210-
// tag::delayed-message-exchange[]
211-
management.exchange()
212-
.name("my-exchange")
213-
.type("x-delayed-message")
214-
.autoDelete(false)
215-
.argument("x-delayed-type", "direct")
216-
.declare();
217-
// end::delayed-message-exchange[]
218-
219-
// tag::delete-exchange[]
220-
management.exchangeDeletion().delete("my-exchange");
221-
// end::delete-exchange[]
222-
}
223-
224-
void queues() {
225-
Management management = null;
226-
// tag::queue-creation[]
227-
management.queue()
228-
.name("my-queue")
229-
.exclusive(true)
230-
.autoDelete(false)
231-
.declare();
232-
// end::queue-creation[]
233-
234-
// tag::queue-creation-with-arguments[]
235-
management
236-
.queue()
237-
.name("my-queue")
238-
.messageTtl(Duration.ofMinutes(10)) // <1>
239-
.maxLengthBytes(ByteCapacity.MB(100)) // <1>
240-
.declare();
241-
// end::queue-creation-with-arguments[]
242-
243-
// tag::quorum-queue-creation[]
244-
management
245-
.queue()
246-
.name("my-quorum-queue")
247-
.quorum() // <1>
248-
.quorumInitialGroupSize(3)
249-
.deliveryLimit(3)
250-
.queue()
251-
.declare();
252-
// end::quorum-queue-creation[]
253-
254-
// tag::queue-deletion[]
255-
management.queueDeletion().delete("my-queue");
256-
// end::queue-deletion[]
257-
}
258-
259-
void bindings() {
260-
Management management = null;
261-
// tag::binding[]
262-
management.binding()
263-
.sourceExchange("my-exchange")
264-
.destinationQueue("my-queue")
265-
.key("foo")
266-
.bind();
267-
// end::binding[]
268-
269-
// tag::exchange-binding[]
270-
management.binding()
271-
.sourceExchange("my-exchange")
272-
.destinationExchange("my-other-exchange")
273-
.key("foo")
274-
.bind();
275-
// end::exchange-binding[]
276-
277-
// tag::unbinding[]
278-
management.unbind()
279-
.sourceExchange("my-exchange")
280-
.destinationQueue("my-queue")
281-
.key("foo")
282-
.unbind();
283-
// end::unbinding[]
284-
}
285-
286-
void listeners() {
287-
Environment environment = null;
288-
// tag::listener-connection[]
289-
Connection connection = environment.connectionBuilder()
290-
.listeners(context -> { // <1>
291-
context.previousState(); // <2>
292-
context.currentState(); // <3>
293-
context.failureCause(); // <4>
294-
context.resource(); // <5>
295-
}).build();
296-
// end::listener-connection[]
297-
298-
// tag::listener-publisher[]
299-
Publisher publisher = connection.publisherBuilder()
300-
.listeners(context -> { // <1>
301-
// ...
49+
.subscriptionListener(ctx -> { // <1>
50+
long offset = getOffsetFromExternalStore(); // <2>
51+
ctx.streamOptions().offset(offset + 1); // <3>
30252
})
303-
.exchange("foo").key("bar")
304-
.build();
305-
// end::listener-publisher[]
53+
.messageHandler((ctx, msg) -> {
54+
// message handling code...
30655

307-
// tag::listener-consumer[]
308-
Consumer consumer = connection.consumerBuilder()
309-
.listeners(context -> { // <1>
310-
// ...
56+
long offset = (long) msg.annotation("x-stream-offset"); // <4>
57+
storeOffsetInExternalStore(offset); // <5>
31158
})
312-
.queue("my-queue")
31359
.build();
314-
// end::listener-consumer[]
60+
// end::subscription-listener[]
31561
}
31662

317-
void connectionRecoveryBackOff() {
318-
Environment environment = null;
319-
// tag::connection-recovery-back-off[]
320-
Connection connection = environment.connectionBuilder()
321-
.recovery() // <1>
322-
.backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2))) // <2>
323-
.connectionBuilder().build();
324-
// end::connection-recovery-back-off[]
63+
long getOffsetFromExternalStore() {
64+
return 0L;
32565
}
32666

327-
void connectionRecoveryNoTopologyRecovery() {
328-
Environment environment = null;
329-
// tag::connection-recovery-no-topology-recovery[]
330-
Connection connection = environment.connectionBuilder()
331-
.recovery()
332-
.topology(false) // <1>
333-
.connectionBuilder()
334-
.listeners(context -> {
335-
// <2>
336-
})
337-
.build();
338-
// end::connection-recovery-no-topology-recovery[]
339-
}
67+
void storeOffsetInExternalStore(long offset) {
34068

341-
void connectionRecoveryDeactivate() {
342-
Environment environment = null;
343-
// tag::connection-recovery-deactivate[]
344-
Connection connection = environment.connectionBuilder()
345-
.recovery()
346-
.activated(false) // <1>
347-
.connectionBuilder().build();
348-
// end::connection-recovery-deactivate[]
34969
}
35070

35171
void metricsCollectorMicrometerPrometheus() {

0 commit comments

Comments
 (0)