
RetryableTopic - breaking changes in the error handler #2184

Closed
v-chernyshev opened this issue Mar 23, 2022 · 20 comments · Fixed by #2185 or #2389

Comments

@v-chernyshev

In what version(s) of Spring for Apache Kafka are you seeing this issue?

2.8.4.

Describe the bug

ListenerContainerFactoryConfigurer now creates its error handler here:

protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
        Configuration configuration) {

    DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
    errorHandler.defaultFalse();
    errorHandler.setCommitRecovered(true);
    errorHandler.setLogLevel(KafkaException.Level.DEBUG);
    if (this.blockingExceptionTypes != null) {
        errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
    }
    this.errorHandlerCustomizer.accept(errorHandler);
    return errorHandler;
}

There are two breaking changes/regressions compared to version 2.8.3:

  • createDefaultErrorHandlerInstance no longer provides a "no retry" back off strategy by default. The outcome of this change is that a to-be-recovered record is retried 10 times instead of immediately being sent into the next recovery topic.
  • errorHandler.defaultFalse completely disables the default classification of the fatal exceptions (such as DeserializationException.class). In fact, every classification request now returns false.

The second change causes every back off exception to be printed at error level due to the code here:

protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
        @Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {

    if (getClassifier().classify(thrownException)) {
        return this.failureTracker::recovered;
    }
    else {
        try {
            this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
            this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
        }
        catch (Exception ex) {
            if (records.size() > 0) {
                this.logger.error(ex, () -> "Recovery of record ("
                        + KafkaUtils.format(records.get(0)) + ") failed");
                this.failureTracker.getRetryListeners().forEach(rl ->
                        rl.recoveryFailed(records.get(0), thrownException, ex));
            }
            return (rec, excep, cont, consumer) -> NEVER_SKIP_PREDICATE.test(rec, excep);
        }
        return (rec, excep, cont, consumer) -> ALWAYS_SKIP_PREDICATE.test(rec, excep);
    }
}

getClassifier().classify(thrownException) now always returns false, and this.failureTracker.getRecoverer().accept may throw a back off exception, which no longer reaches the following special back off exception handler:

catch (Exception ex) {
    if (isBackoffException(ex)) {
        logger.debug(ex, () -> ListenerUtils.recordToString(record)
                + " included in seeks due to retry back off");
    }
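
For illustration, here is a minimal sketch of what the all-false classification means in practice, assuming spring-retry's BinaryExceptionClassifier (which, as far as I can tell, backs getClassifier() here):

import java.util.Collections;

import org.springframework.classify.BinaryExceptionClassifier;

// Sketch only: with no type mappings and a false default value, classify()
// returns false for every exception, including the formerly fatal ones.
public class DefaultFalseDemo {

    public static void main(String[] args) {
        BinaryExceptionClassifier classifier =
                new BinaryExceptionClassifier(Collections.emptyMap(), false);
        System.out.println(classifier.classify(new IllegalStateException())); // false
        System.out.println(classifier.classify(new RuntimeException()));      // false
    }
}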

To Reproduce

Unfortunately, I do not have a sample right now, but I will try to prepare it this week. I believe that any application that uses the retryable topics recovery mechanism is impacted.

Expected behavior

I expect the default behaviour of the retryable topics recovery system to be identical to version 2.8.3.

@v-chernyshev
Author

A slight clarification regarding the back off exception handler changes. The passed thrownException may be a back off exception, but then it is unwrapped here:

private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e) {
    if (SeekUtils.isBackoffException(e)) {
        throw (NestedRuntimeException) e; // Necessary to not commit the offset and seek to current again
    }

The catch statement of getRecoveryStrategy intercepts it and prints an error.

@tomazfernandes
Contributor

Hi @v-chernyshev, thanks for reporting this.

There are two breaking changes/regressions compared to version 2.8.3:

  • createDefaultErrorHandlerInstance no longer provides a "no retry" back off strategy by default. The outcome of this change is that a to-be-recovered record is retried 10 times instead of immediately being sent into the next recovery topic.
  • errorHandler.defaultFalse completely disables the default classification of the fatal exceptions (such as DeserializationException.class). In fact, every classification request now returns false.

I don't see this behavior in my tests. Since all classifications now return false by default, records skip the 10 retries and are only retried if you add exceptions to be retried on the LCFC. Any further details you can share would be appreciated.
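
For reference, opting exceptions into blocking retries on the LCFC looks roughly like this (a sketch against the 2.8.4 API; MyBlockingException is a hypothetical placeholder):

@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager backoffManager,
        DeadLetterPublishingRecovererFactory recovererFactory,
        @Qualifier(RetryTopicInternalBeanNames.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {

    ListenerContainerFactoryConfigurer lcfc =
            new ListenerContainerFactoryConfigurer(backoffManager, recovererFactory, clock);
    // Only these types get blocking retries before the record moves on to
    // the retry topics (MyBlockingException is hypothetical).
    lcfc.setBlockingRetryableExceptions(MyBlockingException.class);
    return lcfc;
}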

The second change causes every back off exception to be printed at error level due to the code here:

This one I can reproduce. The idea is that KafkaBackOffExceptions should be logged at their own level, which we now set to DEBUG. I'll try to come up with a workaround for this while we look for the fix.

@tomazfernandes
Contributor

Also, the previous FATAL classifications in the DLPR were in practice no-ops: since the policy was no retries, it didn't matter whether the exception was FATAL or not. The same FATAL classifications now live on the DefaultDestinationTopicResolver class, and those exceptions are forwarded directly to the DLT by default. You can add and remove exceptions from that list.
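
For instance, the fatal classifications can be adjusted on the resolver bean, roughly like this (a sketch against the 2.8.x API; MyFatalException is a hypothetical placeholder):

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver topicResolver(ApplicationContext context,
        @Qualifier(RetryTopicInternalBeanNames.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {

    DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(clock, context);
    // Treat this exception as fatal: forward the record straight to the DLT.
    resolver.addNotRetryableExceptions(MyFatalException.class);
    // Drop a default FATAL classification so it goes through the retry topics.
    resolver.removeClassification(ConversionException.class);
    return resolver;
}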

@tomazfernandes
Contributor

A slight clarification regarding the back off exception handler changes. The passed thrownException may be a back off exception, but then it is unwrapped here:

private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e) {
    if (SeekUtils.isBackoffException(e)) {
        throw (NestedRuntimeException) e; // Necessary to not commit the offset and seek to current again
    }

The catch statement of getRecoveryStrategy intercepts it and prints an error.

Just for clarification, the exception isn't unwrapped here, it's just thrown again.

@v-chernyshev
Author

Just for clarification, the exception isn't unwrapped here, it's just thrown again.

Yes, you are absolutely right, I didn't phrase myself properly.

I don't see this behavior in my tests. Since all classifications now return false by default

All right, I'll try to come up with an example based on our code, hopefully by the end of tomorrow. Thanks for looking into it!

@tomazfernandes
Contributor

@v-chernyshev ok, thanks. I'll keep looking for the behavior you described.

Can you just please check if by any chance you're adding exceptions to the DeadLetterPublishingRecoverer classification? That would be the most likely cause.

@v-chernyshev
Author

v-chernyshev commented Mar 23, 2022

Can you just please check if by any chance you're adding exceptions to the DeadLetterPublishingRecoverer classification? That would be the most likely cause.

Yes, your guess is correct. In my tests I added all the default exceptions back via setClassifications and replaced the default value with true to partially restore the 2.8.3 behaviour. The now-default back off strategy, however, has this.noRetries in FailedRecordTracker.recovered set to false, so the else branch is taken. nextBackOff is then equal to zero, the thread sleeps for 10 milliseconds in ListenerUtils.stoppableSleep, and false is returned from the method.
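
For context, a minimal sketch of the two back off policies being compared, using spring-core's FixedBackOff semantics:

import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

// Sketch only: a "no retries" FixedBackOff stops immediately, while one with
// remaining attempts (e.g. the DefaultErrorHandler's 0 ms / 9 retries
// default) returns a 0 ms interval that still goes through stoppableSleep.
public class BackOffDemo {

    public static void main(String[] args) {
        BackOffExecution noRetries = new FixedBackOff(0L, 0L).start();
        System.out.println(noRetries.nextBackOff()); // -1 (STOP): recover immediately

        BackOffExecution withRetries = new FixedBackOff(0L, 9L).start();
        System.out.println(withRetries.nextBackOff()); // 0: sleep, then retry
    }
}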

@tomazfernandes
Contributor

tomazfernandes commented Mar 23, 2022

We have this logic in FailedRecordProcessor.getRecoveryStrategy:

if (getClassifier().classify(thrownException)) {
    return this.failureTracker::recovered;
}
else {
    try {
        this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
        this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
    }

Since the classification returns false, it should go to the else branch which has no sleep, which is what happens on my tests.

Looks right?

@v-chernyshev
Author

Yes, with an empty classifier it indeed always goes into the else branch. Unfortunately, the only way that I could find to avoid getting the back off exception logs at error level with version 2.8.4 was to redirect the code flow to return this.failureTracker::recovered :(

Admittedly, I do not really know how the return value of getClassifier().classify impacts the various parts of Spring Kafka. I'll review the 2.8.4 changelog to better understand the exception classifier changes.

@tomazfernandes
Contributor

Ok, so just to make sure we're on the same page. Functionality-wise the framework is behaving as expected - it only retries when it should.

The problem at hand is that KafkaBackOffExceptions are being logged as ERROR - and that's what caught your attention in the first place.

Does that sound about right?

@tomazfernandes
Contributor

We've documented the changes here

@v-chernyshev
Copy link
Author

Thanks for the link, somehow I've missed this section. And yes, all of it started with the back off exceptions showing up in the logs, which is the only issue. Everything else is due to my misunderstanding of the changes, sorry for the confusion.

@tomazfernandes
Contributor

No problem, I totally understand that starting to see ERROR logs everywhere would lead to worries.

It was really good that you brought this up early - now we had a chance to double check the behavior and see that the changes we made are ok functionality-wise, and can also start looking for a workaround and fix for the logs. So thanks a lot for that.

Especially because, well, if that's really the case, people should be seeing these logs everywhere, so I expect to see this issue popping up again here and on StackOverflow.

It also occurred to me that the KafkaBackOffException may get in the way of the blocking retries and count against them, so I'll also look into that.

@tomazfernandes
Contributor

KafkaBackOffException doesn't get in the way of blocking retries because those are opt-in and so the user would have to explicitly add KBOE to the classification list. At least one integration test failed when I added it, so we seem to be covered there.

One unexpected side effect though is that retryListeners are notified of the KafkaBackOffException with recoveryFailed, so that might be a problem to some users.
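
A sketch of a retry listener that screens those notifications out, assuming the 2.8.x RetryListener contract:

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.listener.RetryListener;

// Sketch only: ignore the KafkaBackoffException notifications described
// above so that only genuine recovery failures trigger alerting.
public class BackOffAwareRetryListener implements RetryListener {

    @Override
    public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
        // no-op
    }

    @Override
    public void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
        if (failure instanceof KafkaBackoffException) {
            return; // just a back off signal, not a real recovery failure
        }
        // handle genuine recovery failures here (metrics, alerts, ...)
    }
}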

@tomazfernandes
Contributor

As for the workaround, this one, based on what you suggested, worked for me:

@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
        @Qualifier(RetryTopicInternalBeanNames.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {

    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(
            kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    lcfc.setBlockingRetriesBackOff(new FixedBackOff(0, 0));
    lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh)
            .setClassifications(Collections.emptyMap(), true));
    return lcfc;
}

tomazfernandes added commits to tomazfernandes/spring-kafka that referenced this issue Mar 23, 2022
@tomazfernandes
Contributor

tomazfernandes commented Mar 23, 2022

@v-chernyshev, I opened a PR with a fix for the log issue, if you're able to take a look it'd be great. It's a simple one but those are the worst 😄

#2185

I'm not sure whether retryListeners being notified with the KafkaBackOffException on recoveryFailed is a bug or a feature, but it seems it's been like that since before the last version's changes, so since no one is complaining I left it that way.

Thanks

@v-chernyshev
Author

@garyrussell @tomazfernandes I'm afraid that this problem has started happening again in version 2.9.0 with seeks after error set to false. A KafkaBackoffException thrown here:

private void pauseConsumptionAndThrow(Context context, Long backOffTime) throws KafkaBackoffException {
    TopicPartition topicPartition = context.getTopicPartition();
    MessageListenerContainer container = getListenerContainerFromContext(context);
    container.pausePartition(topicPartition);
    this.backOffHandler.onNextBackOff(container, topicPartition, backOffTime);
    throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " +
            "backing off for approx. %s millis.", topicPartition.partition(),
            topicPartition.topic(), backOffTime),
            topicPartition, context.getListenerId(), context.getDueTimestamp());
}

It is then wrapped in a ListenerExecutionFailedException here:

catch (Exception ex) { // NOSONAR
    throw decorateException(ex);
}

Then it reaches this error handler:

try {
    invokeErrorHandler(record, iterator, e);
    commitOffsetsIfNeeded(record);
}

Which calls handleOne:

else {
    boolean handled = this.commonErrorHandler.handleOne(rte, record, this.consumer,
            KafkaMessageListenerContainer.this.thisOrParentContainer);

This method is defined here:

@Override
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
        MessageListenerContainer container) {

    try {
        return getFailureTracker().recovered(record, thrownException, container, consumer);
    }
    catch (Exception ex) {
        logger.error(ex, "Failed to handle " + KafkaUtils.format(record) + " with " + thrownException);
        return false;
    }
}

The exception, quite rightfully, cannot be recovered, so logger.error is invoked. The end result is an error in the logs for every back-off.
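
For completeness, the no-seek mode in question is enabled roughly like this (a sketch against the 2.9.x API):

import org.springframework.kafka.listener.DefaultErrorHandler;

public class NoSeekErrorHandlerConfig {

    // Sketch only: with seeks after error disabled, handleOne() above is the
    // path that ends up logging the KafkaBackoffException at ERROR.
    static DefaultErrorHandler noSeekErrorHandler() {
        DefaultErrorHandler handler = new DefaultErrorHandler();
        handler.setSeekAfterError(false);
        return handler;
    }
}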

@v-chernyshev
Author

Please let me know if I should submit a separate issue with these details.

@garyrussell
Contributor

Please open a new issue; this is not a regression but a problem with the new feature. We hoped that folks would try it out via the milestones/release candidate before it was formally released.

Thanks.

@v-chernyshev
Author

Thank you for the prompt reply.

We hoped that folks would try it out via the milestones/release candidate before it was formally released.

Unfortunately, I have only just found a bit of spare time to update the dependencies of the service that uses this feature 😞 I'll open new issues with feedback based on my testing of version 2.9.0.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Sep 6, 2022
Resolves spring-projects#2184

`KafkaBackOffException`s are incorrectly logged at ERROR level during
retries when using the no-seek error handling mode.

They are not logged in the seek mode; see
spring-projects@448871a

**cherry-pick to 2.9.x**
garyrussell reopened this Sep 6, 2022
artembilan pushed commits that referenced this issue Sep 6, 2022