Skip to content

Commit 723026e

Browse files
committed
Support stream offset specification in consumer builder
1 parent fd3322f commit 723026e

File tree

5 files changed

+378
-15
lines changed

5 files changed

+378
-15
lines changed

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
1818
package com.rabbitmq.client.amqp;
1919

20+
import java.time.Instant;
21+
2022
public interface ConsumerBuilder {
2123

2224
ConsumerBuilder queue(String queue);
@@ -27,5 +29,30 @@ public interface ConsumerBuilder {
2729

2830
ConsumerBuilder listeners(Resource.StateListener... listeners);
2931

32+
StreamOptions stream();
33+
3034
Consumer build();
35+
36+
interface StreamOptions {
37+
38+
StreamOptions offset(long offset);
39+
40+
StreamOptions offset(Instant timestamp);
41+
42+
StreamOptions offset(StreamOffsetSpecification specification);
43+
44+
StreamOptions offset(String interval);
45+
46+
StreamOptions filterValues(String... values);
47+
48+
StreamOptions filterMatchUnfiltered(boolean matchUnfiltered);
49+
50+
ConsumerBuilder builder();
51+
}
52+
53+
enum StreamOffsetSpecification {
54+
FIRST,
55+
LAST,
56+
NEXT
57+
}
3158
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.rabbitmq.client.amqp.AmqpException;
2323
import com.rabbitmq.client.amqp.Consumer;
2424
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
25+
import java.util.Collections;
26+
import java.util.Map;
2527
import java.util.concurrent.*;
2628
import java.util.concurrent.atomic.AtomicBoolean;
2729
import java.util.concurrent.atomic.AtomicLong;
@@ -51,6 +53,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5153
private final MessageHandler messageHandler;
5254
private final Long id;
5355
private final String address;
56+
private final Map<String, Object> filters;
5457
private final AmqpConnection connection;
5558
private final AtomicReference<PauseStatus> pauseStatus =
5659
new AtomicReference<>(PauseStatus.UNPAUSED);
@@ -76,9 +79,11 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7679
.observationCollector()
7780
.subscribe(builder.queue(), builder.messageHandler());
7881
this.address = "/queue/" + builder.queue();
82+
this.filters = Collections.unmodifiableMap(builder.filters());
7983
this.connection = builder.connection();
8084
this.sessionHandler = this.connection.createSessionHandler();
81-
this.nativeReceiver = this.createNativeReceiver(this.sessionHandler.session(), this.address);
85+
this.nativeReceiver =
86+
this.createNativeReceiver(this.sessionHandler.session(), this.address, this.filters);
8287
this.initStateFromNativeReceiver(this.nativeReceiver);
8388
this.metricsCollector = this.connection.metricsCollector();
8489
this.startReceivingLoop();
@@ -134,19 +139,20 @@ public void close() {
134139

135140
// internal API
136141

137-
private ClientReceiver createNativeReceiver(Session nativeSession, String address) {
142+
private ClientReceiver createNativeReceiver(
143+
Session nativeSession, String address, Map<String, Object> filters) {
138144
try {
145+
ReceiverOptions receiverOptions =
146+
new ReceiverOptions()
147+
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
148+
.autoAccept(false)
149+
.autoSettle(false)
150+
.creditWindow(0);
151+
if (!filters.isEmpty()) {
152+
receiverOptions.sourceOptions().filters(filters);
153+
}
139154
return (ClientReceiver)
140-
ExceptionUtils.wrapGet(
141-
nativeSession
142-
.openReceiver(
143-
address,
144-
new ReceiverOptions()
145-
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
146-
.autoAccept(false)
147-
.autoSettle(false)
148-
.creditWindow(0))
149-
.openFuture());
155+
ExceptionUtils.wrapGet(nativeSession.openReceiver(address, receiverOptions).openFuture());
150156
} catch (ClientException e) {
151157
throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address);
152158
}
@@ -194,7 +200,8 @@ private void startReceivingLoop() {
194200
}
195201

196202
void recoverAfterConnectionFailure() {
197-
this.nativeReceiver = createNativeReceiver(this.sessionHandler.sessionNoCheck(), this.address);
203+
this.nativeReceiver =
204+
createNativeReceiver(this.sessionHandler.sessionNoCheck(), this.address, this.filters);
198205
this.initStateFromNativeReceiver(this.nativeReceiver);
199206
this.pauseStatus.set(PauseStatus.UNPAUSED);
200207
this.unsettledCount.set(0);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20+
import static com.rabbitmq.client.amqp.impl.Assert.notNull;
21+
2022
import com.rabbitmq.client.amqp.Consumer;
2123
import com.rabbitmq.client.amqp.ConsumerBuilder;
2224
import com.rabbitmq.client.amqp.Resource;
23-
import java.util.ArrayList;
24-
import java.util.List;
25+
import java.time.Instant;
26+
import java.util.*;
2527

2628
class AmqpConsumerBuilder implements ConsumerBuilder {
2729

@@ -30,6 +32,8 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
3032
private Consumer.MessageHandler messageHandler;
3133
private int initialCredits = 100;
3234
private final List<Resource.StateListener> listeners = new ArrayList<>();
35+
private final Map<String, Object> filters = new HashMap<>();
36+
private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters);
3337

3438
AmqpConsumerBuilder(AmqpConnection connection) {
3539
this.connection = connection;
@@ -63,6 +67,11 @@ public ConsumerBuilder listeners(Resource.StateListener... listeners) {
6367
return this;
6468
}
6569

70+
@Override
71+
public StreamOptions stream() {
72+
return this.streamOptions;
73+
}
74+
6675
AmqpConnection connection() {
6776
return connection;
6877
}
@@ -83,6 +92,10 @@ List<Resource.StateListener> listeners() {
8392
return listeners;
8493
}
8594

95+
Map<String, Object> filters() {
96+
return this.filters;
97+
}
98+
8699
@Override
87100
public Consumer build() {
88101
if (this.queue == null || this.queue.isBlank()) {
@@ -91,6 +104,68 @@ public Consumer build() {
91104
if (this.messageHandler == null) {
92105
throw new IllegalArgumentException("Message handler cannot be null");
93106
}
107+
108+
// TODO validate stream (filtering) configuration
109+
94110
return this.connection.createConsumer(this);
95111
}
112+
113+
private static class DefaultStreamOptions implements StreamOptions {
114+
115+
private final Map<String, Object> filters;
116+
private final ConsumerBuilder builder;
117+
118+
private DefaultStreamOptions(ConsumerBuilder builder, Map<String, Object> filters) {
119+
this.builder = builder;
120+
this.filters = filters;
121+
}
122+
123+
@Override
124+
public StreamOptions offset(long offset) {
125+
this.filters.put("rabbitmq:stream-offset-spec", offset);
126+
return this;
127+
}
128+
129+
@Override
130+
public StreamOptions offset(Instant timestamp) {
131+
notNull(timestamp, "Timestamp offset cannot be null");
132+
this.offsetSpecification(Date.from(timestamp));
133+
return this;
134+
}
135+
136+
@Override
137+
public StreamOptions offset(StreamOffsetSpecification specification) {
138+
notNull(specification, "Offset specification cannot be null");
139+
this.offsetSpecification(specification.name().toLowerCase(Locale.ENGLISH));
140+
return this;
141+
}
142+
143+
@Override
144+
public StreamOptions offset(String interval) {
145+
notNull(interval, "Interval offset cannot be null");
146+
this.offsetSpecification(interval);
147+
return this;
148+
}
149+
150+
@Override
151+
public StreamOptions filterValues(String... values) {
152+
this.filters.put("rabbitmq:stream-filter", values);
153+
return this;
154+
}
155+
156+
@Override
157+
public StreamOptions filterMatchUnfiltered(boolean matchUnfiltered) {
158+
this.filters.put("rabbitmq:stream-match-unfiltered", matchUnfiltered);
159+
return this;
160+
}
161+
162+
@Override
163+
public ConsumerBuilder builder() {
164+
return this.builder;
165+
}
166+
167+
private void offsetSpecification(Object value) {
168+
this.filters.put("rabbitmq:stream-offset-spec", value);
169+
}
170+
}
96171
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
abstract class Assert {
21+
22+
private Assert() {}
23+
24+
static void notNull(Object object, String message) {
25+
if (object == null) {
26+
throw new IllegalArgumentException(message);
27+
}
28+
}
29+
}

0 commit comments

Comments
 (0)