Skip to content

Add Consumer Flow Control Strategies #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/main/java/com/rabbitmq/stream/AbstractFlowControlStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.rabbitmq.stream;

import com.rabbitmq.stream.impl.Client;

import java.util.Objects;
import java.util.function.Supplier;

public abstract class AbstractFlowControlStrategy implements ConsumerFlowControlStrategy {

private final Supplier<Client> clientSupplier;
private volatile Client client;

protected AbstractFlowControlStrategy(Supplier<Client> clientSupplier) {
this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier");
}

protected Client mandatoryClient() {
Client localClient = this.client;
if(localClient != null) {
return localClient;
}
localClient = clientSupplier.get();
if(localClient == null) {
throw new IllegalStateException("Requested client, but client is not yet available! Supplier: " + this.clientSupplier);
}
this.client = localClient;
return localClient;
}

}
92 changes: 92 additions & 0 deletions src/main/java/com/rabbitmq/stream/ClientDataHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.rabbitmq.stream;

import com.rabbitmq.stream.impl.Client;

/**
* Exposes callbacks to handle events from a particular {@link Client},
* with specific names for methods and no {@link Client} parameter.
*/
public interface ClientDataHandler extends
Client.PublishConfirmListener,
Client.PublishErrorListener,
Client.ChunkListener,
Client.MessageListener,
Client.CreditNotification,
Client.ConsumerUpdateListener,
Client.ShutdownListener,
Client.MetadataListener {

@Override
default void handle(byte publisherId, long publishingId) {
this.handlePublishConfirm(publisherId, publishingId);
}

default void handlePublishConfirm(byte publisherId, long publishingId) {
// No-op by default
}

@Override
default void handle(byte publisherId, long publishingId, short errorCode) {
this.handlePublishError(publisherId, publishingId, errorCode);
}

default void handlePublishError(byte publisherId, long publishingId, short errorCode) {
// No-op by default
}

@Override
default void handle(Client client, byte subscriptionId, long offset, long messageCount, long dataSize) {
this.handleChunk(subscriptionId, offset, messageCount, dataSize);
}

default void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) {
// No-op by default
}

@Override
default void handle(byte subscriptionId, long offset, long chunkTimestamp, long committedChunkId, Message message) {
this.handleMessage(subscriptionId, offset, chunkTimestamp, committedChunkId, message);
}

default void handleMessage(byte subscriptionId, long offset, long chunkTimestamp, long committedChunkId, Message message) {
// No-op by default
}

@Override
default void handle(byte subscriptionId, short responseCode) {
this.handleCreditNotification(subscriptionId, responseCode);
}

default void handleCreditNotification(byte subscriptionId, short responseCode) {
// No-op by default
}

@Override
default OffsetSpecification handle(Client client, byte subscriptionId, boolean active) {
this.handleConsumerUpdate(subscriptionId, active);
return null;
}

default void handleConsumerUpdate(byte subscriptionId, boolean active) {
// No-op by default
}

@Override
default void handle(Client.ShutdownContext shutdownContext) {
this.handleShutdown(shutdownContext);
}

default void handleShutdown(Client.ShutdownContext shutdownContext) {
// No-op by default
}

@Override
default void handle(String stream, short code) {
this.handleMetadata(stream, code);
}

default void handleMetadata(String stream, short code) {
// No-op by default
}

}
22 changes: 13 additions & 9 deletions src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder messageHandler(MessageHandler messageHandler);

/**
* Factory for the flow control strategy to be used when consuming messages.
* @param consumerFlowControlStrategyBuilderFactory the factory
* @return a fluent configurable builder for the flow control strategy
* @param <T>
*/
<T extends ConsumerFlowControlStrategyBuilder<?>> T flowControlStrategy(ConsumerFlowControlStrategyBuilderFactory<?, T> consumerFlowControlStrategyBuilderFactory);

/**
* The logical name of the {@link Consumer}.
*
Expand Down Expand Up @@ -159,7 +167,7 @@ public interface ConsumerBuilder {
Consumer build();

/** Manual tracking strategy. */
interface ManualTrackingStrategy {
interface ManualTrackingStrategy extends ConsumerBuilderAccessor {

/**
* Interval to check if the last requested stored offset has been actually stored.
Expand All @@ -170,17 +178,10 @@ interface ManualTrackingStrategy {
* @return the manual tracking strategy
*/
ManualTrackingStrategy checkInterval(Duration checkInterval);

/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}

/** Auto-tracking strategy. */
interface AutoTrackingStrategy {
interface AutoTrackingStrategy extends ConsumerBuilderAccessor {

/**
* Number of messages before storing.
Expand All @@ -201,12 +202,15 @@ interface AutoTrackingStrategy {
* @return the auto-tracking strategy
*/
AutoTrackingStrategy flushInterval(Duration flushInterval);
}

interface ConsumerBuilderAccessor {
/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}

}
49 changes: 49 additions & 0 deletions src/main/java/com/rabbitmq/stream/ConsumerFlowControlStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.rabbitmq.stream;

import com.rabbitmq.stream.impl.Client;

import java.util.Map;

/**
* A built and configured flow control strategy for consumers.
* Implementations may freely implement reactions to the various client callbacks.
* When defined by each implementation, it may internally call {@link Client#credit} to ask for credits.
*/
public interface ConsumerFlowControlStrategy extends ClientDataHandler, AutoCloseable {

/**
* Callback for handling a new stream subscription.
* Called right before the subscription is sent to the actual client.
*
* @param subscriptionId The subscriptionId as specified by the Stream Protocol
* @param stream The name of the stream being subscribed to
* @param offsetSpecification The offset specification for this new subscription
* @param subscriptionProperties The subscription properties for this new subscription
* @return The initial credits that should be granted to this new subscription
*/
int handleSubscribe(
byte subscriptionId,
String stream,
OffsetSpecification offsetSpecification,
Map<String, String> subscriptionProperties
);

/**
* Callback for handling a stream unsubscription.
* @param subscriptionId The subscriptionId as specified by the Stream Protocol
*/
default void handleUnsubscribe(byte subscriptionId) {
// No-op by default
}

@Override
default void handleShutdown(Client.ShutdownContext shutdownContext) {
this.close();
}

@Override
default void close() {
// Override with cleanup logic, if applicable
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.rabbitmq.stream;

import com.rabbitmq.stream.impl.Client;

import java.util.function.Supplier;

/**
* Fluent builder for a {@link ConsumerFlowControlStrategyBuilderFactory}.
*
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built
*/
public interface ConsumerFlowControlStrategyBuilder<T extends ConsumerFlowControlStrategy> extends ConsumerBuilder.ConsumerBuilderAccessor {
/**
* Builds the actual FlowControlStrategy instance, for the Client with which it interoperates
*
* @param clientSupplier {@link Supplier <Client>} for retrieving the {@link Client}.
* Is not a {@link Client} instance because the {@link Client} may be lazily initialized.
* @return the FlowControlStrategy
*/
T build(Supplier<Client> clientSupplier);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.rabbitmq.stream;

/**
* A strategy for regulating consumer flow when consuming from a RabbitMQ Stream.
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built
* @param <C> the type of fluent builder exposed by this factory. Must subclass {@link ConsumerFlowControlStrategyBuilder}
*/
public interface ConsumerFlowControlStrategyBuilderFactory<T extends ConsumerFlowControlStrategy, C extends ConsumerFlowControlStrategyBuilder<T>> {
/**
* Accessor for configuration builder with settings specific to each implementing strategy
* @return {@link C} the specific consumer flow control strategy configuration builder
*/
C builder(ConsumerBuilder consumerBuilder);
}
46 changes: 46 additions & 0 deletions src/main/java/com/rabbitmq/stream/LegacyFlowControlStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.rabbitmq.stream;

import com.rabbitmq.stream.impl.Client;

import java.util.Map;
import java.util.function.Supplier;

/**
* The flow control strategy that was always applied before the flow control strategy mechanism existed in the codebase.
* Requests a set amount of credits after each chunk arrives.
*/
public class LegacyFlowControlStrategy extends AbstractFlowControlStrategy {

private final int initialCredits;
private final int additionalCredits;

public LegacyFlowControlStrategy(Supplier<Client> clientSupplier) {
this(clientSupplier, 1);
}

public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int initialCredits) {
this(clientSupplier, initialCredits, 1);
}

public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int initialCredits, int additionalCredits) {
super(clientSupplier);
this.initialCredits = initialCredits;
this.additionalCredits = additionalCredits;
}

@Override
public int handleSubscribe(
byte subscriptionId,
String stream,
OffsetSpecification offsetSpecification,
Map<String, String> subscriptionProperties
) {
return this.initialCredits;
}

@Override
public void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) {
mandatoryClient().credit(subscriptionId, this.additionalCredits);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.rabbitmq.stream;

import com.rabbitmq.stream.impl.Client;

import java.util.function.Supplier;

public class LegacyFlowControlStrategyBuilderFactory implements ConsumerFlowControlStrategyBuilderFactory<LegacyFlowControlStrategy, LegacyFlowControlStrategyBuilderFactory.LegacyFlowControlStrategyBuilder> {

public static final LegacyFlowControlStrategyBuilderFactory INSTANCE = new LegacyFlowControlStrategyBuilderFactory();

@Override
public LegacyFlowControlStrategyBuilder builder(ConsumerBuilder consumerBuilder) {
return new LegacyFlowControlStrategyBuilder(consumerBuilder);
}

public static class LegacyFlowControlStrategyBuilder implements ConsumerFlowControlStrategyBuilder<LegacyFlowControlStrategy> {

private final ConsumerBuilder consumerBuilder;

private int initialCredits = 1;

private int additionalCredits = 1;

public LegacyFlowControlStrategyBuilder(ConsumerBuilder consumerBuilder) {
this.consumerBuilder = consumerBuilder;
}

@Override
public LegacyFlowControlStrategy build(Supplier<Client> clientSupplier) {
return new LegacyFlowControlStrategy(clientSupplier, this.initialCredits, this.additionalCredits);
}

@Override
public ConsumerBuilder builder() {
return this.consumerBuilder;
}

public LegacyFlowControlStrategyBuilder additionalCredits(int additionalCredits) {
this.additionalCredits = additionalCredits;
return this;
}

public LegacyFlowControlStrategyBuilder initialCredits(int initialCredits) {
this.initialCredits = initialCredits;
return this;
}
}

}
Loading