Skip to content

Commit 02d3b72

Browse files
garyrussellartembilan
authored andcommitted
GH-2269: Improve DLPR Extensibility
Resolves #2269 - add getters for fields used in protected methods - change more methods to protected **Cherry-pick to 2.9.x, 2.8.x, 2.7.x** If it doesn't cherry-pick cleanly, I will back-port.
1 parent f94620a commit 02d3b72

File tree

1 file changed

+50
-5
lines changed

1 file changed

+50
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,18 @@ public void setFailIfSendResultIsError(boolean failIfSendResultIsError) {
295295
}
296296

297297
/**
298-
* Set the minumum time to wait for message sending. Default is the producer
298+
* If true, wait for the send result and throw an exception if it fails.
299+
* It will wait for the milliseconds specified in waitForSendResultTimeout for the result.
300+
* @return true to wait.
301+
* @since 2.7.14
302+
* @see #setWaitForSendResultTimeout(Duration)
303+
*/
304+
protected boolean isFailIfSendResultIsError() {
305+
return this.failIfSendResultIsError;
306+
}
307+
308+
/**
309+
* Set the minimum time to wait for message sending. Default is the producer
299310
* configuration {@code delivery.timeout.ms} plus the {@link #setTimeoutBuffer(long)}.
300311
* @param waitForSendResultTimeout the timeout.
301312
* @since 2.7
@@ -307,8 +318,9 @@ public void setWaitForSendResultTimeout(Duration waitForSendResultTimeout) {
307318
}
308319

309320
/**
310-
* Set the number of milliseconds to add to the producer configuration {@code delivery.timeout.ms}
311-
* property to avoid timing out before the Kafka producer. Default 5000.
321+
* Set the number of milliseconds to add to the producer configuration
322+
* {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
323+
* Default 5000.
312324
* @param buffer the buffer.
313325
* @since 2.7
314326
* @see #setWaitForSendResultTimeout(Duration)
@@ -317,6 +329,16 @@ public void setTimeoutBuffer(long buffer) {
317329
this.timeoutBuffer = buffer;
318330
}
319331

332+
/**
333+
* The number of milliseconds to add to the producer configuration
334+
* {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
335+
* @return the buffer.
336+
* @since 2.7.14
337+
*/
338+
protected long getTimeoutBuffer() {
339+
return this.timeoutBuffer;
340+
}
341+
320342
/**
321343
* Set to false to retain previous exception headers as well as headers for the
322344
* current exception. Default is true, which means only the current headers are
@@ -351,6 +373,15 @@ public void setExceptionHeadersCreator(ExceptionHeadersCreator headersCreator) {
351373
this.exceptionHeadersCreator = headersCreator;
352374
}
353375

376+
/**
377+
* True if publishing should run in a transaction.
378+
* @return true for transactional.
379+
* @since 2.7.14
380+
*/
381+
protected boolean isTransactional() {
382+
return this.transactional;
383+
}
384+
354385
/**
355386
* Clear the header inclusion bit for the header name.
356387
* @param headers the headers to clear.
@@ -612,7 +643,14 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
612643
}
613644
}
614645

615-
private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
646+
/**
647+
* Wait for the send future to complete.
648+
* @param kafkaTemplate the template used to send the record.
649+
* @param outRecord the record.
650+
* @param sendResult the future.
651+
* @param inRecord the original consumer record.
652+
*/
653+
protected void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
616654
ProducerRecord<Object, Object> outRecord,
617655
@Nullable ListenableFuture<SendResult<Object, Object>> sendResult, ConsumerRecord<?, ?> inRecord) {
618656

@@ -637,7 +675,14 @@ private String pubFailMessage(ProducerRecord<Object, Object> outRecord, Consumer
637675
+ outRecord.topic() + " failed for: " + KafkaUtils.format(inRecord);
638676
}
639677

640-
private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
678+
/**
679+
* Determine the send timeout based on the template's producer factory and
680+
* {@link #setWaitForSendResultTimeout(Duration)}.
681+
* @param template the template.
682+
* @return the timeout.
683+
* @since 2.7.14
684+
*/
685+
protected Duration determineSendTimeout(KafkaOperations<?, ?> template) {
641686
ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
642687
if (producerFactory != null) { // NOSONAR - will only occur in mock tests
643688
Map<String, Object> props = producerFactory.getConfigurationProperties();

0 commit comments

Comments
 (0)