25
25
import java .util .concurrent .ConcurrentHashMap ;
26
26
import java .util .concurrent .atomic .AtomicInteger ;
27
27
import java .util .concurrent .atomic .AtomicLong ;
28
+ import java .util .function .BiConsumer ;
28
29
import java .util .stream .Stream ;
29
30
import org .junit .jupiter .api .AfterEach ;
30
31
import org .junit .jupiter .api .BeforeEach ;
@@ -119,24 +120,24 @@ void autoTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws Ex
119
120
void manualTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff () throws Exception {
120
121
int messageCount = 10000 ;
121
122
int storeEvery = 1000 ;
122
- Map <Integer , AtomicInteger > receivedMessages = new ConcurrentHashMap <>();
123
- receivedMessages .put (0 , new AtomicInteger (0 ));
124
- receivedMessages .put (1 , new AtomicInteger (0 ));
123
+ AtomicInteger consumer1MessageCount = new AtomicInteger (0 );
124
+ AtomicInteger consumer2MessageCount = new AtomicInteger (0 );
125
125
AtomicLong lastReceivedOffset = new AtomicLong (0 );
126
126
String consumerName = "foo" ;
127
127
128
+ BiConsumer <MessageHandler .Context , AtomicInteger > handler =
129
+ (ctx , count ) -> {
130
+ lastReceivedOffset .set (ctx .offset ());
131
+ if (count .incrementAndGet () % storeEvery == 0 ) {
132
+ ctx .storeOffset ();
133
+ }
134
+ };
135
+
128
136
Consumer consumer1 =
129
137
environment .consumerBuilder ().stream (stream )
130
138
.name (consumerName )
131
139
.singleActiveConsumer ()
132
- .messageHandler (
133
- (context , message ) -> {
134
- lastReceivedOffset .set (context .offset ());
135
- int count = receivedMessages .get (0 ).incrementAndGet ();
136
- if (count % storeEvery == 0 ) {
137
- context .storeOffset ();
138
- }
139
- })
140
+ .messageHandler ((context , message ) -> handler .accept (context , consumer1MessageCount ))
140
141
.offset (OffsetSpecification .first ())
141
142
.manualTrackingStrategy ()
142
143
.builder ()
@@ -146,24 +147,17 @@ void manualTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws
146
147
environment .consumerBuilder ().stream (stream )
147
148
.name (consumerName )
148
149
.singleActiveConsumer ()
149
- .messageHandler (
150
- (context , message ) -> {
151
- lastReceivedOffset .set (context .offset ());
152
- int count = receivedMessages .get (1 ).incrementAndGet ();
153
- if (count % storeEvery == 0 ) {
154
- context .storeOffset ();
155
- }
156
- })
150
+ .messageHandler ((context , message ) -> handler .accept (context , consumer2MessageCount ))
157
151
.offset (OffsetSpecification .first ())
158
152
.manualTrackingStrategy ()
159
153
.builder ()
160
154
.build ();
161
155
162
156
publishAndWaitForConfirms (cf , messageCount , stream );
163
- waitAtMost (() -> receivedMessages . getOrDefault ( 0 , new AtomicInteger ( 0 )) .get () == messageCount );
157
+ waitAtMost (() -> consumer1MessageCount .get () == messageCount );
164
158
165
159
assertThat (lastReceivedOffset ).hasPositiveValue ();
166
- assertThat (receivedMessages . get ( 1 ) ).hasValue (0 );
160
+ assertThat (consumer2MessageCount ).hasValue (0 );
167
161
168
162
long firstWaveLimit = lastReceivedOffset .get ();
169
163
@@ -174,9 +168,9 @@ void manualTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws
174
168
175
169
publishAndWaitForConfirms (cf , messageCount , stream );
176
170
177
- waitAtMost (() -> receivedMessages . getOrDefault ( 0 , new AtomicInteger ( 1 )) .get () == messageCount );
171
+ waitAtMost (() -> consumer2MessageCount .get () == messageCount );
178
172
assertThat (lastReceivedOffset ).hasValueGreaterThan (firstWaveLimit );
179
- assertThat (receivedMessages . get ( 0 ) ).hasValue (messageCount );
173
+ assertThat (consumer1MessageCount ).hasValue (messageCount );
180
174
181
175
consumer2 .close ();
182
176
}
0 commit comments