Skip to content

Commit 8af2e54

Browse files
committed
WIP, will force push later
1 parent 1ae1ded commit 8af2e54

11 files changed

+134
-66
lines changed

src/main/java/com/rabbitmq/stream/AbstractFlowControlStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
import java.util.Objects;
66
import java.util.function.Supplier;
77

8-
public class AbstractFlowControlStrategy implements ConsumerFlowControlStrategy {
8+
public abstract class AbstractFlowControlStrategy implements ConsumerFlowControlStrategy {
99

10-
private Supplier<Client> clientSupplier;
10+
private final Supplier<Client> clientSupplier;
1111
private volatile Client client;
1212

13-
public AbstractFlowControlStrategy(Supplier<Client> clientSupplier) {
13+
protected AbstractFlowControlStrategy(Supplier<Client> clientSupplier) {
1414
this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier");
1515
}
1616

src/main/java/com/rabbitmq/stream/ClientDataHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import com.rabbitmq.stream.impl.Client;
44

55
/**
6-
* Exposes callbacks to handle events from a specific Client, with a leaner interface.
6+
* Exposes callbacks to handle events from a particular {@link Client},
7+
* with specific names for methods and no {@link Client} parameter.
78
*/
89
public interface ClientDataHandler extends
910
Client.PublishConfirmListener,

src/main/java/com/rabbitmq/stream/ConsumerFlowControlStrategy.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,41 @@
22

33
import com.rabbitmq.stream.impl.Client;
44

5+
import java.util.Map;
6+
57
/**
68
* A built and configured flow control strategy for consumers.
79
* Implementations may freely implement reactions to the various client callbacks.
810
* When defined by each implementation, it may internally call {@link Client#credit} to ask for credits.
911
*/
1012
public interface ConsumerFlowControlStrategy extends ClientDataHandler, AutoCloseable {
1113

14+
/**
15+
* Callback for handling a new stream subscription.
16+
* Called right before the subscription is sent to the actual client.
17+
*
18+
* @param subscriptionId The subscriptionId as specified by the Stream Protocol
19+
* @param stream The name of the stream being subscribed to
20+
* @param offsetSpecification The offset specification for this new subscription
21+
* @param subscriptionProperties The subscription properties for this new subscription
22+
* @return The initial credits that should be granted to this new subscription
23+
*/
24+
int handleSubscribe(
25+
byte subscriptionId,
26+
String stream,
27+
OffsetSpecification offsetSpecification,
28+
Map<String, String> subscriptionProperties
29+
);
30+
31+
/**
32+
* Callback for handling a stream unsubscription.
33+
* @param subscriptionId The subscriptionId as specified by the Stream Protocol
34+
*/
35+
default void handleUnsubscribe(byte subscriptionId) {
36+
// No-op by default
37+
}
38+
39+
@Override
1240
default void handleShutdown(Client.ShutdownContext shutdownContext) {
1341
this.close();
1442
}

src/main/java/com/rabbitmq/stream/LegacyFlowControlStrategy.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.rabbitmq.stream.impl.Client;
44

5+
import java.util.Map;
56
import java.util.function.Supplier;
67

78
/**
@@ -10,13 +11,33 @@
1011
*/
1112
public class LegacyFlowControlStrategy extends AbstractFlowControlStrategy {
1213

14+
private final int initialCredits;
1315
private final int additionalCredits;
1416

15-
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int additionalCredits) {
17+
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier) {
18+
this(clientSupplier, 1);
19+
}
20+
21+
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int initialCredits) {
22+
this(clientSupplier, initialCredits, 1);
23+
}
24+
25+
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int initialCredits, int additionalCredits) {
1626
super(clientSupplier);
27+
this.initialCredits = initialCredits;
1728
this.additionalCredits = additionalCredits;
1829
}
1930

31+
@Override
32+
public int handleSubscribe(
33+
byte subscriptionId,
34+
String stream,
35+
OffsetSpecification offsetSpecification,
36+
Map<String, String> subscriptionProperties
37+
) {
38+
return this.initialCredits;
39+
}
40+
2041
@Override
2142
public void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) {
2243
mandatoryClient().credit(subscriptionId, this.additionalCredits);

src/main/java/com/rabbitmq/stream/LegacyFlowControlStrategyBuilderFactory.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@ public static class LegacyFlowControlStrategyBuilder implements ConsumerFlowCont
1717

1818
private final ConsumerBuilder consumerBuilder;
1919

20+
private int initialCredits = 1;
21+
2022
private int additionalCredits = 1;
2123

2224
public LegacyFlowControlStrategyBuilder(ConsumerBuilder consumerBuilder) {
2325
this.consumerBuilder = consumerBuilder;
2426
}
2527

28+
@Override
29+
public LegacyFlowControlStrategy build(Supplier<Client> clientSupplier) {
30+
return new LegacyFlowControlStrategy(clientSupplier, this.initialCredits, this.additionalCredits);
31+
}
32+
2633
@Override
2734
public ConsumerBuilder builder() {
2835
return this.consumerBuilder;
@@ -33,11 +40,10 @@ public LegacyFlowControlStrategyBuilder additionalCredits(int additionalCredits)
3340
return this;
3441
}
3542

36-
@Override
37-
public LegacyFlowControlStrategy build(Supplier<Client> clientSupplier) {
38-
return new LegacyFlowControlStrategy(clientSupplier, this.additionalCredits);
43+
public LegacyFlowControlStrategyBuilder initialCredits(int initialCredits) {
44+
this.initialCredits = initialCredits;
45+
return this;
3946
}
40-
4147
}
4248

4349
}

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ private class ClientSubscriptionsManager implements Comparable<ClientSubscriptio
543543
// the 2 data structures track the subscriptions, they must remain consistent
544544
private final Map<String, Set<SubscriptionTracker>> streamToStreamSubscriptions =
545545
new ConcurrentHashMap<>();
546+
private final ConsumerFlowControlStrategy consumerFlowControlStrategy;
546547
// trackers and tracker count must be kept in sync
547548
private volatile List<SubscriptionTracker> subscriptionTrackers =
548549
new ArrayList<>(maxConsumersByConnection);
@@ -561,9 +562,10 @@ private ClientSubscriptionsManager(
561562
IntStream.range(0, maxConsumersByConnection).forEach(i -> subscriptionTrackers.add(null));
562563
this.trackerCount = 0;
563564
AtomicReference<Client> clientReference = new AtomicReference<>();
564-
ConsumerFlowControlStrategy consumerFlowControlStrategy = consumerFlowControlStrategyBuilder.build(clientReference::get);
565+
ConsumerFlowControlStrategy localConsumerFlowControlStrategy = consumerFlowControlStrategyBuilder.build(clientReference::get);
566+
this.consumerFlowControlStrategy = localConsumerFlowControlStrategy;
565567
ChunkListener chunkListener = (ignoredClient, subscriptionId, offset, messageCount, dataSize) ->
566-
consumerFlowControlStrategy.handleChunk(subscriptionId, offset, messageCount, dataSize);
568+
localConsumerFlowControlStrategy.handleChunk(subscriptionId, offset, messageCount, dataSize);
567569
CreditNotification creditNotification =
568570
(subscriptionId, responseCode) -> {
569571
SubscriptionTracker subscriptionTracker =
@@ -574,7 +576,7 @@ private ClientSubscriptionsManager(
574576
subscriptionId & 0xFF,
575577
stream,
576578
Utils.formatConstant(responseCode));
577-
consumerFlowControlStrategy.handleCreditNotification(subscriptionId, responseCode);
579+
localConsumerFlowControlStrategy.handleCreditNotification(subscriptionId, responseCode);
578580
};
579581

580582
MessageListener messageListener =
@@ -596,7 +598,7 @@ private ClientSubscriptionsManager(
596598
this.id,
597599
this.name);
598600
}
599-
consumerFlowControlStrategy.handleMessage(subscriptionId, offset, chunkTimestamp, committedOffset, message);
601+
localConsumerFlowControlStrategy.handleMessage(subscriptionId, offset, chunkTimestamp, committedOffset, message);
600602
};
601603
ShutdownListener shutdownListener =
602604
shutdownContext -> {
@@ -650,7 +652,7 @@ private ClientSubscriptionsManager(
650652
"Consumers re-assignment after disconnection from %s",
651653
name));
652654
}
653-
consumerFlowControlStrategy.handleShutdown(shutdownContext);
655+
localConsumerFlowControlStrategy.handleShutdown(shutdownContext);
654656
};
655657
MetadataListener metadataListener =
656658
(stream, code) -> {
@@ -702,7 +704,7 @@ private ClientSubscriptionsManager(
702704
"Consumers re-assignment after metadata update on stream '%s'",
703705
stream));
704706
}
705-
consumerFlowControlStrategy.handleMetadata(stream, code);
707+
localConsumerFlowControlStrategy.handleMetadata(stream, code);
706708
};
707709
ConsumerUpdateListener consumerUpdateListener =
708710
(client, subscriptionId, active) -> {
@@ -721,7 +723,7 @@ private ClientSubscriptionsManager(
721723
LOGGER.debug(
722724
"Could not find stream subscription {} for consumer update", subscriptionId);
723725
}
724-
consumerFlowControlStrategy.handleConsumerUpdate(subscriptionId, active);
726+
localConsumerFlowControlStrategy.handleConsumerUpdate(subscriptionId, active);
725727
return result;
726728
};
727729
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.CONSUMER);
@@ -954,14 +956,20 @@ synchronized void add(
954956

955957
checkNotClosed();
956958
byte subId = subscriptionId;
959+
int initialCredits = this.consumerFlowControlStrategy.handleSubscribe(
960+
subId,
961+
subscriptionTracker.stream,
962+
subscriptionContext.offsetSpecification(),
963+
subscriptionTracker.subscriptionProperties
964+
);
957965
Client.Response subscribeResponse =
958966
Utils.callAndMaybeRetry(
959967
() ->
960968
client.subscribe(
961969
subId,
962970
subscriptionTracker.stream,
963971
subscriptionContext.offsetSpecification(),
964-
subscriptionTracker.initialCredits,
972+
initialCredits,
965973
subscriptionTracker.subscriptionProperties),
966974
RETRY_ON_TIMEOUT,
967975
"Subscribe request for consumer %s on stream '%s'",
@@ -1025,7 +1033,7 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) {
10251033
subscriptionTracker.consumer.id(),
10261034
subscriptionTracker.stream);
10271035
}
1028-
1036+
this.consumerFlowControlStrategy.handleUnsubscribe(subscriptionIdInClient);
10291037
this.setSubscriptionTrackers(update(this.subscriptionTrackers, subscriptionIdInClient, null));
10301038
streamToStreamSubscriptions.compute(
10311039
subscriptionTracker.stream,
@@ -1090,6 +1098,7 @@ synchronized void close() {
10901098
if (this.client != null && this.client.isOpen() && tracker.consumer.isOpen()) {
10911099
this.client.unsubscribe(tracker.subscriptionIdInClient);
10921100
}
1101+
this.consumerFlowControlStrategy.handleUnsubscribe(tracker.subscriptionIdInClient);
10931102
} catch (Exception e) {
10941103
// OK, moving on
10951104
LOGGER.debug(

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,23 @@ public ConsumerBuilder noTrackingStrategy() {
135135
return this;
136136
}
137137

138-
public ConsumerBuilder credits(int initial, int onChunkDelivery) {
138+
/**
139+
*
140+
* @param initial Credits to ask for with each new subscription
141+
* @param onChunkDelivery Credits to ask for after a chunk is delivered
142+
* @return this {@link StreamConsumerBuilder}
143+
* @deprecated Prefer using {@link ConsumerBuilder#flowControlStrategy(ConsumerFlowControlStrategyBuilderFactory)}
144+
* to define flow control strategies instead.
145+
*/
146+
@Deprecated
147+
public StreamConsumerBuilder credits(int initial, int onChunkDelivery) {
139148
if (initial <= 0 || onChunkDelivery <= 0) {
140149
throw new IllegalArgumentException("Credits must be positive");
141150
}
142-
this.initialCredits = initial;
143-
this.additionalCredits = onChunkDelivery;
151+
this.consumerFlowControlStrategyBuilder = LegacyFlowControlStrategyBuilderFactory.INSTANCE
152+
.builder(this)
153+
.initialCredits(initial)
154+
.additionalCredits(additionalCredits);
144155
return this;
145156
}
146157

src/test/java/com/rabbitmq/stream/Host.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public static String capture(InputStream is) throws IOException {
4343

4444
private static Process executeCommand(String command) throws IOException {
4545
Process pr = executeCommandProcess(command);
46-
4746
int ev = waitForExitValue(pr);
4847
if (ev != 0) {
4948
String stdout = capture(pr.getInputStream());
@@ -83,10 +82,10 @@ private static Process executeCommandProcess(String command) throws IOException
8382
String[] finalCommand;
8483
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
8584
finalCommand = new String[4];
86-
finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
85+
finalCommand[0] = "C:\\Windows\\system32\\cmd.exe";
8786
finalCommand[1] = "/y";
8887
finalCommand[2] = "/c";
89-
finalCommand[3] = command;
88+
finalCommand[3] = command.replaceAll("\"", "\"\"\"").replaceAll("'", "\"");
9089
} else {
9190
finalCommand = new String[3];
9291
finalCommand[0] = "/bin/sh";

0 commit comments

Comments
 (0)