
Commit fb975dd

wkennedy authored and garyrussell committed
GH-1184: Add Exactly Once Test Case
Resolves #1184

Initial commit of "exactly once" unit tests.
Clean up new tests; verify that the tests run quickly.
Update imports to meet checkstyle rules.
1 parent cef32ed commit fb975dd

File tree

1 file changed (+83, -1 lines)


spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java

Lines changed: 83 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019 the original author or authors.
+ * Copyright 2019-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,15 +27,18 @@
 import java.util.Optional;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscription;
 
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
@@ -56,6 +59,7 @@
 /**
  * @author Mark Norkin
  * @author Gary Russell
+ * @author Will Kennedy
  *
  * @since 2.3.0
  */
@@ -281,4 +285,82 @@ public void shouldSendOffsetsToTransaction() {
 				.verify(DEFAULT_VERIFY_TIMEOUT);
 	}
 
+	@Test
+	public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveItExactlyOnceWithException() {
+		ProducerRecord<Integer, String> producerRecord =
+				new ProducerRecord<>(REACTIVE_INT_KEY_TOPIC, DEFAULT_PARTITION, DEFAULT_TIMESTAMP, DEFAULT_KEY,
+						DEFAULT_VALUE);
+
+		StepVerifier.create(reactiveKafkaProducerTemplate
+				.sendTransactionally(SenderRecord.create(producerRecord, null))
+				.then())
+				.expectComplete()
+				.verify();
+
+		StepVerifier.create(reactiveKafkaConsumerTemplate
+				.receiveExactlyOnce(reactiveKafkaProducerTemplate.transactionManager())
+				.concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, true))
+				.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager().abort().then(Mono.error(error)))
+		)
+				.expectErrorMatches(throwable -> throwable instanceof KafkaException &&
+						throwable.getMessage().equals("TransactionalId reactive.transaction: Invalid transition " +
+								"attempted from state READY to state ABORTING_TRANSACTION"))
+				.verify();
+
+		StepVerifier.create(reactiveKafkaConsumerTemplate
+				.receive().doOnNext(receiverRecord -> receiverRecord.receiverOffset().acknowledge()))
+				.assertNext(receiverRecord -> assertThat(receiverRecord.value()).isEqualTo(DEFAULT_VALUE))
+				.thenCancel()
+				.verify(DEFAULT_VERIFY_TIMEOUT);
+	}
+
+	@Test
+	public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveItExactlyOnce() {
+		ProducerRecord<Integer, String> producerRecord =
+				new ProducerRecord<>(REACTIVE_INT_KEY_TOPIC, DEFAULT_PARTITION, DEFAULT_TIMESTAMP, DEFAULT_KEY,
+						DEFAULT_VALUE);
+
+		StepVerifier.create(reactiveKafkaProducerTemplate.sendTransactionally(SenderRecord.create(producerRecord, null))
+				.then())
+				.expectComplete()
+				.verify();
+
+		StepVerifier.create(reactiveKafkaConsumerTemplate
+				.receiveExactlyOnce(reactiveKafkaProducerTemplate.transactionManager())
+				.concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, false))
+				.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager().abort().then(Mono.error(error)))
+		)
+				.assertNext(senderResult -> {
+					assertThat(senderResult.correlationMetadata().intValue()).isEqualTo(DEFAULT_KEY);
+					assertThat(senderResult.recordMetadata().offset()).isGreaterThan(0);
+				})
+				.thenCancel()
+				.verify(DEFAULT_VERIFY_TIMEOUT);
+
+		StepVerifier.create(reactiveKafkaConsumerTemplate
+				.receive().doOnNext(receiverRecord -> receiverRecord.receiverOffset().acknowledge()))
+				.assertNext(receiverRecord -> {
+					assertThat(receiverRecord.value()).isEqualTo(DEFAULT_VALUE + "xyz");
+					assertThat(receiverRecord.offset()).isGreaterThan(0);
+				})
+				.thenCancel()
+				.verify(DEFAULT_VERIFY_TIMEOUT);
+	}
+
+	private Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, String>> fluxConsumerRecord, boolean failCommit) {
+		return reactiveKafkaProducerTemplate
+				.send(fluxConsumerRecord.map(this::toSenderRecord)
+						.concatWith(failCommit ?
+								doThrowKafkaException() :
+								reactiveKafkaProducerTemplate.transactionManager().commit()));
+	}
+
+	private Publisher<? extends SenderRecord<Integer, String, Integer>> doThrowKafkaException() {
+		throw new KafkaException();
+	}
+
+	private SenderRecord<Integer, String, Integer> toSenderRecord(ConsumerRecord<Integer, String> record) {
+		return SenderRecord.create(REACTIVE_INT_KEY_TOPIC, record.partition(), null, record.key(), record.value() + "xyz", record.key());
+	}
+
 }
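The new tests run against the fixture's preconfigured reactiveKafkaProducerTemplate and reactiveKafkaConsumerTemplate fields. As a rough, self-contained sketch of how the same exactly-once flow could be wired outside that fixture: the broker address, group id, topic names, and the main() harness below are illustrative assumptions, while the "reactive.transaction" transactional id and the receiveExactlyOnce/send/commit/abort chain mirror the calls in the diff above.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;

import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class ExactlyOnceSketch {

	public static void main(String[] args) {
		// Producer side: a transactional.id is required for transactionManager()/sendTransactionally().
		Map<String, Object> producerProps = new HashMap<>();
		producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
		producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
		producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		SenderOptions<Integer, String> senderOptions = SenderOptions.<Integer, String>create(producerProps)
				.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "reactive.transaction");
		ReactiveKafkaProducerTemplate<Integer, String> producer =
				new ReactiveKafkaProducerTemplate<>(senderOptions);

		// Consumer side: read_committed so records from aborted transactions are never observed.
		Map<String, Object> consumerProps = new HashMap<>();
		consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
		consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once-sketch"); // assumed group id
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
		ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.<Integer, String>create(consumerProps)
				.subscription(Collections.singleton("input-topic")); // assumed source topic
		ReactiveKafkaConsumerTemplate<Integer, String> consumer =
				new ReactiveKafkaConsumerTemplate<>(receiverOptions);

		// Exactly-once loop, mirroring the tests: each inbound batch is transformed, re-published,
		// and its offsets committed inside the producer's transaction; a failure aborts it instead.
		consumer.receiveExactlyOnce(producer.transactionManager())
				.concatMap(batch -> producer.send(batch
						.map(rec -> SenderRecord.create("output-topic", rec.partition(), null, // assumed sink topic
								rec.key(), rec.value() + "xyz", rec.key()))
						.concatWith(producer.transactionManager().commit())))
				.onErrorResume(e -> producer.transactionManager().abort().then(Mono.error(e)))
				.subscribe(); // a real application would retain the returned Disposable and dispose it on shutdown
	}

}

Because the consumed offsets are committed through the producer's transaction manager, they commit atomically with the outbound records. When processing fails, as in the exception-path test, neither the offsets nor the re-published record become visible, which is why that test's final receive() step still sees the original value rather than the "xyz"-suffixed one.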
