GH-2220: Fix TopicPartitionOffset for Retry Topics #2223
Conversation
Here is my review, but I'm not fully sure what is going on from the logic perspective, so I'll leave that part of the review to @garyrussell.
Thanks
Review comments were left on:

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java
spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java
spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java
spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java
Thanks a lot for the review @artembilan! I think I've addressed all the points; please let me know if I missed anything, or if there's anything else.
OK with me.
Let's give @garyrussell a chance to review this as well on Monday!
```diff
@@ -71,7 +74,12 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties,
 	Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
 	endpoint.setId(namesProvider.getEndpointId(endpoint));
 	endpoint.setGroupId(namesProvider.getGroupId(endpoint));
 	endpoint.setTopics(topics.stream().map(EndpointCustomizer.TopicNamesHolder::getCustomizedTopic).toArray(String[]::new));
 	if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
 		endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, endpoint.getTopicPartitionsToAssign()));
```
This breaks the initial offset on the main listener, because it overwrites the existing ones with a null initial offset. We shouldn't do this for the mainEndpoint in processAndRegisterEndpoint.
```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
public class Kgh2220Application {

    public static void main(String[] args) {
        SpringApplication.run(Kgh2220Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("kgh2220").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "kgh2220xxx", topicPartitions = @TopicPartition(topic = "kgh2220",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")))
    @RetryableTopic
    void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.OFFSET) long offset) {

        System.out.println(in + "@" + offset);
        System.out.println(topic);
        System.out.println(KafkaUtils.getConsumerGroupId());
        // throw new RuntimeException();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("kgh2220", "foo");
    }

}
```
Output:

```
foo@15
kgh2220
kgh2220xxx
```
Good catch, makes sense. I'll fix it shortly, thanks.
Resolves spring-projects#2220

* Register NewTopics for Retry Topics only once per topic.
* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.
Make EndpointCustomizerFactory ctor public
For test inspiration, see this test (using mocks) that verifies the container performed the initial seeks when the listener was so configured (lines 99 to 103 in feb3563).
I guess you could just clone that test and add a …
Looks great, thanks a lot @garyrussell! Will finish up at least basic unit test coverage and try this out.
@garyrussell, I stumbled into something that might be a small bug in TopicPartitionOffset. The hashCode method is:

```java
@Override
public int hashCode() {
    return this.topicPartition.hashCode() + this.position.hashCode();
}
```

But position can be null, so this can throw a NullPointerException. A possible fix:

```java
@Override
public int hashCode() {
    return this.position != null
            ? this.topicPartition.hashCode() + this.position.hashCode()
            : this.topicPartition.hashCode();
}
```

But if you have a different idea LMK. Thanks.
Or maybe add the other fields to the calculation if they're also not null.
Also, the equals method only compares topicPartition and position:

```java
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    TopicPartitionOffset that = (TopicPartitionOffset) o;
    return Objects.equals(this.topicPartition, that.topicPartition)
            && Objects.equals(this.position, that.position);
}
```

I'm catching these due to unexpected results in assertThat().equals().
@garyrussell, I actually can't simply not call the endpoint customizer for the main topic, because currently it's tied to the registration of the topics. Although, what happens there today is that we use a no-op suffixer for the main endpoint, so we basically get and set the same thing. But since users can provide their own implementation, I'm not sure that wouldn't be a breaking change - WDYT? If that's a problem I can rework it to only leave the topic partition part out. Thanks.
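For context, a user-provided naming implementation looks roughly like this - a sketch in the spirit of the reference docs' custom naming example; the prefix and class name here are assumptions:

```java
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider;

public class CustomNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(DestinationTopic.Properties properties) {
        if (properties.isMainEndpoint()) {
            // pass-through naming for the main endpoint
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        return new SuffixingRetryTopicNamesProvider(properties) {

            @Override
            public String getTopicName(String topic) {
                // illustrative: prefix only the retry/DLT destinations
                return "custom." + super.getTopicName(topic);
            }

        };
    }

}
```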
How about this: just include everything into those methods. Separate issue/PR, though... I guess your tests must not rely on the …
Why not, instead of:

```java
if (properties.isMainEndpoint()) {
    return topics;
}
```

use:

```java
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
    endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, endpoint.getTopicPartitionsToAssign()));
}
else if (!properties.isMainEndpoint()) {
    endpoint.setTopics(topics.stream().map(EndpointCustomizer.TopicNamesHolder::getCustomizedTopic).toArray(String[]::new));
}
```

Or, do you mean the other updates are also no-ops?
I think the logic you propose would incur the same error, wouldn't it? There's nothing to prevent the main endpoint from getting into the first if clause. Or am I missing something? I think if we just make sure the main endpoint doesn't enter either the topics or the TPO parts, we should be good and as least breaking as possible. WDYT? And yes, all operations there are (or should be by default) no-ops for the main endpoint, except the …
Sorry - …
I'm thinking this:

```java
if (!properties.isMainEndpoint()) {
    if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
        endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, endpoint.getTopicPartitionsToAssign()));
    }
    else {
        endpoint.setTopics(endpoint.getTopics().stream().map(namesProvider::getTopicName).toArray(String[]::new));
    }
}
```

Notice I changed the setTopics part a bit too, so that it won't get names from TPOs if there are any - although it should not get there with such a state.
Or better:

```java
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null && !properties.isMainEndpoint()) {
    endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, endpoint.getTopicPartitionsToAssign()));
}
else {
    endpoint.setTopics(endpoint.getTopics().stream().map(namesProvider::getTopicName).toArray(String[]::new));
}
```

This has the fewest changes and also lets users go through the topics logic with a custom namesProvider if they wish to do something even for the main endpoint topics. Since we're on a schedule for the release, I'll submit this logic along with the test changes, but I can change any of it if you prefer.
Adjust tests
Improve TPO and topics handling
OK; I ran this version with my Boot app and it now works as expected.
If @artembilan's approval from last week still stands, we can merge this.
I've just made a small improvement where users can also use the names provider with the main topic for topic partitions. To my understanding that's a really odd and unlikely combination, so I'm ok with leaving that out if you prefer - or I can submit it if you don't mind looking at this again.
I am confused about the hashcode change, though (reverted); it looks like a valid NPE.
Sure; feel free if you have another commit.
I would suggest to make it:

```java
@Override
public int hashCode() {
    return Objects.hash(this.topicPartition, this.position, this.offset, this.relativeToCurrent);
}
```

Offset and relativeToCurrent are mutable, so they are not included in the equals or hash.
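As an aside, a sketch of the hazard that motivates keeping mutable fields out of the hash (assuming a null-safe hash over topicPartition and position only, per the note above; the demo class name is illustrative):

```java
import java.util.HashSet;
import java.util.Set;

import org.springframework.kafka.support.TopicPartitionOffset;

public class MutableHashDemo {

    public static void main(String[] args) {
        Set<TopicPartitionOffset> seeks = new HashSet<>();
        TopicPartitionOffset tpo = new TopicPartitionOffset("kgh2220", 0, 0L);
        seeks.add(tpo);

        // Mutate a field after insertion. Because relativeToCurrent is not
        // part of equals/hashCode, the entry is still found; if it were
        // included in the hash, this lookup would silently miss the bucket.
        tpo.setRelativeToCurrent(true);
        System.out.println(seeks.contains(tpo)); // true
    }
}
```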
Well, TBH, since I'm not that familiar with TopicPartitionOffset, this is what I came up with:

```java
private static TopicPartitionOffset getTPOForMainTopic(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
    return new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
            tpo.getPartition(), tpo.getOffset(), tpo.getPosition());
}
```

But I'm not so sure that's an exact copy of the original, just with the topic name potentially changed (if the user provides a names provider) - there seems to be a relativeToCurrent flag that this constructor doesn't carry over. Any pointers? As I said, I'm ok with leaving the logic as is too.
Good point (re fixing hash for back porting)
Well, I guess this should do the trick:

```java
private static TopicPartitionOffset getTPOForMainTopic(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
    TopicPartitionOffset newTpo = new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
            tpo.getPartition(), tpo.getOffset(), tpo.getPosition());
    newTpo.setRelativeToCurrent(tpo.isRelativeToCurrent());
    return newTpo;
}
```
I am not sure why you want to replace the existing TPOs with an exact copy.
Oh - it's so they can change the name; OK; that makes sense.
GH-2220: Fix TopicPartitionOffset for Retry Topics

Resolves #2220

* Register NewTopics for Retry Topics only once per topic.
* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.
* Move default partition to constant field.
* Address review comments; make EndpointCustomizerFactory ctor public.
* Remove main endpoint customization.
* Remove failing @SuppressWarnings.
* Revert TPO hashCode changes; adjust tests; improve TPO and topics handling.
* Enable customizing main endpoint TPOs.
...and cherry-picked to 2.9.x, 2.8.x, and to 2.7.x after resolving minor conflicts.
Hmmm - test failures on the back ports...
Investigating...
I changed these tests because they failed depending on the log level set for the tests. This is probably due to the change to …
Thanks; fixed.
Resolves #2220

Fix TopicPartitionOffset for Retry Topics

* Register NewTopics for Retry Topics only once per topic.
* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.

Please let me know if there's anything to be changed.
Thanks
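For the docs point above, the multi-container scenario looks roughly like this (a sketch; the listener ids and topic name are assumptions):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Component;

@Component
public class TwoListenersSameTopic {

    // Two listener containers on the same topic: with this change, the retry
    // NewTopics for "orders" are registered only once per topic.
    @RetryableTopic
    @KafkaListener(id = "listener1", topics = "orders")
    void listen1(String in) {
    }

    @RetryableTopic
    @KafkaListener(id = "listener2", topics = "orders")
    void listen2(String in) {
    }

}
```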