Skip to content

Commit 0d98186

Browse files
committed
Add Client.MessageIgnoredListener
To deal with ignored messages at the beginning of the first chunk of a subscription. This is used with consumer flow strategy.
1 parent 074c565 commit 0d98186

File tree

6 files changed

+163
-41
lines changed

6 files changed

+163
-41
lines changed

pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,9 @@
383383
<excludes>
384384
<exclude>**/*TestSuite.java</exclude>
385385
</excludes>
386+
<systemProperties>
387+
<rabbitmqctl.bin>DOCKER:rabbitmq</rabbitmqctl.bin>
388+
</systemProperties>
386389
</configuration>
387390
</plugin>
388391

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ public class Client implements AutoCloseable {
172172
final PublishErrorListener publishErrorListener;
173173
final ChunkListener chunkListener;
174174
final MessageListener messageListener;
175+
final MessageIgnoredListener messageIgnoredListener;
175176
final CreditNotification creditNotification;
176177
final ConsumerUpdateListener consumerUpdateListener;
177178
final MetadataListener metadataListener;
@@ -226,6 +227,7 @@ public Client(ClientParameters parameters) {
226227
this.publishErrorListener = parameters.publishErrorListener;
227228
this.chunkListener = parameters.chunkListener;
228229
this.messageListener = parameters.messageListener;
230+
this.messageIgnoredListener = parameters.messageIgnoredListener;
229231
this.creditNotification = parameters.creditNotification;
230232
this.codec = parameters.codec == null ? Codecs.DEFAULT : parameters.codec;
231233
this.saslConfiguration = parameters.saslConfiguration;
@@ -1656,6 +1658,7 @@ public interface ChunkListener {
16561658
* @param offset the first offset in the chunk
16571659
* @param messageCount the total number of messages in the chunk
16581660
* @param dataSize the size in bytes of the data in the chunk
1661+
* @return a "chunk context" instance that'll be passed in to the {@link MessageListener}
16591662
*/
16601663
Object handle(
16611664
Client client, byte subscriptionId, long offset, long messageCount, long dataSize);
@@ -1672,6 +1675,16 @@ void handle(
16721675
Message message);
16731676
}
16741677

1678+
public interface MessageIgnoredListener {
1679+
1680+
void ignored(
1681+
byte subscriptionId,
1682+
long offset,
1683+
long chunkTimestamp,
1684+
long committedChunkId,
1685+
Object chunkContext);
1686+
}
1687+
16751688
public interface CreditNotification {
16761689

16771690
void handle(byte subscriptionId, short responseCode);
@@ -2214,6 +2227,8 @@ public static class ClientParameters {
22142227
(client, correlationId, offset, messageCount, dataSize) -> null;
22152228
private MessageListener messageListener =
22162229
(correlationId, offset, chunkTimestamp, committedOffset, chunkContext, message) -> {};
2230+
private MessageIgnoredListener messageIgnoredListener =
2231+
(subscriptionId, offset, chunkTimestamp, committedChunkId, chunkContext) -> {};
22172232
private MetadataListener metadataListener = (stream, code) -> {};
22182233
private CreditNotification creditNotification =
22192234
(subscriptionId, responseCode) ->
@@ -2270,6 +2285,11 @@ public ClientParameters messageListener(MessageListener messageListener) {
22702285
return this;
22712286
}
22722287

2288+
public ClientParameters messageIgnoredListener(MessageIgnoredListener messageIgnoredListener) {
2289+
this.messageIgnoredListener = messageIgnoredListener;
2290+
return this;
2291+
}
2292+
22732293
public ClientParameters creditNotification(CreditNotification creditNotification) {
22742294
this.creditNotification = creditNotification;
22752295
return this;

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,8 @@
2525
import com.rabbitmq.stream.Consumer;
2626
import com.rabbitmq.stream.MessageHandler.Context;
2727
import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext;
28-
import com.rabbitmq.stream.impl.Client.Broker;
29-
import com.rabbitmq.stream.impl.Client.ChunkListener;
30-
import com.rabbitmq.stream.impl.Client.ClientParameters;
28+
import com.rabbitmq.stream.impl.Client.*;
3129
import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener;
32-
import com.rabbitmq.stream.impl.Client.CreditNotification;
33-
import com.rabbitmq.stream.impl.Client.MessageListener;
34-
import com.rabbitmq.stream.impl.Client.MetadataListener;
35-
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
36-
import com.rabbitmq.stream.impl.Client.ShutdownListener;
3730
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
3831
import com.rabbitmq.stream.impl.Utils.ClientFactory;
3932
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
@@ -604,7 +597,7 @@ public long messageCount() {
604597
};
605598

606599
MessageListener messageListener =
607-
(subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext, message) -> {
600+
(subscriptionId, offset, chunkTimestamp, committedChunkId, chunkContext, message) -> {
608601
SubscriptionTracker subscriptionTracker =
609602
subscriptionTrackers.get(subscriptionId & 0xFF);
610603
if (subscriptionTracker != null) {
@@ -614,13 +607,37 @@ public long messageCount() {
614607
new MessageHandlerContext(
615608
offset,
616609
chunkTimestamp,
617-
committedOffset,
610+
committedChunkId,
618611
subscriptionTracker.consumer,
619612
(ConsumerFlowStrategy.MessageProcessedCallback) chunkContext),
620613
message);
621614
} else {
622615
LOGGER.debug(
623-
"Could not find stream subscription {} in manager {}, node {}",
616+
"Could not find stream subscription {} in manager {}, node {} for message listener",
617+
subscriptionId,
618+
this.id,
619+
this.name);
620+
}
621+
};
622+
MessageIgnoredListener messageIgnoredListener =
623+
(subscriptionId, offset, chunkTimestamp, committedChunkId, chunkContext) -> {
624+
SubscriptionTracker subscriptionTracker =
625+
subscriptionTrackers.get(subscriptionId & 0xFF);
626+
if (subscriptionTracker != null) {
627+
// message at the beginning of the first chunk is ignored
628+
// we "simulate" the processing
629+
MessageHandlerContext messageHandlerContext =
630+
new MessageHandlerContext(
631+
offset,
632+
chunkTimestamp,
633+
committedChunkId,
634+
subscriptionTracker.consumer,
635+
(ConsumerFlowStrategy.MessageProcessedCallback) chunkContext);
636+
((ConsumerFlowStrategy.MessageProcessedCallback) chunkContext)
637+
.processed(messageHandlerContext);
638+
} else {
639+
LOGGER.debug(
640+
"Could not find stream subscription {} in manager {}, node {} for message ignored listener",
624641
subscriptionId,
625642
this.id,
626643
this.name);
@@ -757,6 +774,7 @@ public long messageCount() {
757774
.chunkListener(chunkListener)
758775
.creditNotification(creditNotification)
759776
.messageListener(messageListener)
777+
.messageIgnoredListener(messageIgnoredListener)
760778
.shutdownListener(shutdownListener)
761779
.metadataListener(metadataListener)
762780
.consumerUpdateListener(consumerUpdateListener))

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,8 @@
5555
import com.rabbitmq.stream.StreamException;
5656
import com.rabbitmq.stream.compression.Compression;
5757
import com.rabbitmq.stream.compression.CompressionCodec;
58-
import com.rabbitmq.stream.impl.Client.Broker;
59-
import com.rabbitmq.stream.impl.Client.ChunkListener;
60-
import com.rabbitmq.stream.impl.Client.MessageListener;
61-
import com.rabbitmq.stream.impl.Client.OpenResponse;
62-
import com.rabbitmq.stream.impl.Client.OutstandingRequest;
63-
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
64-
import com.rabbitmq.stream.impl.Client.QueryPublisherSequenceResponse;
65-
import com.rabbitmq.stream.impl.Client.Response;
66-
import com.rabbitmq.stream.impl.Client.SaslAuthenticateResponse;
58+
import com.rabbitmq.stream.impl.Client.*;
6759
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
68-
import com.rabbitmq.stream.impl.Client.StreamMetadata;
69-
import com.rabbitmq.stream.impl.Client.StreamStatsResponse;
70-
import com.rabbitmq.stream.impl.Client.SubscriptionOffset;
7160
import com.rabbitmq.stream.impl.Utils.MutableBoolean;
7261
import com.rabbitmq.stream.metrics.MetricsCollector;
7362
import io.netty.buffer.ByteBuf;
@@ -344,8 +333,8 @@ public boolean isInitiatedByServer() {
344333
static int handleMessage(
345334
ByteBuf bb,
346335
int read,
347-
boolean filter,
348-
MutableBoolean messageFiltered,
336+
boolean ignore,
337+
MutableBoolean messageIgnored,
349338
long offset,
350339
long offsetLimit,
351340
long chunkTimestamp,
@@ -360,8 +349,8 @@ static int handleMessage(
360349
bb.readBytes(data);
361350
read += entrySize;
362351

363-
if (filter && Long.compareUnsigned(offset, offsetLimit) < 0) {
364-
messageFiltered.set(true);
352+
if (ignore && Long.compareUnsigned(offset, offsetLimit) < 0) {
353+
messageIgnored.set(true);
365354
} else {
366355
Message message = codec.decode(data);
367356
messageListener.handle(
@@ -375,6 +364,7 @@ static int handleDeliverVersion1(
375364
Client client,
376365
ChunkListener chunkListener,
377366
MessageListener messageListener,
367+
MessageIgnoredListener messageIgnoredListener,
378368
Codec codec,
379369
List<SubscriptionOffset> subscriptionOffsets,
380370
ChunkChecksum chunkChecksum,
@@ -384,6 +374,7 @@ static int handleDeliverVersion1(
384374
client,
385375
chunkListener,
386376
messageListener,
377+
messageIgnoredListener,
387378
codec,
388379
subscriptionOffsets,
389380
chunkChecksum,
@@ -399,6 +390,7 @@ static int handleDeliver(
399390
Client client,
400391
ChunkListener chunkListener,
401392
MessageListener messageListener,
393+
MessageIgnoredListener messageIgnoredListener,
402394
Codec codec,
403395
List<SubscriptionOffset> subscriptionOffsets,
404396
ChunkChecksum chunkChecksum,
@@ -465,7 +457,7 @@ static int handleDeliver(
465457
}
466458
}
467459

468-
final boolean filter = offsetLimit != -1;
460+
final boolean ignore = offsetLimit != -1;
469461

470462
try {
471463
chunkChecksum.checksum(message, dataLength, crc);
@@ -480,7 +472,7 @@ static int handleDeliver(
480472

481473
metricsCollector.chunk(numEntries);
482474
long messagesRead = 0;
483-
MutableBoolean messageFiltered = new MutableBoolean(false);
475+
MutableBoolean messageIgnored = new MutableBoolean(false);
484476

485477
while (numRecords != 0) {
486478
byte entryType = message.readByte();
@@ -495,8 +487,8 @@ static int handleDeliver(
495487
handleMessage(
496488
message,
497489
read,
498-
filter,
499-
messageFiltered,
490+
ignore,
491+
messageIgnored,
500492
offset,
501493
offsetLimit,
502494
chunkTimestamp,
@@ -505,10 +497,10 @@ static int handleDeliver(
505497
messageListener,
506498
subscriptionId,
507499
chunkContext);
508-
if (messageFiltered.get()) {
509-
// TODO add listener and pass chunk context in
510-
// this will be used e.g. to mark messages as "processed"
511-
messageFiltered.set(false);
500+
if (messageIgnored.get()) {
501+
messageIgnoredListener.ignored(
502+
subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
503+
messageIgnored.set(false);
512504
} else {
513505
messagesRead++;
514506
}
@@ -564,8 +556,8 @@ static int handleDeliver(
564556
handleMessage(
565557
bbToReadFrom,
566558
read,
567-
filter,
568-
messageFiltered,
559+
ignore,
560+
messageIgnored,
569561
offset,
570562
offsetLimit,
571563
chunkTimestamp,
@@ -574,8 +566,10 @@ static int handleDeliver(
574566
messageListener,
575567
subscriptionId,
576568
chunkContext);
577-
if (messageFiltered.get()) {
578-
messageFiltered.set(false);
569+
if (messageIgnored.get()) {
570+
messageIgnoredListener.ignored(
571+
subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
572+
messageIgnored.set(false);
579573
} else {
580574
messagesRead++;
581575
}
@@ -601,6 +595,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
601595
client,
602596
client.chunkListener,
603597
client.messageListener,
598+
client.messageIgnoredListener,
604599
client.codec,
605600
client.subscriptionOffsets,
606601
client.chunkChecksum,
@@ -622,6 +617,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
622617
client,
623618
client.chunkListener,
624619
client.messageListener,
620+
client.messageIgnoredListener,
625621
client.codec,
626622
client.subscriptionOffsets,
627623
client.chunkChecksum,

0 commit comments

Comments
 (0)