Skip to content

Commit 1ae1ded

Browse files
committed
WIP, will force push later
1 parent a499214 commit 1ae1ded

13 files changed

+376
-152
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.Objects;
6+
import java.util.function.Supplier;
7+
8+
public class AbstractFlowControlStrategy implements ConsumerFlowControlStrategy {
9+
10+
private Supplier<Client> clientSupplier;
11+
private volatile Client client;
12+
13+
public AbstractFlowControlStrategy(Supplier<Client> clientSupplier) {
14+
this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier");
15+
}
16+
17+
protected Client mandatoryClient() {
18+
Client localClient = this.client;
19+
if(localClient != null) {
20+
return localClient;
21+
}
22+
localClient = clientSupplier.get();
23+
if(localClient == null) {
24+
throw new IllegalStateException("Requested client, but client is not yet available! Supplier: " + this.clientSupplier);
25+
}
26+
this.client = localClient;
27+
return localClient;
28+
}
29+
30+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
/**
6+
* Exposes callbacks to handle events from a specific Client, with a leaner interface.
7+
*/
8+
public interface ClientDataHandler extends
9+
Client.PublishConfirmListener,
10+
Client.PublishErrorListener,
11+
Client.ChunkListener,
12+
Client.MessageListener,
13+
Client.CreditNotification,
14+
Client.ConsumerUpdateListener,
15+
Client.ShutdownListener,
16+
Client.MetadataListener {
17+
18+
@Override
19+
default void handle(byte publisherId, long publishingId) {
20+
this.handlePublishConfirm(publisherId, publishingId);
21+
}
22+
23+
default void handlePublishConfirm(byte publisherId, long publishingId) {
24+
// No-op by default
25+
}
26+
27+
@Override
28+
default void handle(byte publisherId, long publishingId, short errorCode) {
29+
this.handlePublishError(publisherId, publishingId, errorCode);
30+
}
31+
32+
default void handlePublishError(byte publisherId, long publishingId, short errorCode) {
33+
// No-op by default
34+
}
35+
36+
@Override
37+
default void handle(Client client, byte subscriptionId, long offset, long messageCount, long dataSize) {
38+
this.handleChunk(subscriptionId, offset, messageCount, dataSize);
39+
}
40+
41+
default void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) {
42+
// No-op by default
43+
}
44+
45+
@Override
46+
default void handle(byte subscriptionId, long offset, long chunkTimestamp, long committedChunkId, Message message) {
47+
this.handleMessage(subscriptionId, offset, chunkTimestamp, committedChunkId, message);
48+
}
49+
50+
default void handleMessage(byte subscriptionId, long offset, long chunkTimestamp, long committedChunkId, Message message) {
51+
// No-op by default
52+
}
53+
54+
@Override
55+
default void handle(byte subscriptionId, short responseCode) {
56+
this.handleCreditNotification(subscriptionId, responseCode);
57+
}
58+
59+
default void handleCreditNotification(byte subscriptionId, short responseCode) {
60+
// No-op by default
61+
}
62+
63+
@Override
64+
default OffsetSpecification handle(Client client, byte subscriptionId, boolean active) {
65+
this.handleConsumerUpdate(subscriptionId, active);
66+
return null;
67+
}
68+
69+
default void handleConsumerUpdate(byte subscriptionId, boolean active) {
70+
// No-op by default
71+
}
72+
73+
@Override
74+
default void handle(Client.ShutdownContext shutdownContext) {
75+
this.handleShutdown(shutdownContext);
76+
}
77+
78+
default void handleShutdown(Client.ShutdownContext shutdownContext) {
79+
// No-op by default
80+
}
81+
82+
@Override
83+
default void handle(String stream, short code) {
84+
this.handleMetadata(stream, code);
85+
}
86+
87+
default void handleMetadata(String stream, short code) {
88+
// No-op by default
89+
}
90+
91+
}

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public interface ConsumerBuilder {
5959
*/
6060
ConsumerBuilder messageHandler(MessageHandler messageHandler);
6161

62+
/**
63+
* Factory for the flow control strategy to be used when consuming messages.
64+
* @param consumerFlowControlStrategyBuilderFactory the factory
65+
* @return a fluent configurable builder for the flow control strategy
66+
* @param <T>
67+
*/
68+
<T extends ConsumerFlowControlStrategyBuilder<?>> T flowControlStrategy(ConsumerFlowControlStrategyBuilderFactory<?, T> consumerFlowControlStrategyBuilderFactory);
69+
6270
/**
6371
* The logical name of the {@link Consumer}.
6472
*
@@ -159,7 +167,7 @@ public interface ConsumerBuilder {
159167
Consumer build();
160168

161169
/** Manual tracking strategy. */
162-
interface ManualTrackingStrategy {
170+
interface ManualTrackingStrategy extends ConsumerBuilderAccessor {
163171

164172
/**
165173
* Interval to check if the last requested stored offset has been actually stored.
@@ -170,17 +178,10 @@ interface ManualTrackingStrategy {
170178
* @return the manual tracking strategy
171179
*/
172180
ManualTrackingStrategy checkInterval(Duration checkInterval);
173-
174-
/**
175-
* Go back to the builder.
176-
*
177-
* @return the consumer builder
178-
*/
179-
ConsumerBuilder builder();
180181
}
181182

182183
/** Auto-tracking strategy. */
183-
interface AutoTrackingStrategy {
184+
interface AutoTrackingStrategy extends ConsumerBuilderAccessor {
184185

185186
/**
186187
* Number of messages before storing.
@@ -201,12 +202,15 @@ interface AutoTrackingStrategy {
201202
* @return the auto-tracking strategy
202203
*/
203204
AutoTrackingStrategy flushInterval(Duration flushInterval);
205+
}
204206

207+
interface ConsumerBuilderAccessor {
205208
/**
206209
* Go back to the builder.
207210
*
208211
* @return the consumer builder
209212
*/
210213
ConsumerBuilder builder();
211214
}
215+
212216
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
/**
6+
* A built and configured flow control strategy for consumers.
7+
* Implementations may freely implement reactions to the various client callbacks.
8+
* When defined by each implementation, it may internally call {@link Client#credit} to ask for credits.
9+
*/
10+
public interface ConsumerFlowControlStrategy extends ClientDataHandler, AutoCloseable {
11+
12+
default void handleShutdown(Client.ShutdownContext shutdownContext) {
13+
this.close();
14+
}
15+
16+
@Override
17+
default void close() {
18+
// Override with cleanup logic, if applicable
19+
}
20+
21+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.function.Supplier;
6+
7+
/**
8+
* Fluent builder for a {@link ConsumerFlowControlStrategyBuilderFactory}.
9+
*
10+
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built
11+
*/
12+
public interface ConsumerFlowControlStrategyBuilder<T extends ConsumerFlowControlStrategy> extends ConsumerBuilder.ConsumerBuilderAccessor {
13+
/**
14+
* Builds the actual FlowControlStrategy instance, for the Client with which it interoperates
15+
*
16+
* @param clientSupplier {@link Supplier <Client>} for retrieving the {@link Client}.
17+
* Is not a {@link Client} instance because the {@link Client} may be lazily initialized.
18+
* @return the FlowControlStrategy
19+
*/
20+
T build(Supplier<Client> clientSupplier);
21+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.rabbitmq.stream;
2+
3+
/**
4+
* A strategy for regulating consumer flow when consuming from a RabbitMQ Stream.
5+
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built
6+
* @param <C> the type of fluent builder exposed by this factory. Must subclass {@link ConsumerFlowControlStrategyBuilder}
7+
*/
8+
public interface ConsumerFlowControlStrategyBuilderFactory<T extends ConsumerFlowControlStrategy, C extends ConsumerFlowControlStrategyBuilder<T>> {
9+
/**
10+
* Accessor for configuration builder with settings specific to each implementing strategy
11+
* @return {@link C} the specific consumer flow control strategy configuration builder
12+
*/
13+
C builder(ConsumerBuilder consumerBuilder);
14+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.function.Supplier;
6+
7+
/**
8+
* The flow control strategy that was always applied before the flow control strategy mechanism existed in the codebase.
9+
* Requests a set amount of credits after each chunk arrives.
10+
*/
11+
public class LegacyFlowControlStrategy extends AbstractFlowControlStrategy {
12+
13+
private final int additionalCredits;
14+
15+
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int additionalCredits) {
16+
super(clientSupplier);
17+
this.additionalCredits = additionalCredits;
18+
}
19+
20+
@Override
21+
public void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) {
22+
mandatoryClient().credit(subscriptionId, this.additionalCredits);
23+
}
24+
25+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.function.Supplier;
6+
7+
public class LegacyFlowControlStrategyBuilderFactory implements ConsumerFlowControlStrategyBuilderFactory<LegacyFlowControlStrategy, LegacyFlowControlStrategyBuilderFactory.LegacyFlowControlStrategyBuilder> {
8+
9+
public static final LegacyFlowControlStrategyBuilderFactory INSTANCE = new LegacyFlowControlStrategyBuilderFactory();
10+
11+
@Override
12+
public LegacyFlowControlStrategyBuilder builder(ConsumerBuilder consumerBuilder) {
13+
return new LegacyFlowControlStrategyBuilder(consumerBuilder);
14+
}
15+
16+
public static class LegacyFlowControlStrategyBuilder implements ConsumerFlowControlStrategyBuilder<LegacyFlowControlStrategy> {
17+
18+
private final ConsumerBuilder consumerBuilder;
19+
20+
private int additionalCredits = 1;
21+
22+
public LegacyFlowControlStrategyBuilder(ConsumerBuilder consumerBuilder) {
23+
this.consumerBuilder = consumerBuilder;
24+
}
25+
26+
@Override
27+
public ConsumerBuilder builder() {
28+
return this.consumerBuilder;
29+
}
30+
31+
public LegacyFlowControlStrategyBuilder additionalCredits(int additionalCredits) {
32+
this.additionalCredits = additionalCredits;
33+
return this;
34+
}
35+
36+
@Override
37+
public LegacyFlowControlStrategy build(Supplier<Client> clientSupplier) {
38+
return new LegacyFlowControlStrategy(clientSupplier, this.additionalCredits);
39+
}
40+
41+
}
42+
43+
}

0 commit comments

Comments
 (0)