Skip to content

Commit 258fde7

Browse files
committed
Remove Invalid Reactive Test
- author misunderstood `sendOffsetsToTransaction`; the test sends the offsets for a produced record.
1 parent 132f20d commit 258fde7

File tree

1 file changed

+0
-42
lines changed

1 file changed

+0
-42
lines changed

spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -252,48 +252,6 @@ private Flux<SenderRecord<Integer, String, Integer>> generateSenderRecords(int r
252252
});
253253
}
254254

255-
@Test
256-
public void shouldSendOffsetsToTransaction() {
257-
ProducerRecord<Integer, String> producerRecord =
258-
new ProducerRecord<>(REACTIVE_INT_KEY_TOPIC, DEFAULT_PARTITION, DEFAULT_TIMESTAMP, DEFAULT_KEY,
259-
DEFAULT_VALUE);
260-
261-
TransactionManager tm = reactiveKafkaProducerTemplate.transactionManager();
262-
263-
StepVerifier.create(reactiveKafkaProducerTemplate.sendTransactionally(SenderRecord.create(producerRecord, null))
264-
.then())
265-
.expectComplete()
266-
.verify(DEFAULT_VERIFY_TIMEOUT);
267-
268-
StepVerifier.create(reactiveKafkaConsumerTemplate.receive())
269-
.assertNext(rr -> {
270-
SenderRecord<Integer, String, Object> transformed =
271-
SenderRecord.create(rr.topic(), rr.partition(), rr.timestamp(), rr.key(), rr
272-
.value() + "xyz", null);
273-
Mono<Void> sendOffsets = tm.begin()
274-
.then(
275-
reactiveKafkaProducerTemplate.send(transformed)
276-
.map(SenderResult::recordMetadata)
277-
.map(rm -> {
278-
Map<TopicPartition, OffsetAndMetadata> offsets =
279-
Collections.singletonMap(
280-
new TopicPartition(rm.topic(), rm.partition()),
281-
new OffsetAndMetadata(rm.offset() + 1));
282-
return tm.sendOffsets(offsets, CONSUMER_GROUP_ID);
283-
}))
284-
.then(tm.commit()).then();
285-
StepVerifier.create(sendOffsets)
286-
.expectComplete()
287-
.verify(DEFAULT_VERIFY_TIMEOUT);
288-
})
289-
.assertNext(rr -> {
290-
assertThat(rr.value()).startsWith(DEFAULT_VALUE + "xyz");
291-
rr.receiverOffset().acknowledge();
292-
})
293-
.thenCancel()
294-
.verify(DEFAULT_VERIFY_TIMEOUT);
295-
}
296-
297255
@Test
298256
public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveItExactlyOnceWithException() {
299257
ProducerRecord<Integer, String> producerRecord =

0 commit comments

Comments
 (0)