
Wrap rd_kafka_consumer_poll into iterator (use librdkafka embedded backpressure) #158


Conversation

blindspotbounty (Collaborator) commented Dec 11, 2023

This PR primarily addresses #136.

Since the changes in PR #139, there is no longer a need for an intermediate AsyncStream for messages.
Instead, rd_kafka_consumer_poll (client.consumerPoll()) can be called directly from the iterator.

This should primarily solve the message-duplication problem by leaving buffering to librdkafka. It also has some other benefits, for example:

  1. Fewer allocations, less refcounting, etc.
  2. Fewer context switches.
  3. No more need for an explicit storeOffset (librdkafka does it automatically).
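The idea can be sketched roughly as follows. This is a minimal illustration, not the actual swift-kafka-client API: `PollClient` and its `consumerPoll` closure are hypothetical stand-ins for the real client, and the 10 ms back-off interval is arbitrary.

```swift
// Sketch: an AsyncSequence whose iterator polls the client directly,
// relying on librdkafka's own queue for buffering and backpressure.
// `PollClient` is a hypothetical stand-in for the real KafkaClient.
struct PollClient {
    /// Returns the next message, or nil when librdkafka's queue is empty.
    let consumerPoll: () -> String?
}

struct KafkaMessageSequence: AsyncSequence {
    typealias Element = String
    let client: PollClient

    struct AsyncIterator: AsyncIteratorProtocol {
        let client: PollClient

        mutating func next() async -> String? {
            while !Task.isCancelled {
                // Ask librdkafka for the next message -- no intermediate AsyncStream.
                if let message = client.consumerPoll() {
                    return message
                }
                // Nothing buffered: back off briefly instead of busy-waiting.
                try? await Task.sleep(nanoseconds: 10_000_000) // 10 ms
            }
            return nil
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(client: client)
    }
}
```

Consumers then simply `for await message in sequence { ... }`, and backpressure falls out naturally: a slow consumer stops calling `next()`, so the iterator stops draining librdkafka's queue.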

I used the still-pending PR #149 to test the difference on my machine, with the following results:

==========================================================================================================
Threshold deviations for SwiftKafkaConsumer - basic consumer (messages: 1000):SwiftKafkaConsumerBenchmarks
==========================================================================================================
╒══════════════════════════════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╕
│ Time (total CPU) (ms, %)                 │        original │ no_async_seque… │    Difference % │     Threshold % │
╞══════════════════════════════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╡
│ p25                                      │              49 │              43 │              12 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p50                                      │              51 │              44 │              13 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p75                                      │              52 │              46 │              10 │               5 │
╘══════════════════════════════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╛

╒══════════════════════════════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╕
│ Retains (#, %)                           │        original │ no_async_seque… │    Difference % │     Threshold % │
╞══════════════════════════════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╡
│ p25                                      │           20159 │            5335 │              73 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p50                                      │           20159 │            5343 │              73 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p75                                      │           20191 │            5359 │              73 │               5 │
╘══════════════════════════════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╛

╒══════════════════════════════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╕
│ Releases (K, %)                          │        original │ no_async_seque… │    Difference % │     Threshold % │
╞══════════════════════════════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╡
│ p25                                      │              29 │              11 │              59 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p50                                      │              29 │              11 │              59 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p75                                      │              29 │              12 │              59 │               5 │
╘══════════════════════════════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╛

╒══════════════════════════════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╕
│ Object allocs (#, %)                     │        original │ no_async_seque… │    Difference % │     Threshold % │
╞══════════════════════════════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╡
│ p25                                      │            6003 │            4619 │              23 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p50                                      │            6023 │            4631 │              23 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p75                                      │            6043 │            4671 │              22 │               5 │
╘══════════════════════════════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╛

╒══════════════════════════════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╕
│ Context switches (#, %)                  │        original │ no_async_seque… │    Difference % │     Threshold % │
╞══════════════════════════════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╡
│ p25                                      │            1407 │            1326 │               5 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p50                                      │            1413 │            1331 │               5 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p75                                      │            1421 │            1344 │               5 │               5 │
╘══════════════════════════════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╛

╒══════════════════════════════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╕
│ (Alloc + Retain) - Release Δ (#, %)      │        original │ no_async_seque… │    Difference % │     Threshold % │
╞══════════════════════════════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╡
│ p25                                      │            3291 │            1953 │              40 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p50                                      │            3337 │            1961 │              41 │               5 │
├──────────────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ p75                                      │            3343 │            1991 │              40 │               5 │
╘══════════════════════════════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╛
...
New baseline 'no_async_sequence__use_sleep' is BETTER than the 'original' baseline thresholds.

@blindspotbounty marked this pull request as ready for review December 12, 2023 09:44
@blindspotbounty (Collaborator, Author):

@felixschlegel, @FranzBusch could you give us an idea whether these changes fit your vision for the future development of swift-kafka-client?
If not, please suggest how you would like to address the duplicated-messages issue.
Understanding this is a top priority for us so we can align our code.

@blindspotbounty (Collaborator, Author):

@FranzBusch, @felixschlegel do you have any thoughts on this PR?

Comment on lines 73 to 75
// FIXME: there are two possibilities:
// 1. Create gcd queue and wait blocking call client.consumerPoll() -> faster reaction on new messages
// 2. Sleep in case there are no messages -> easier to implement + no problems with gcd Sendability
Contributor:
I think neither option is great, but the good news is that we have a future solution in https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md. With task executor preference we can create our own custom executor for a KafkaConsumer, backed by a p_thread in the end (we can use NIO here), and then just use withTaskExecutorPreference in the next() call before calling the underlying rd Kafka API. This makes sure we have one thread that we can freely block, and we get unblocked as soon as we have a message. It just comes with the overhead of a thread hop. Theoretically we could do this conditionally: try to poll -> no messages -> executor preference -> blocking poll.

The problem is that this feature is only available in the latest nightly Swift versions. So what I would propose for now is going forward with the sleep-based implementation and creating an issue for adopting task executor preference here in the future.
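The first option discussed above — blocking on a dedicated queue instead of sleeping — could be sketched like this. `blockingPoll` is a hypothetical stand-in for a blocking `rd_kafka_consumer_poll` call with a timeout; the point is only that the blocking happens on a GCD queue's thread, never on the Swift concurrency cooperative thread pool:

```swift
import Dispatch

// Sketch: run a blocking poll on a dedicated serial queue so that the
// Swift concurrency cooperative thread pool is never blocked.
let pollQueue = DispatchQueue(label: "kafka.consumer.poll")

func pollOnDedicatedQueue(
    _ blockingPoll: @escaping @Sendable () -> String?
) async -> String? {
    await withCheckedContinuation { continuation in
        pollQueue.async {
            // This call may block until a message arrives or a timeout fires;
            // blocking here stalls only the dedicated queue's thread.
            continuation.resume(returning: blockingPoll())
        }
    }
}
```

Task executor preference (SE-0417) achieves the same effect more directly — the blocked thread is the one running `next()` — without the continuation hop, which is why it is the preferred long-term solution.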

@blindspotbounty (Collaborator, Author):

Finally had some time to go through my changes and add a couple of minor ones:

  1. Deprecated the backpressure settings, with a message pointing users to MessageOptions.
  2. Added a comment about the future move to task executor preference.

@blindspotbounty blindspotbounty changed the title [suggestion] wrap rd_kafka_consumer_poll into iterator Wrap rd_kafka_consumer_poll into iterator (use librdkafka embedded backpressure) Apr 16, 2024
/// A back pressure strategy based on high and low watermarks.
///
/// The consumer maintains a buffer size between a low watermark and a high watermark
/// to control the flow of incoming messages.
///
/// - Parameter low: The lower threshold for the buffer size (low watermark).
/// - Parameter high: The upper threshold for the buffer size (high watermark).
@available(*, deprecated, message: "Use MessageOptions to control backpressure")
public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
    return .init(backPressureStrategy: .watermark(low: low, high: high))
}
Contributor:
I don't think we need to deprecate stuff here. We aren't at 1.0.0 yet, so let's just remove it!

Collaborator (Author):
Removed deprecated code.

Comment on lines +94 to +95
// Currently uses Task.sleep() when there are no new messages; should use task executor preference once implemented:
// https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md
Contributor:

Can we create an issue for this to track it?

Collaborator (Author):

Now we have one: #165!

@FranzBusch (Contributor) left a comment:

LGTM, just two nits.

@blindspotbounty blindspotbounty merged commit 298067a into swift-server:main Apr 26, 2024
blindspotbounty added a commit to ordo-one/swift-kafka-client that referenced this pull request Aug 6, 2024
* Feature: expose librdkafka statistics as swift metrics (swift-server#92)

* introduce statistics for producer

* add statistics to new consumer with events

* fix some artefacts

* adjust to KeyRefreshAttempts

* draft: statistics with metrics

* make structures internal

* Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift

Co-authored-by: Felix Schlegel <[email protected]>

* Update Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift

Co-authored-by: Felix Schlegel <[email protected]>

* Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift

Co-authored-by: Felix Schlegel <[email protected]>

* Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift

Co-authored-by: Felix Schlegel <[email protected]>

* address review comments

* formatting

* map gauges in one place

* move json mode as rd kafka statistics, misc renaming + docc

* address review comments

* remove import Metrics

* divide producer/consumer configuration

* apply swiftformat

* fix code after conflicts

* fix formatting

---------

Co-authored-by: Felix Schlegel <[email protected]>

* Add benchmark infrastructure without actual tests (swift-server#146)

* add benchmark infrastructure without actual test

* apply swiftformat

* fix header in sh file

* use new async seq methods

* Update to latest librdkafka & add a define for RAND_priv_bytes (swift-server#148)

Co-authored-by: Franz Busch <[email protected]>

* exit from consumer batch loop when no more messages left (swift-server#153)

* Lower requirements for consumer state machine (swift-server#154)

* lower requirements for kafka consumer

* add twin test for kafka producer

* defer source.finish (swift-server#157)

* Add two consumer benchmark (swift-server#149)

* benchmark for consumer

* attempt to speed up benchmarks

* check CI works for one test

* enable one more test

* try to lower poll interval

* adjust max duration of test

* remain only manual commit test

* check if commit is the reason for test delays

* try all with schedule commit

* revert max test time to 5 seconds

* dockerfiles

* test set thresholds

* create dummy thresholds from ci results

* disable benchmark in CI

* add header

* add stable metrics

* update thresholds to stable metrics only

* try use '1' instead of 'true'

* adjust thresholds to CI results (as temporary measure)

* set 20% threshold..

* move arc to unstable metrics

* try use 'true' in quotes for CI

* try reduce number of messages for more reliable results

* try upgrade bench

* disable benchmark in CI

* Update librdkafka for BoringSSL (swift-server#162)

* chore(patch): [sc-8379] use returned error (swift-server#163)

* [producer message] Allow optional key for initializer (swift-server#164)

Co-authored-by: Harish Yerra <[email protected]>

* Allow groupID to be specified when assigning partition (swift-server#161)

* Allow groupID to be specified when assigning partition

Motivation:

A Consumer Group can provide a lot of benefits even if the
dynamic loadbalancing features are not used.

Modifications:

Allow for an optional GroupID when creating a partition
consumer.

Result:

Consumer Groups can now be used when manual assignment is
used.

* fix format

---------

Co-authored-by: Ómar Kjartan Yasin <[email protected]>
Co-authored-by: blindspotbounty <[email protected]>
Co-authored-by: Franz Busch <[email protected]>

* Wrap rd_kafka_consumer_poll into iterator (use librdkafka embedded backpressure) (swift-server#158)

* remove message sequence

* test consumer with implicit rebalance

* misc + format

* remove artefact

* don't check a lot of messages

* fix typo

* slow down first consumer to lower message to fit CI timeout

* remove helpers

* use exact benchmark version to avoid missing thresholds error (as no thresholds so far)

* add deprecated marks for backpressure, change comment for future dev

* address comments

---------

Co-authored-by: Felix Schlegel <[email protected]>
Co-authored-by: Axel Andersson <[email protected]>
Co-authored-by: Franz Busch <[email protected]>
Co-authored-by: Samuel M <[email protected]>
Co-authored-by: Harish Yerra <[email protected]>
Co-authored-by: Harish Yerra <[email protected]>
Co-authored-by: Omar Yasin <[email protected]>
Co-authored-by: Ómar Kjartan Yasin <[email protected]>