Skip to content

Commit f9baff1

Browse files
committed
Document stream support in consumer configuration
1 parent 81f7707 commit f9baff1

File tree

2 files changed

+62
-0
lines changed

2 files changed

+62
-0
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ include::{test-examples}/Api.java[tag=consumer-consume]
102102
<2> Process the message
103103
<3> Accept (acknowledge) the message
104104

105+
==== Graceful Shutdown
106+
105107
A consumer can accept, discard, or requeue a message.
106108
We say the consumer _settles_ a message.
107109

@@ -138,6 +140,35 @@ include::{test-examples}/Api.java[tag=consumer-abrupt-shutdown]
138140
--------
139141
<1> Close the consumer with potential unsettled messages
140142

143+
==== Support for Streams
144+
145+
There is out-of-the-box support for https://www.rabbitmq.com/docs/streams[streams] in consumer configuration.
146+
147+
It is possible to set where to attach to when https://www.rabbitmq.com/docs/streams#consuming[consuming] from a stream:
148+
149+
.Attaching to the beginning of a stream
150+
[source,java,indent=0]
151+
--------
152+
include::{test-examples}/Api.java[tag=consumer-consume-stream]
153+
--------
154+
<1> Access to stream-specific configuration helper
155+
<2> Attach to the first offset in the stream
156+
<3> Go back to main consumer configuration
157+
158+
There is also support for https://www.rabbitmq.com/docs/streams#filtering[stream filtering] configuration:
159+
160+
.Configuring stream filtering
161+
[source,java,indent=0]
162+
--------
163+
include::{test-examples}/Api.java[tag=consumer-consume-stream-filtering]
164+
--------
165+
<1> Access to stream-specific configuration helper
166+
<2> Set one or several filter values
167+
<3> Get "unfiltered" messages as well
168+
<4> Go back to main consumer configuration
169+
170+
Consider also using the https://www.rabbitmq.com/docs/stream[native stream protocol] with the RabbitMQ https://github.com/rabbitmq/rabbitmq-stream-java-client[stream Java client] when working with streams.
171+
141172
=== Resource Management
142173

143174
The `Management` object is the entry point to deal with resources.

src/test/java/com/rabbitmq/client/amqp/docs/Api.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,37 @@ void consuming() {
160160
// end::consumer-abrupt-shutdown[]
161161
}
162162

163+
void consumingStream() {
164+
Connection connection = null;
165+
// tag::consumer-consume-stream[]
166+
Consumer consumer = connection.consumerBuilder()
167+
.queue("some-stream")
168+
.stream() // <1>
169+
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST) // <2>
170+
.builder() // <3>
171+
.messageHandler((context, message) -> {
172+
// message processing
173+
})
174+
.build();
175+
// end::consumer-consume-stream[]
176+
}
177+
178+
void consumingStreamFiltering() {
179+
Connection connection = null;
180+
// tag::consumer-consume-stream-filtering[]
181+
Consumer consumer = connection.consumerBuilder()
182+
.queue("some-stream")
183+
.stream() // <1>
184+
.filterValues("invoices", "orders") // <2>
185+
.filterMatchUnfiltered(true) // <3>
186+
.builder() // <4>
187+
.messageHandler((context, message) -> {
188+
// message processing
189+
})
190+
.build();
191+
// end::consumer-consume-stream-filtering[]
192+
}
193+
163194
void management() {
164195
Connection connection = null;
165196
// tag::management[]

0 commit comments

Comments
 (0)