GH-2220: Fix TopicPartitionOffset for Retry Topics #2223

Merged
merged 7 commits into from
Apr 14, 2022

Conversation

tomazfernandes
Contributor

Resolves #2220

  • Fix TopicPartitionOffset for Retry Topics

  • Register NewTopics for Retry Topics only once per topic.

  • Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.

Please let me know if there's anything to be changed.

Thanks

Member

@artembilan artembilan left a comment

Here is my review. I'm not fully sure what is going on from a logic perspective, so I'll leave that part of the review to @garyrussell.

Thanks

@tomazfernandes
Contributor Author

tomazfernandes commented Apr 8, 2022

Thanks a lot for the review @artembilan! I think I've addressed all the points. Please let me know if I missed something, or if there's anything else.

Member

@artembilan artembilan left a comment

OK with me.

Let's give @garyrussell a chance to review this as well on Monday!

@@ -71,7 +74,12 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
endpoint.setId(namesProvider.getEndpointId(endpoint));
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
endpoint.setTopics(topics.stream().map(EndpointCustomizer.TopicNamesHolder::getCustomizedTopic).toArray(String[]::new));
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, endpoint.getTopicPartitionsToAssign()));
Contributor

@garyrussell garyrussell Apr 13, 2022

This breaks the initial offset on the main listener because it overwrites the existing ones with a null initial offset.

We shouldn't do this for the mainEndpoint in processAndRegisterEndpoint.

@SpringBootApplication
public class Kgh2220Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh2220Application.class, args);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("kgh2220").partitions(1).replicas(1).build();
	}

	@KafkaListener(id = "kgh2220xxx", topicPartitions = @TopicPartition(topic = "kgh2220",
			partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")))
	@RetryableTopic
	void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
			@Header(KafkaHeaders.OFFSET) long offset) {
		System.out.println(in + "@" + offset);
		System.out.println(topic);
		System.out.println(KafkaUtils.getConsumerGroupId());
//		throw new RuntimeException();
	}

	@Bean
	ApplicationRunner runner(KafkaTemplate<String, String> template) {
		return args -> {
			template.send("kgh2220", "foo");
		};
	}

}
foo@15
kgh2220
kgh2220xxx

Contributor Author

Good catch, makes sense. I'll fix it shortly, thanks.

Resolves spring-projects#2220

* Register NewTopics for Retry Topics only once per topic.

* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.
Make EndpointCustomizerFactory ctor public
@garyrussell
Contributor

For test inspiration - see this test (using mocks) that verifies the container performed the initial seeks when the listener was so configured.

inOrder.verify(this.consumer).assign(any(Collection.class));
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 0), 0L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 0L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));

@garyrussell
Contributor

I guess you could just clone that test and add a @RetryableTopic to its listener.

@tomazfernandes
Contributor Author

Looks great, thanks a lot @garyrussell! Will finish up at least basic unit test coverage and try this out.

@tomazfernandes
Contributor Author

@garyrussell, I stumbled into something that might be a small bug in the TopicPartitionOffset code.

The hashCode method is:

@Override
public int hashCode() {
    return this.topicPartition.hashCode() + this.position.hashCode();
}

But position is a nullable field, so hashCode() throws an NPE whenever position is null. If that's an actual bug, DYT I should fix it as part of this PR, or should I leave it alone? The obvious fix I see would be something like:

@Override
public int hashCode() {
    return this.position != null
                      ? this.topicPartition.hashCode() + this.position.hashCode()
                      : this.topicPartition.hashCode();
}

But if you have a different idea LMK.

Thanks.
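
For illustration, here is a minimal null-safe sketch of the idea above. `TpoKey` is a hypothetical stand-in, not the real `TopicPartitionOffset`: it models `position` as a nullable `String` and relies on `Objects.hash` and `Objects.equals`, both of which accept null arguments.

```java
import java.util.Objects;

// Hypothetical stand-in for TopicPartitionOffset; only the fields needed here.
final class TpoKey {

    private final String topic;
    private final int partition;
    private final String position; // nullable, like the 'position' field discussed above

    TpoKey(String topic, int partition, String position) {
        this.topic = topic;
        this.partition = partition;
        this.position = position;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TpoKey)) {
            return false;
        }
        TpoKey that = (TpoKey) o;
        return this.partition == that.partition
                && Objects.equals(this.topic, that.topic)
                && Objects.equals(this.position, that.position);
    }

    @Override
    public int hashCode() {
        // Objects.hash tolerates null elements, so no explicit null check is needed.
        return Objects.hash(this.topic, this.partition, this.position);
    }
}

class NullSafeHashDemo {

    public static void main(String[] args) {
        TpoKey a = new TpoKey("kgh2220", 0, null);
        TpoKey b = new TpoKey("kgh2220", 0, null);
        // Neither call throws, even though position is null.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

Whether the real class should hash more fields than this is a separate question, which the discussion below picks up.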

@tomazfernandes
Contributor Author

Or maybe add the other fields to the calculation if they're also not null.

@tomazfernandes
Contributor Author

Also, the equals method doesn't take the offset into account:

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		TopicPartitionOffset that = (TopicPartitionOffset) o;
		return Objects.equals(this.topicPartition, that.topicPartition)
				&& Objects.equals(this.position, that.position);
	}

I'm catching these due to unexpected results in assertThat().equals().

@tomazfernandes
Contributor Author

@garyrussell, I actually can't really simply not call the endpoint customizer for the main topic because currently it's tied to the registration of the topics in the DestinationTopicProcessor, so I made it return early.

That said, what happens there today is that we use a no-op suffixer for the main endpoint, so we basically get and set the same values. But since users can provide their own implementation, I'm not sure returning early isn't a breaking change - WDYT?

If that's a problem, I can rework it to leave out only the topic partition part.

Thanks.

@artembilan
Member

How about this:

@Override
	public boolean equals(Object o) {
		if (this == o) return true;
		if (o == null || getClass() != o.getClass()) return false;
		TopicPartitionOffset that = (TopicPartitionOffset) o;
		return this.relativeToCurrent == that.relativeToCurrent &&
				Objects.equals(topicPartition, that.topicPartition)
				&& this.position == that.position
				&& Objects.equals(this.offset, that.offset);
	}

	@Override
	public int hashCode() {
		return Objects.hash(this.topicPartition, this.position, this.offset, this.relativeToCurrent);
	}

Just include everything into those methods.

Separate issue/ PR though...

I guess your tests must not rely on the equals() and hashCode() for now or at all and the verification logic must be slightly different.

@garyrussell
Contributor

I actually can't really simply not call the endpoint customizer for the main topic

Why not instead of

if (properties.isMainEndpoint()) {
	return topics;
}

use

if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
	endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, endpoint.getTopicPartitionsToAssign()));
}
else if (!properties.isMainEndpoint()) {
	endpoint.setTopics(topics.stream().map(EndpointCustomizer.TopicNamesHolder::getCustomizedTopic).toArray(String[]::new));
}

Or do you mean that the other updates are also no-ops?

@tomazfernandes
Contributor Author

I think the logic you propose would run into the same error, wouldn't it? There's nothing to prevent the main endpoint from entering the first if clause. Or am I missing something?

I think if we just make sure the main endpoint doesn't enter either the topics or the TPO branch, we should be good, with as little breakage as possible. WDYT?

And yes, all operations there are (or should be by default) no-ops for the main endpoint, except the getTopics part in the beginning which creates the TopicNamesHolder instances that we need.

@tomazfernandes
Contributor Author

tomazfernandes commented Apr 14, 2022

Sorry - customizeAndRegisterTopics in the beginning.

@tomazfernandes
Contributor Author

I'm thinking this:

	if (!properties.isMainEndpoint()) {
		if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
			endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, endpoint.getTopicPartitionsToAssign()));
		}
		else {
			endpoint.setTopics(endpoint.getTopics().stream().map(namesProvider::getTopicName).toArray(String[]::new));
		}
	}

Notice I changed the setTopics part a bit too, so that it won't get names from TPOs if there are any, although it should not get there in such a state.

@tomazfernandes
Contributor Author

Or better:

	if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null && !properties.isMainEndpoint()) {
		endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, endpoint.getTopicPartitionsToAssign()));
	}
	else {
		endpoint.setTopics(endpoint.getTopics().stream().map(namesProvider::getTopicName).toArray(String[]::new));
	}

This has the least changes and also lets users go through the topics logic with a custom namesProvider if they wish to do something even for the main endpoint topics.

Since we're on a schedule for the release, I'll submit this logic along with the test changes, but I can change any of it if you prefer.
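
To make the effect of that condition concrete, here is a rough, self-contained model of the branching. Everything in it is hypothetical: `SketchTpo`, `SketchEndpoint`, and `customize` are simplified stand-ins, and the suffix string stands in for the names provider. Dropping the initial offset on the retry side is an assumption modelled on the earlier review comment about a null initial offset, not a copy of the real code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a topic/partition/initial-offset triple.
final class SketchTpo {

    final String topic;
    final int partition;
    final Long initialOffset; // null means "no initial seek"

    SketchTpo(String topic, int partition, Long initialOffset) {
        this.topic = topic;
        this.partition = partition;
        this.initialOffset = initialOffset;
    }
}

// Hypothetical stand-in for a listener endpoint.
final class SketchEndpoint {

    List<String> topics = new ArrayList<>();
    List<SketchTpo> topicPartitions; // null when the listener subscribes by topic name
}

class EndpointCustomizationSketch {

    // Mirrors the shape of the proposed if/else; the suffix plays the role of the names provider.
    static void customize(SketchEndpoint endpoint, boolean mainEndpoint, String suffix) {
        if (endpoint.topics.isEmpty() && endpoint.topicPartitions != null && !mainEndpoint) {
            // Retry endpoint with manual assignment: rename the topic, drop the initial offset (assumption).
            endpoint.topicPartitions = endpoint.topicPartitions.stream()
                    .map(tpo -> new SketchTpo(tpo.topic + suffix, tpo.partition, null))
                    .collect(Collectors.toList());
        }
        else {
            // Main endpoint, or a topic-name subscription: the partition assignments are left untouched.
            endpoint.topics = endpoint.topics.stream()
                    .map(topic -> topic + suffix)
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        SketchEndpoint mainEndpoint = new SketchEndpoint();
        mainEndpoint.topicPartitions = Arrays.asList(new SketchTpo("kgh2220", 0, 0L));
        customize(mainEndpoint, true, "");
        System.out.println(mainEndpoint.topicPartitions.get(0).initialOffset);
    }
}
```

In this model the main endpoint keeps its configured initial offset, which is the behavior the earlier Boot application was checking for.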

Adjust tests
Improve TPO and topics handling
Contributor

@garyrussell garyrussell left a comment

OK; I ran this version with my Boot app and it now works as expected.

If @artembilan 's approval from last week still stands, we can merge this.

@tomazfernandes
Contributor Author

I've just made a small improvement where users can also use the names provider with the main topic for topic partitions. To my understanding that's a really odd and unlikely combination, so I'm ok with leaving that out if you prefer - or I can submit if you don't mind looking at this again.

@garyrussell
Contributor

I am confused about the hashcode change, though (reverted); it looks like a valid NPE.

@garyrussell
Contributor

Sure; feel free if you have another commit.

@artembilan
Member

I would suggest fixing TopicPartitionOffset equals() and hashCode() in a separate PR, which could be back-ported to all supported versions.

@garyrussell
Contributor

@artembilan

	@Override
	public int hashCode() {
		return Objects.hash(this.topicPartition, this.position, this.offset, this.relativeToCurrent);
	}

Offset and relativeToCurrent are mutable so are not included in the equals or hash.

#1554
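
A small sketch of why a mutable field is risky in `hashCode()`. `MutableKey` is a hypothetical class, not `TopicPartitionOffset`; it deliberately includes a mutable `offset` in both `equals()` and `hashCode()` to show that a `HashSet` can no longer find an element once that field changes.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical key whose hash depends on a mutable field (the anti-pattern being illustrated).
final class MutableKey {

    private final String topic;
    private Long offset; // mutable after construction

    MutableKey(String topic, Long offset) {
        this.topic = topic;
        this.offset = offset;
    }

    void setOffset(Long offset) {
        this.offset = offset;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MutableKey)) {
            return false;
        }
        MutableKey that = (MutableKey) o;
        return Objects.equals(this.topic, that.topic) && Objects.equals(this.offset, that.offset);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.topic, this.offset);
    }
}

class MutableHashDemo {

    // Adds a key to a HashSet, mutates it, then looks the very same instance up again.
    static boolean foundAfterMutation() {
        Set<MutableKey> keys = new HashSet<>();
        MutableKey key = new MutableKey("kgh2220", 0L);
        keys.add(key);
        key.setOffset(15L); // the hash code changes while the key sits in the set
        return keys.contains(key);
    }

    public static void main(String[] args) {
        System.out.println(foundAfterMutation());
    }
}
```

The lookup fails because the set stored the element under its old hash, which is one reason to keep mutable state out of `equals()` and `hashCode()`.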

@tomazfernandes
Contributor Author

Well, TBH, since I'm not that familiar with TopicPartitionOffset, I'm not so sure about the new logic. That's why I left it out in the first place: I didn't want to add complexity so close to the release. I thought of doing this:

	private static TopicPartitionOffset getTPOForMainTopic(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
		return new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
				tpo.getPartition(), tpo.getOffset(), tpo.getPosition());
	}

But I'm not so sure that's an exact copy of the original just with the topic name potentially changed (if the user provides a names provider) - there seems to be a relativeToCurrent flag which is not considered in this logic.

Any pointers? As I said, I'm ok with leaving the logic as is too.

@garyrussell
Contributor

Good point (re fixing hash for back porting)

@tomazfernandes
Contributor Author

tomazfernandes commented Apr 14, 2022

Well, I guess this should do the trick:

	private static TopicPartitionOffset getTPOForMainTopic(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
		TopicPartitionOffset newTpo = new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
				tpo.getPartition(), tpo.getOffset(), tpo.getPosition());
		newTpo.setRelativeToCurrent(tpo.isRelativeToCurrent());
		return newTpo;
	}

@garyrussell
Contributor

I am not sure why you want to replace the existing TPOs with an exact copy.

@garyrussell
Contributor

Oh - it's so they can change the name; OK; that makes sense.
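
As a standalone illustration of that copy-with-rename idea, the sketch below uses a hypothetical `CopyableTpo` class and a plain `UnaryOperator<String>` in place of the names provider. It is not the real `TopicPartitionOffset` API; it only shows that a flag set after construction has to be carried over explicitly when copying.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in with just enough state for the example.
final class CopyableTpo {

    private final String topic;
    private final int partition;
    private final Long offset;         // nullable initial offset
    private boolean relativeToCurrent; // mutable flag, set after construction

    CopyableTpo(String topic, int partition, Long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    String getTopic() {
        return this.topic;
    }

    int getPartition() {
        return this.partition;
    }

    Long getOffset() {
        return this.offset;
    }

    boolean isRelativeToCurrent() {
        return this.relativeToCurrent;
    }

    void setRelativeToCurrent(boolean relativeToCurrent) {
        this.relativeToCurrent = relativeToCurrent;
    }
}

class RenameTpoSketch {

    // Copies every field, changing only the topic name via the supplied function.
    static CopyableTpo withRenamedTopic(CopyableTpo tpo, UnaryOperator<String> topicNamer) {
        CopyableTpo copy = new CopyableTpo(topicNamer.apply(tpo.getTopic()),
                tpo.getPartition(), tpo.getOffset());
        // The flag is not a constructor argument here, so it must be copied separately.
        copy.setRelativeToCurrent(tpo.isRelativeToCurrent());
        return copy;
    }

    public static void main(String[] args) {
        CopyableTpo original = new CopyableTpo("kgh2220", 0, 0L);
        original.setRelativeToCurrent(true);
        CopyableTpo renamed = withRenamedTopic(original, name -> "custom-" + name);
        System.out.println(renamed.getTopic() + " " + renamed.isRelativeToCurrent());
    }
}
```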

@garyrussell garyrussell merged commit 80b1770 into spring-projects:main Apr 14, 2022
garyrussell pushed a commit that referenced this pull request Apr 14, 2022
* GH-2220: Fix TopicPartitionOffset for Retry Topics

Resolves #2220

* Register NewTopics for Retry Topics only once per topic.

* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.

* Move default partition to constant field.

* Address review comments

Make EndpointCustomizerFactory ctor public

* Remove main endpoint customization

* Remove failing @SuppressWarnings

* Revert TPO hashCode changes
Adjust tests
Improve TPO and topics handling

* Enable customizing main endpoint TPOs
@garyrussell
Contributor

...and cherry picked to 2.9.x, 2.8.x, and to 2.7.x after resolving minor conflicts.

@garyrussell
Contributor

Hmmm - test failures on the back ports...

org.mockito.exceptions.verification.WantedButNotInvoked: Wanted but not invoked:
consumerRecordMessage.topic();
-> at org.springframework.kafka.retrytopic.RetryTopicConfigurerTests.shouldLogConsumerRecordMessage(RetryTopicConfigurerTests.java:369)
Actually, there were zero interactions with this mock.

Wanted but not invoked:
consumerRecordMessage.topic();
-> at org.springframework.kafka.retrytopic.RetryTopicConfigurerTests.shouldLogConsumerRecordMessage(RetryTopicConfigurerTests.java:369)
(88 more lines...)

Investigating...

@tomazfernandes
Contributor Author

I changed these tests because they failed depending on the log level set for the tests. This is probably due to the change to KafkaUtils for logging. Easiest fix should be finding out which method is called on the consumer record and changing to assert that.
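
One way a mock can end up with zero interactions depending on the log level is lazy message construction: if the log message is built by a supplier, the supplier (and any calls it makes on the record) only runs when that level is enabled. Whether this is exactly what happens in the failing tests is an assumption; the sketch below only demonstrates the general effect using `java.util.logging`, with a counter standing in for the mocked record.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

class LazyLogDemo {

    // Returns how many times the message supplier was evaluated for a logger at the given level.
    static int evaluations(Level loggerLevel) {
        Logger logger = Logger.getLogger("lazy-log-demo-" + loggerLevel);
        logger.setUseParentHandlers(false); // keep the demo quiet on the console
        logger.setLevel(loggerLevel);

        AtomicInteger calls = new AtomicInteger();
        // Stands in for a message that would call record.topic() on a mock.
        Supplier<String> message = () -> "record access #" + calls.incrementAndGet();

        logger.fine(message); // the supplier is only invoked if FINE is loggable
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("INFO: " + evaluations(Level.INFO));
        System.out.println("FINE: " + evaluations(Level.FINE));
    }
}
```

A test that verifies such an interaction therefore passes or fails with the configured level, so asserting on something the code always calls is the more stable choice.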

@garyrussell
Contributor

Thanks; fixed.

Successfully merging this pull request may close these issues.

Listener receives multiple messages when using @RetryableTopic with topicPartitions to reset offset
3 participants