-
Notifications
You must be signed in to change notification settings - Fork 16
Add Consumer Flow Control Strategies #340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 2 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
7add606
Add flow control strategy mechanism
henry701 2fbd85f
Add test for flow control strategy main handler methods
henry701 7175c3d
Abstract Client instance away from flow control strategy, create Cons…
henry701 9a08c72
Abstract dependency on Client away from public Flow Control classes b…
henry701 704e7b0
Remove obsolete 'initialCredits' and 'additionalCredits' parameters
henry701 f94be95
Fix tests by fixing control flow callback adapter delegation
henry701 b4a0b44
Add integrated test and fix issues discovered in the initial flow con…
henry701 13102e7
Fix consumer flow on reconnection situations and add integrated test …
henry701 a54100b
Remove redundant 0 assignments to trackerCount field
henry701 7250c8a
Change lifecycle of each consumer flow control strategy instance to b…
henry701 70486f4
Change MessageHandler.Context.markHandled to receive no parameters, a…
henry701 a156eb0
Remove MessageHandlingListenerConsumerBuilderAccessor
henry701 09f3cb7
Change lifecycle of ConsumerFlowControlStrategy to be 1-1 with Subscr…
henry701 22060e9
Merge branch 'main' into consumer-flow-control
henry701 17835b0
Add specific error for when client is not initialized
henry701 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
30 changes: 30 additions & 0 deletions
30
src/main/java/com/rabbitmq/stream/AbstractFlowControlStrategy.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package com.rabbitmq.stream; | ||
|
||
import com.rabbitmq.stream.impl.Client; | ||
|
||
import java.util.Objects; | ||
import java.util.function.Supplier; | ||
|
||
public abstract class AbstractFlowControlStrategy implements ConsumerFlowControlStrategy { | ||
|
||
private final Supplier<Client> clientSupplier; | ||
private volatile Client client; | ||
|
||
protected AbstractFlowControlStrategy(Supplier<Client> clientSupplier) { | ||
this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier"); | ||
} | ||
|
||
protected Client mandatoryClient() { | ||
Client localClient = this.client; | ||
if(localClient != null) { | ||
return localClient; | ||
} | ||
localClient = clientSupplier.get(); | ||
if(localClient == null) { | ||
throw new IllegalStateException("Requested client, but client is not yet available! Supplier: " + this.clientSupplier); | ||
} | ||
this.client = localClient; | ||
return localClient; | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package com.rabbitmq.stream; | ||
|
||
import com.rabbitmq.stream.impl.Client; | ||
|
||
/** | ||
* Exposes callbacks to handle events from a particular {@link Client}, | ||
* with specific names for methods and no {@link Client} parameter. | ||
*/ | ||
public interface ClientDataHandler extends | ||
Client.PublishConfirmListener, | ||
Client.PublishErrorListener, | ||
Client.ChunkListener, | ||
Client.MessageListener, | ||
Client.CreditNotification, | ||
Client.ConsumerUpdateListener, | ||
Client.ShutdownListener, | ||
Client.MetadataListener { | ||
|
||
@Override | ||
default void handle(byte publisherId, long publishingId) { | ||
this.handlePublishConfirm(publisherId, publishingId); | ||
} | ||
|
||
default void handlePublishConfirm(byte publisherId, long publishingId) { | ||
// No-op by default | ||
} | ||
|
||
@Override | ||
default void handle(byte publisherId, long publishingId, short errorCode) { | ||
this.handlePublishError(publisherId, publishingId, errorCode); | ||
} | ||
|
||
default void handlePublishError(byte publisherId, long publishingId, short errorCode) { | ||
// No-op by default | ||
} | ||
|
||
@Override | ||
default void handle(Client client, byte subscriptionId, long offset, long messageCount, long dataSize) { | ||
this.handleChunk(subscriptionId, offset, messageCount, dataSize); | ||
} | ||
|
||
default void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) { | ||
// No-op by default | ||
} | ||
|
||
@Override | ||
default void handle(byte subscriptionId, long offset, long chunkTimestamp, long committedChunkId, Message message) { | ||
this.handleMessage(subscriptionId, offset, chunkTimestamp, committedChunkId, message); | ||
} | ||
|
||
default void handleMessage(byte subscriptionId, long offset, long chunkTimestamp, long committedChunkId, Message message) { | ||
// No-op by default | ||
} | ||
|
||
@Override | ||
default void handle(byte subscriptionId, short responseCode) { | ||
this.handleCreditNotification(subscriptionId, responseCode); | ||
} | ||
|
||
default void handleCreditNotification(byte subscriptionId, short responseCode) { | ||
// No-op by default | ||
} | ||
|
||
@Override | ||
default OffsetSpecification handle(Client client, byte subscriptionId, boolean active) { | ||
this.handleConsumerUpdate(subscriptionId, active); | ||
return null; | ||
} | ||
|
||
default void handleConsumerUpdate(byte subscriptionId, boolean active) { | ||
// No-op by default | ||
} | ||
|
||
@Override | ||
default void handle(Client.ShutdownContext shutdownContext) { | ||
this.handleShutdown(shutdownContext); | ||
} | ||
|
||
default void handleShutdown(Client.ShutdownContext shutdownContext) { | ||
// No-op by default | ||
} | ||
|
||
@Override | ||
default void handle(String stream, short code) { | ||
this.handleMetadata(stream, code); | ||
} | ||
|
||
default void handleMetadata(String stream, short code) { | ||
// No-op by default | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
49 changes: 49 additions & 0 deletions
49
src/main/java/com/rabbitmq/stream/ConsumerFlowControlStrategy.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package com.rabbitmq.stream; | ||
|
||
import com.rabbitmq.stream.impl.Client; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* A built and configured flow control strategy for consumers. | ||
* Implementations may freely implement reactions to the various client callbacks. | ||
* When defined by each implementation, it may internally call {@link Client#credit} to ask for credits. | ||
*/ | ||
public interface ConsumerFlowControlStrategy extends ClientDataHandler, AutoCloseable { | ||
|
||
/** | ||
* Callback for handling a new stream subscription. | ||
* Called right before the subscription is sent to the actual client. | ||
* | ||
* @param subscriptionId The subscriptionId as specified by the Stream Protocol | ||
* @param stream The name of the stream being subscribed to | ||
* @param offsetSpecification The offset specification for this new subscription | ||
* @param subscriptionProperties The subscription properties for this new subscription | ||
* @return The initial credits that should be granted to this new subscription | ||
*/ | ||
int handleSubscribe( | ||
byte subscriptionId, | ||
String stream, | ||
OffsetSpecification offsetSpecification, | ||
Map<String, String> subscriptionProperties | ||
); | ||
|
||
/** | ||
* Callback for handling a stream unsubscription. | ||
* @param subscriptionId The subscriptionId as specified by the Stream Protocol | ||
*/ | ||
default void handleUnsubscribe(byte subscriptionId) { | ||
// No-op by default | ||
} | ||
|
||
@Override | ||
default void handleShutdown(Client.ShutdownContext shutdownContext) { | ||
this.close(); | ||
} | ||
|
||
@Override | ||
default void close() { | ||
// Override with cleanup logic, if applicable | ||
} | ||
|
||
} |
21 changes: 21 additions & 0 deletions
21
src/main/java/com/rabbitmq/stream/ConsumerFlowControlStrategyBuilder.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package com.rabbitmq.stream; | ||
|
||
import com.rabbitmq.stream.impl.Client; | ||
|
||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Fluent builder for a {@link ConsumerFlowControlStrategyBuilderFactory}. | ||
* | ||
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built | ||
*/ | ||
public interface ConsumerFlowControlStrategyBuilder<T extends ConsumerFlowControlStrategy> extends ConsumerBuilder.ConsumerBuilderAccessor { | ||
/** | ||
* Builds the actual FlowControlStrategy instance, for the Client with which it interoperates | ||
* | ||
* @param clientSupplier {@link Supplier <Client>} for retrieving the {@link Client}. | ||
* Is not a {@link Client} instance because the {@link Client} may be lazily initialized. | ||
* @return the FlowControlStrategy | ||
*/ | ||
T build(Supplier<Client> clientSupplier); | ||
} |
14 changes: 14 additions & 0 deletions
14
src/main/java/com/rabbitmq/stream/ConsumerFlowControlStrategyBuilderFactory.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package com.rabbitmq.stream; | ||
|
||
/** | ||
* A strategy for regulating consumer flow when consuming from a RabbitMQ Stream. | ||
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built | ||
* @param <C> the type of fluent builder exposed by this factory. Must subclass {@link ConsumerFlowControlStrategyBuilder} | ||
*/ | ||
public interface ConsumerFlowControlStrategyBuilderFactory<T extends ConsumerFlowControlStrategy, C extends ConsumerFlowControlStrategyBuilder<T>> { | ||
/** | ||
* Accessor for configuration builder with settings specific to each implementing strategy | ||
* @return {@link C} the specific consumer flow control strategy configuration builder | ||
*/ | ||
C builder(ConsumerBuilder consumerBuilder); | ||
} |
46 changes: 46 additions & 0 deletions
46
src/main/java/com/rabbitmq/stream/LegacyFlowControlStrategy.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package com.rabbitmq.stream; | ||
|
||
import com.rabbitmq.stream.impl.Client; | ||
|
||
import java.util.Map; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* The flow control strategy that was always applied before the flow control strategy mechanism existed in the codebase. | ||
* Requests a set amount of credits after each chunk arrives. | ||
*/ | ||
public class LegacyFlowControlStrategy extends AbstractFlowControlStrategy { | ||
|
||
private final int initialCredits; | ||
private final int additionalCredits; | ||
|
||
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier) { | ||
this(clientSupplier, 1); | ||
} | ||
|
||
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int initialCredits) { | ||
this(clientSupplier, initialCredits, 1); | ||
} | ||
|
||
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int initialCredits, int additionalCredits) { | ||
super(clientSupplier); | ||
this.initialCredits = initialCredits; | ||
this.additionalCredits = additionalCredits; | ||
} | ||
|
||
@Override | ||
public int handleSubscribe( | ||
byte subscriptionId, | ||
String stream, | ||
OffsetSpecification offsetSpecification, | ||
Map<String, String> subscriptionProperties | ||
) { | ||
return this.initialCredits; | ||
} | ||
|
||
@Override | ||
public void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) { | ||
mandatoryClient().credit(subscriptionId, this.additionalCredits); | ||
} | ||
|
||
} |
49 changes: 49 additions & 0 deletions
49
src/main/java/com/rabbitmq/stream/LegacyFlowControlStrategyBuilderFactory.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package com.rabbitmq.stream; | ||
|
||
import com.rabbitmq.stream.impl.Client; | ||
|
||
import java.util.function.Supplier; | ||
|
||
public class LegacyFlowControlStrategyBuilderFactory implements ConsumerFlowControlStrategyBuilderFactory<LegacyFlowControlStrategy, LegacyFlowControlStrategyBuilderFactory.LegacyFlowControlStrategyBuilder> { | ||
|
||
public static final LegacyFlowControlStrategyBuilderFactory INSTANCE = new LegacyFlowControlStrategyBuilderFactory(); | ||
|
||
@Override | ||
public LegacyFlowControlStrategyBuilder builder(ConsumerBuilder consumerBuilder) { | ||
return new LegacyFlowControlStrategyBuilder(consumerBuilder); | ||
} | ||
|
||
public static class LegacyFlowControlStrategyBuilder implements ConsumerFlowControlStrategyBuilder<LegacyFlowControlStrategy> { | ||
|
||
private final ConsumerBuilder consumerBuilder; | ||
|
||
private int initialCredits = 1; | ||
|
||
private int additionalCredits = 1; | ||
|
||
public LegacyFlowControlStrategyBuilder(ConsumerBuilder consumerBuilder) { | ||
this.consumerBuilder = consumerBuilder; | ||
} | ||
|
||
@Override | ||
public LegacyFlowControlStrategy build(Supplier<Client> clientSupplier) { | ||
return new LegacyFlowControlStrategy(clientSupplier, this.initialCredits, this.additionalCredits); | ||
} | ||
|
||
@Override | ||
public ConsumerBuilder builder() { | ||
return this.consumerBuilder; | ||
} | ||
|
||
public LegacyFlowControlStrategyBuilder additionalCredits(int additionalCredits) { | ||
this.additionalCredits = additionalCredits; | ||
return this; | ||
} | ||
|
||
public LegacyFlowControlStrategyBuilder initialCredits(int initialCredits) { | ||
this.initialCredits = initialCredits; | ||
return this; | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.