21
21
import static com .rabbitmq .stream .impl .Utils .namedRunnable ;
22
22
import static com .rabbitmq .stream .impl .Utils .quote ;
23
23
24
- import com .rabbitmq .stream .BackOffDelayPolicy ;
25
- import com .rabbitmq .stream .Constants ;
24
+ import com .rabbitmq .stream .*;
26
25
import com .rabbitmq .stream .Consumer ;
27
- import com .rabbitmq .stream .MessageHandler ;
28
26
import com .rabbitmq .stream .MessageHandler .Context ;
29
- import com .rabbitmq .stream .OffsetSpecification ;
30
- import com .rabbitmq .stream .StreamDoesNotExistException ;
31
- import com .rabbitmq .stream .StreamException ;
32
- import com .rabbitmq .stream .StreamNotAvailableException ;
33
- import com .rabbitmq .stream .SubscriptionListener ;
34
27
import com .rabbitmq .stream .SubscriptionListener .SubscriptionContext ;
35
28
import com .rabbitmq .stream .impl .Client .Broker ;
36
29
import com .rabbitmq .stream .impl .Client .ChunkListener ;
61
54
import java .util .concurrent .atomic .AtomicBoolean ;
62
55
import java .util .concurrent .atomic .AtomicLong ;
63
56
import java .util .concurrent .atomic .AtomicReference ;
64
- import java .util .function .Function ;
65
- import java .util .function .Predicate ;
57
+ import java .util .function .*;
66
58
import java .util .stream .Collectors ;
67
59
import java .util .stream .IntStream ;
68
60
import org .slf4j .Logger ;
@@ -122,8 +114,7 @@ Runnable subscribe(
122
114
Runnable trackingClosingCallback ,
123
115
MessageHandler messageHandler ,
124
116
Map <String , String > subscriptionProperties ,
125
- int initialCredits ,
126
- int additionalCredits ) {
117
+ ConsumerFlowStrategy flowStrategy ) {
127
118
List <Client .Broker > candidates = findBrokersForStream (stream );
128
119
Client .Broker newNode = pickBroker (candidates );
129
120
if (newNode == null ) {
@@ -143,8 +134,7 @@ Runnable subscribe(
143
134
trackingClosingCallback ,
144
135
messageHandler ,
145
136
subscriptionProperties ,
146
- initialCredits ,
147
- additionalCredits );
137
+ flowStrategy );
148
138
149
139
try {
150
140
addToManager (newNode , subscriptionTracker , offsetSpecification , true );
@@ -403,8 +393,7 @@ private static class SubscriptionTracker {
403
393
private volatile ClientSubscriptionsManager manager ;
404
394
private volatile AtomicReference <SubscriptionState > state =
405
395
new AtomicReference <>(SubscriptionState .OPENING );
406
- private final int initialCredits ;
407
- private final int additionalCredits ;
396
+ private final ConsumerFlowStrategy flowStrategy ;
408
397
409
398
private SubscriptionTracker (
410
399
long id ,
@@ -416,8 +405,7 @@ private SubscriptionTracker(
416
405
Runnable trackingClosingCallback ,
417
406
MessageHandler messageHandler ,
418
407
Map <String , String > subscriptionProperties ,
419
- int initialCredits ,
420
- int additionalCredits ) {
408
+ ConsumerFlowStrategy flowStrategy ) {
421
409
this .id = id ;
422
410
this .consumer = consumer ;
423
411
this .stream = stream ;
@@ -426,8 +414,7 @@ private SubscriptionTracker(
426
414
this .subscriptionListener = subscriptionListener ;
427
415
this .trackingClosingCallback = trackingClosingCallback ;
428
416
this .messageHandler = messageHandler ;
429
- this .initialCredits = initialCredits ;
430
- this .additionalCredits = additionalCredits ;
417
+ this .flowStrategy = flowStrategy ;
431
418
if (this .offsetTrackingReference == null ) {
432
419
this .subscriptionProperties = subscriptionProperties ;
433
420
} else {
@@ -497,13 +484,19 @@ private static final class MessageHandlerContext implements Context {
497
484
private final long timestamp ;
498
485
private final long committedOffset ;
499
486
private final StreamConsumer consumer ;
487
+ private final LongConsumer processedCallback ;
500
488
501
489
private MessageHandlerContext (
502
- long offset , long timestamp , long committedOffset , StreamConsumer consumer ) {
490
+ long offset ,
491
+ long timestamp ,
492
+ long committedOffset ,
493
+ StreamConsumer consumer ,
494
+ LongConsumer processedCallback ) {
503
495
this .offset = offset ;
504
496
this .timestamp = timestamp ;
505
497
this .committedOffset = committedOffset ;
506
498
this .consumer = consumer ;
499
+ this .processedCallback = processedCallback ;
507
500
}
508
501
509
502
@ Override
@@ -534,6 +527,11 @@ public String stream() {
534
527
public Consumer consumer () {
535
528
return this .consumer ;
536
529
}
530
+
531
+ @ Override
532
+ public void processed () {
533
+ this .processedCallback .accept (this .offset );
534
+ }
537
535
}
538
536
539
537
/**
@@ -569,13 +567,28 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
569
567
(client , subscriptionId , offset , messageCount , dataSize ) -> {
570
568
SubscriptionTracker subscriptionTracker =
571
569
subscriptionTrackers .get (subscriptionId & 0xFF );
570
+ LongConsumer processCallback ;
572
571
if (subscriptionTracker != null && subscriptionTracker .consumer .isOpen ()) {
573
- client .credit (subscriptionId , subscriptionTracker .additionalCredits );
572
+ ConsumerFlowStrategy .Context chunkContext =
573
+ new ConsumerFlowStrategy .Context () {
574
+ @ Override
575
+ public void credits (int credits ) {
576
+ client .credit (subscriptionId , 1 );
577
+ }
578
+
579
+ @ Override
580
+ public long messageCount () {
581
+ return messageCount ;
582
+ }
583
+ };
584
+ processCallback = subscriptionTracker .flowStrategy .start (chunkContext );
574
585
} else {
575
586
LOGGER .debug (
576
587
"Could not find stream subscription {} or subscription closing, not providing credits" ,
577
588
subscriptionId & 0xFF );
589
+ processCallback = null ;
578
590
}
591
+ return processCallback ;
579
592
};
580
593
581
594
CreditNotification creditNotification =
@@ -591,17 +604,20 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
591
604
};
592
605
593
606
MessageListener messageListener =
594
- (subscriptionId , offset , chunkTimestamp , committedOffset , message ) -> {
607
+ (subscriptionId , offset , chunkTimestamp , committedOffset , chunkContext , message ) -> {
595
608
SubscriptionTracker subscriptionTracker =
596
609
subscriptionTrackers .get (subscriptionId & 0xFF );
597
610
if (subscriptionTracker != null ) {
598
611
subscriptionTracker .offset = offset ;
599
612
subscriptionTracker .hasReceivedSomething = true ;
600
613
subscriptionTracker .messageHandler .handle (
601
614
new MessageHandlerContext (
602
- offset , chunkTimestamp , committedOffset , subscriptionTracker .consumer ),
615
+ offset ,
616
+ chunkTimestamp ,
617
+ committedOffset ,
618
+ subscriptionTracker .consumer ,
619
+ (LongConsumer ) chunkContext ),
603
620
message );
604
- // FIXME set offset here as well, best effort to avoid duplicates?
605
621
} else {
606
622
LOGGER .debug (
607
623
"Could not find stream subscription {} in manager {}, node {}" ,
@@ -969,7 +985,7 @@ synchronized void add(
969
985
subId ,
970
986
subscriptionTracker .stream ,
971
987
subscriptionContext .offsetSpecification (),
972
- subscriptionTracker .initialCredits ,
988
+ subscriptionTracker .flowStrategy . initialCredits () ,
973
989
subscriptionTracker .subscriptionProperties ),
974
990
RETRY_ON_TIMEOUT ,
975
991
"Subscribe request for consumer %s on stream '%s'" ,
0 commit comments