1
1
/*
2
- * Copyright 2016-2023 the original author or authors.
2
+ * Copyright 2016-2024 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
125
125
import org .springframework .kafka .test .context .EmbeddedKafka ;
126
126
import org .springframework .kafka .test .utils .ContainerTestUtils ;
127
127
import org .springframework .kafka .test .utils .KafkaTestUtils ;
128
+ import org .springframework .lang .NonNull ;
128
129
import org .springframework .lang .Nullable ;
129
130
import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
130
131
import org .springframework .transaction .PlatformTransactionManager ;
140
141
* @author Lukasz Kaminski
141
142
* @author Ray Chuan Tay
142
143
* @author Daniel Gentes
144
+ * @author Soby Chacko
143
145
*/
144
146
@ EmbeddedKafka (topics = { KafkaMessageListenerContainerTests .topic1 , KafkaMessageListenerContainerTests .topic2 ,
145
147
KafkaMessageListenerContainerTests .topic3 , KafkaMessageListenerContainerTests .topic4 ,
@@ -928,7 +930,7 @@ public void testRecordAckAfterStop() throws Exception {
928
930
Consumer <Integer , String > consumer = mock (Consumer .class );
929
931
given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
930
932
final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
931
- records .put (new TopicPartition ("foo" , 0 ), Arrays . asList (
933
+ records .put (new TopicPartition ("foo" , 0 ), List . of (
932
934
new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" )));
933
935
ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
934
936
given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
@@ -1343,7 +1345,6 @@ else if (entry.getValue().offset() == 2) {
1343
1345
logger .info ("Stop batch listener manual" );
1344
1346
}
1345
1347
1346
- @ SuppressWarnings ("deprecation" )
1347
1348
@ Test
1348
1349
public void testBatchListenerErrors () throws Exception {
1349
1350
logger .info ("Start batch listener errors" );
@@ -1416,7 +1417,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
1416
1417
logger .info ("Stop batch listener errors" );
1417
1418
}
1418
1419
1419
- @ SuppressWarnings ({ "unchecked" , "deprecation" })
1420
+ @ SuppressWarnings ({ "unchecked" })
1420
1421
@ Test
1421
1422
public void testBatchListenerAckAfterRecoveryMock () throws Exception {
1422
1423
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -1679,7 +1680,7 @@ public void testDefinedPartitions() throws Exception {
1679
1680
@ Override
1680
1681
protected KafkaConsumer <Integer , String > createKafkaConsumer (Map <String , Object > configs ) {
1681
1682
assertThat (configs ).containsKey (ConsumerConfig .MAX_POLL_RECORDS_CONFIG );
1682
- return new KafkaConsumer <Integer , String >(props ) {
1683
+ return new KafkaConsumer <>(props ) {
1683
1684
1684
1685
@ Override
1685
1686
public ConsumerRecords <Integer , String > poll (Duration timeout ) {
@@ -2280,10 +2281,8 @@ public void testStaticAssign() throws Exception {
2280
2281
Map <String , Object > props = KafkaTestUtils .consumerProps ("testStatic" , "false" , embeddedKafka );
2281
2282
2282
2283
DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
2283
- ContainerProperties containerProps = new ContainerProperties (new TopicPartitionOffset [] {
2284
- new TopicPartitionOffset (topic22 , 0 ),
2285
- new TopicPartitionOffset (topic22 , 1 )
2286
- });
2284
+ ContainerProperties containerProps = new ContainerProperties (new TopicPartitionOffset (topic22 , 0 ),
2285
+ new TopicPartitionOffset (topic22 , 1 ));
2287
2286
final CountDownLatch latch = new CountDownLatch (1 );
2288
2287
final List <ConsumerRecord <Integer , String >> received = new ArrayList <>();
2289
2288
containerProps .setMessageListener ((MessageListener <Integer , String >) record -> {
@@ -2361,15 +2360,15 @@ public void testBadListenerType() {
2361
2360
containerProps .setMissingTopicsFatal (false );
2362
2361
KafkaMessageListenerContainer <Integer , Foo1 > badContainer =
2363
2362
new KafkaMessageListenerContainer <>(cf , containerProps );
2364
- assertThatIllegalStateException ().isThrownBy (() -> badContainer . start () )
2363
+ assertThatIllegalStateException ().isThrownBy (badContainer :: start )
2365
2364
.withMessageContaining ("implementation must be provided" );
2366
2365
badContainer .setupMessageListener ((GenericMessageListener <String >) data -> {
2367
2366
});
2368
2367
assertThat (badContainer .getAssignedPartitions ()).isNull ();
2369
2368
badContainer .pause ();
2370
2369
assertThat (badContainer .isContainerPaused ()).isFalse ();
2371
2370
assertThat (badContainer .metrics ()).isEqualTo (Collections .emptyMap ());
2372
- assertThatIllegalArgumentException ().isThrownBy (() -> badContainer . start () )
2371
+ assertThatIllegalArgumentException ().isThrownBy (badContainer :: start )
2373
2372
.withMessageContaining ("Listener must be" );
2374
2373
assertThat (badContainer .toString ()).contains ("none assigned" );
2375
2374
@@ -2386,7 +2385,7 @@ public void testBadAckMode() {
2386
2385
new KafkaMessageListenerContainer <>(cf , containerProps );
2387
2386
badContainer .setupMessageListener ((MessageListener <String , String >) m -> {
2388
2387
});
2389
- assertThatIllegalStateException ().isThrownBy (() -> badContainer . start () )
2388
+ assertThatIllegalStateException ().isThrownBy (badContainer :: start )
2390
2389
.withMessageContaining ("Consumer cannot be configured for auto commit for ackMode" );
2391
2390
2392
2391
}
@@ -2565,14 +2564,16 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
2565
2564
public void onMessage (ConsumerRecord <String , String > data ) {
2566
2565
if (data .partition () == 0 && data .offset () == 0 ) {
2567
2566
TopicPartition topicPartition = new TopicPartition (data .topic (), data .partition ());
2568
- getSeekCallbackFor (topicPartition ).seekToBeginning (records .keySet ());
2567
+ final ConsumerSeekCallback seekCallbackFor = getSeekCallbackFor (topicPartition );
2568
+ assertThat (seekCallbackFor ).isNotNull ();
2569
+ seekCallbackFor .seekToBeginning (records .keySet ());
2569
2570
Iterator <TopicPartition > iterator = records .keySet ().iterator ();
2570
- getSeekCallbackFor ( topicPartition ) .seekToBeginning (Collections .singletonList (iterator .next ()));
2571
- getSeekCallbackFor ( topicPartition ) .seekToBeginning (Collections .singletonList (iterator .next ()));
2572
- getSeekCallbackFor ( topicPartition ) .seekToEnd (records .keySet ());
2571
+ seekCallbackFor .seekToBeginning (Collections .singletonList (iterator .next ()));
2572
+ seekCallbackFor .seekToBeginning (Collections .singletonList (iterator .next ()));
2573
+ seekCallbackFor .seekToEnd (records .keySet ());
2573
2574
iterator = records .keySet ().iterator ();
2574
- getSeekCallbackFor ( topicPartition ) .seekToEnd (Collections .singletonList (iterator .next ()));
2575
- getSeekCallbackFor ( topicPartition ) .seekToEnd (Collections .singletonList (iterator .next ()));
2575
+ seekCallbackFor .seekToEnd (Collections .singletonList (iterator .next ()));
2576
+ seekCallbackFor .seekToEnd (Collections .singletonList (iterator .next ()));
2576
2577
}
2577
2578
}
2578
2579
@@ -2678,7 +2679,7 @@ public void dontResumePausedPartition() throws Exception {
2678
2679
containerProps .setAckMode (AckMode .RECORD );
2679
2680
containerProps .setClientId ("clientId" );
2680
2681
containerProps .setIdleEventInterval (100L );
2681
- containerProps .setMessageListener ((MessageListener ) rec -> { });
2682
+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
2682
2683
containerProps .setMissingTopicsFatal (false );
2683
2684
KafkaMessageListenerContainer <Integer , String > container =
2684
2685
new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2745,7 +2746,7 @@ public void rePausePartitionAfterRebalance() throws Exception {
2745
2746
containerProps .setAckMode (AckMode .RECORD );
2746
2747
containerProps .setClientId ("clientId" );
2747
2748
containerProps .setIdleEventInterval (100L );
2748
- containerProps .setMessageListener ((MessageListener ) rec -> { });
2749
+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
2749
2750
containerProps .setMissingTopicsFatal (false );
2750
2751
KafkaMessageListenerContainer <Integer , String > container =
2751
2752
new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2827,7 +2828,7 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
2827
2828
containerProps .setAckMode (AckMode .RECORD );
2828
2829
containerProps .setClientId ("clientId" );
2829
2830
containerProps .setIdleEventInterval (100L );
2830
- containerProps .setMessageListener ((MessageListener ) rec -> { });
2831
+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
2831
2832
containerProps .setMissingTopicsFatal (false );
2832
2833
KafkaMessageListenerContainer <Integer , String > container =
2833
2834
new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2955,7 +2956,7 @@ public void testIdleEarlyExit() throws Exception {
2955
2956
container .start ();
2956
2957
assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
2957
2958
new DirectFieldAccessor (container ).setPropertyValue ("listenerConsumer.assignedPartitions" ,
2958
- Arrays . asList (new TopicPartition ("foo" , 0 )));
2959
+ List . of (new TopicPartition ("foo" , 0 )));
2959
2960
Thread .sleep (500 );
2960
2961
long t1 = System .currentTimeMillis ();
2961
2962
container .stop ();
@@ -3060,16 +3061,12 @@ public void testAckModeCount() throws Exception {
3060
3061
given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
3061
3062
Thread .sleep (50 );
3062
3063
int recordsToUse = which .incrementAndGet ();
3063
- switch (recordsToUse ) {
3064
- case 1 :
3065
- return consumerRecords1 ;
3066
- case 2 :
3067
- return consumerRecords2 ;
3068
- case 3 :
3069
- return consumerRecords3 ;
3070
- default :
3071
- return emptyRecords ;
3072
- }
3064
+ return switch (recordsToUse ) {
3065
+ case 1 -> consumerRecords1 ;
3066
+ case 2 -> consumerRecords2 ;
3067
+ case 3 -> consumerRecords3 ;
3068
+ default -> emptyRecords ;
3069
+ };
3073
3070
});
3074
3071
final CountDownLatch commitLatch = new CountDownLatch (3 );
3075
3072
willAnswer (i -> {
@@ -3107,7 +3104,7 @@ public void testAckModeCount() throws Exception {
3107
3104
container .stop ();
3108
3105
}
3109
3106
3110
- @ SuppressWarnings ({ "unchecked" , "rawtypes" , "deprecation" })
3107
+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
3111
3108
@ Test
3112
3109
public void testCommitErrorHandlerCalled () throws Exception {
3113
3110
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -3435,7 +3432,7 @@ public void testCooperativeRebalance() throws Exception {
3435
3432
ContainerProperties containerProps = new ContainerProperties ("foo" );
3436
3433
containerProps .setGroupId ("grp" );
3437
3434
containerProps .setClientId ("clientId" );
3438
- containerProps .setMessageListener ((MessageListener ) msg -> { });
3435
+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> { });
3439
3436
Properties consumerProps = new Properties ();
3440
3437
KafkaMessageListenerContainer <Integer , String > container =
3441
3438
new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -3467,7 +3464,7 @@ void testCommitRebalanceInProgressRecord() throws Exception {
3467
3464
assertThat (commits .get (5 )).hasSize (2 ); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
3468
3465
assertThat (commits .get (5 ).get (new TopicPartition ("foo" , 1 )))
3469
3466
.isNotNull ()
3470
- .extracting (om -> om . offset () )
3467
+ .extracting (OffsetAndMetadata :: offset )
3471
3468
.isEqualTo (2L );
3472
3469
});
3473
3470
}
@@ -3527,7 +3524,7 @@ else if (call == 1) {
3527
3524
containerProps .setAckMode (ackMode );
3528
3525
containerProps .setClientId ("clientId" );
3529
3526
containerProps .setIdleEventInterval (100L );
3530
- containerProps .setMessageListener ((MessageListener ) msg -> { });
3527
+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> { });
3531
3528
Properties consumerProps = new Properties ();
3532
3529
containerProps .setKafkaConsumerProperties (consumerProps );
3533
3530
KafkaMessageListenerContainer <Integer , String > container =
@@ -3538,7 +3535,7 @@ else if (call == 1) {
3538
3535
verifier .accept (commits );
3539
3536
}
3540
3537
3541
- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
3538
+ @ SuppressWarnings ({ "unchecked" })
3542
3539
@ Test
3543
3540
void testCommitFailsOnRevoke () throws Exception {
3544
3541
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -3671,7 +3668,7 @@ void commitAfterHandleManual() throws InterruptedException {
3671
3668
cfProps .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 45000 ); // wins
3672
3669
given (cf .getConfigurationProperties ()).willReturn (cfProps );
3673
3670
final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
3674
- records .put (new TopicPartition ("foo" , 0 ), Arrays . asList (
3671
+ records .put (new TopicPartition ("foo" , 0 ), List . of (
3675
3672
new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" )));
3676
3673
ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
3677
3674
ConsumerRecords <Integer , String > emptyRecords = new ConsumerRecords <>(Collections .emptyMap ());
@@ -3754,7 +3751,7 @@ void stopImmediately() throws InterruptedException {
3754
3751
}
3755
3752
3756
3753
@ Test
3757
- @ SuppressWarnings ({"unchecked" , "deprecation" })
3754
+ @ SuppressWarnings ({"unchecked" })
3758
3755
public void testInvokeRecordInterceptorSuccess () throws Exception {
3759
3756
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
3760
3757
Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3796,7 +3793,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
3796
3793
RecordInterceptor <Integer , String > recordInterceptor = spy (new RecordInterceptor <Integer , String >() {
3797
3794
3798
3795
@ Override
3799
- @ Nullable
3796
+ @ NonNull
3800
3797
public ConsumerRecord <Integer , String > intercept (ConsumerRecord <Integer , String > record ,
3801
3798
Consumer <Integer , String > consumer ) {
3802
3799
@@ -3842,7 +3839,7 @@ private static Stream<Arguments> paramsForRecordAllSkipped() {
3842
3839
3843
3840
@ ParameterizedTest (name = "{index} testInvokeRecordInterceptorAllSkipped AckMode.{0} early intercept {1}" )
3844
3841
@ MethodSource ("paramsForRecordAllSkipped" )
3845
- @ SuppressWarnings ({"unchecked" , "deprecation" })
3842
+ @ SuppressWarnings ({"unchecked" })
3846
3843
public void testInvokeRecordInterceptorAllSkipped (AckMode ackMode , boolean early ) throws Exception {
3847
3844
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
3848
3845
Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3869,7 +3866,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
3869
3866
containerProps .setGroupId ("grp" );
3870
3867
containerProps .setAckMode (ackMode );
3871
3868
3872
- containerProps .setMessageListener ((MessageListener ) msg -> {
3869
+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> {
3873
3870
});
3874
3871
containerProps .setClientId ("clientId" );
3875
3872
@@ -3912,7 +3909,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
3912
3909
3913
3910
@ ParameterizedTest (name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}" )
3914
3911
@ ValueSource (booleans = { true , false })
3915
- @ SuppressWarnings ({"unchecked" , "deprecation" })
3912
+ @ SuppressWarnings ({"unchecked" })
3916
3913
public void testInvokeBatchInterceptorAllSkipped (boolean early ) throws Exception {
3917
3914
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
3918
3915
Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3939,7 +3936,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
3939
3936
containerProps .setGroupId ("grp" );
3940
3937
containerProps .setAckMode (AckMode .BATCH );
3941
3938
3942
- containerProps .setMessageListener ((BatchMessageListener ) msgs -> {
3939
+ containerProps .setMessageListener ((BatchMessageListener <?, ?> ) msgs -> {
3943
3940
});
3944
3941
containerProps .setClientId ("clientId" );
3945
3942
if (!early ) {
@@ -3975,7 +3972,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
3975
3972
}
3976
3973
3977
3974
@ Test
3978
- @ SuppressWarnings ({"unchecked" , "deprecation" })
3975
+ @ SuppressWarnings ({"unchecked" })
3979
3976
public void testInvokeRecordInterceptorFailure () throws Exception {
3980
3977
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
3981
3978
Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -4015,7 +4012,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
4015
4012
RecordInterceptor <Integer , String > recordInterceptor = spy (new RecordInterceptor <Integer , String >() {
4016
4013
4017
4014
@ Override
4018
- @ Nullable
4015
+ @ NonNull
4019
4016
public ConsumerRecord <Integer , String > intercept (ConsumerRecord <Integer , String > record ,
4020
4017
Consumer <Integer , String > consumer ) {
4021
4018
0 commit comments