Skip to content

Kafka Message Header API #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 24, 2023

Conversation

felixschlegel
Copy link
Contributor

Motivation:

Be able to attach headers to a KafkaProducerMessage and read out
headers attached to a KafkaConsumerMessage.

Modifications:

  • create new type struct KafkaHeader representing a key-value pair of
    String key and ByteBuffer value
  • add property headers: [KafkaHeader] to KafkaConsumerMessage and
    KafkaConsumerMessage
  • use rd_kafka_produceva (varidadic arguments) to produce messages as
    rd_kafka_produce did not support setting message headers
  • create helper class RDKafkaUnsafeProducerMessage that helps
    configuring the varidadic argument array for rd_kafka_produceva
  • add new test asserting that both producing and consuming messages with
    message headers works

@felixschlegel felixschlegel force-pushed the fs-kafka-message-header branch from 2dbf00b to bb8244c Compare August 22, 2023 13:39
public var key: String

/// The value associated with the header.
public var value: ByteBuffer? // TODO: KafkaContiguousBytes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we would do the same KafkaContiguousBytes dance here as in KafkaProducerMessage.
However, the problem is that KafkaHeader is used in an array, which means the entire array would have to be generic over one specific KafkaContiguousBytes type which feels wrong too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine to leave this with ByteBuffer for now. Restricting the Array to single type isn't great.

self._internal[index].vtype = RD_KAFKA_VTYPE_HEADER

// Copy name C String (strdup) as we do not own the headers: [KafkaHeader] array
guard let headerNameCPointer = header.key.withCString(strdup) else {
Copy link
Contributor Author

@felixschlegel felixschlegel Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not really happy with doing the strdup here. However, we need to somehow need to pass a char * pointer which we get from a header entry in the headers: [KafkaHeader] array (hence accessing the pointer inside of the closure is weird).

I had a couple of ideas to reduce copying here:

  • write C glue code that takes the headers: [KafkaHeader] array and does essentially what RDKakfaUnsafeProducerMessage does, just in a more enclosed way
  • use copy-on-write / a String pool to avoid unnecessary copies of the String pointer (if a message header name occurs more than once we can avoid the additional copy with our String pool)

let valueCopyBuffer = UnsafeMutableRawPointer.allocate(byteCount: value.readableBytes, alignment: 1)
value.withUnsafeReadableBytes { valueBuffer in
if let baseAddress = valueBuffer.baseAddress {
valueCopyBuffer.copyMemory(from: baseAddress, byteCount: valueBuffer.count)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also want to avoid copying here, see comment about strdup

@felixschlegel felixschlegel force-pushed the fs-kafka-message-header branch 2 times, most recently from 48515b3 to 2f158e3 Compare August 23, 2023 20:20
Comment on lines 277 to 283
if kafkaHeaders.isEmpty {
// Base case: we have read all kafkaHeaders and now invoke the accessor closure
// that can safely access the pointers in cHeaders
return try body(cHeaders)
} else {
guard let kafkaHeader = kafkaHeaders.popLast() else {
fatalError("kafkaHeaders should not be nil")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we swap this around so that we only guard. Then we don't need the fatalError at all

Motivation:

Be able to attach headers to a `KafkaProducerMessage` and read out
headers attached to a `KafkaConsumerMessage`.

Modifications:

* create new type `struct KafkaHeader` representing a key-value pair of
  `String` key and `ByteBuffer` value
* add property `headers: [KafkaHeader]` to `KafkaConsumerMessage` and
  `KafkaConsumerMessage`
* use `rd_kafka_produceva` (varidadic arguments) to produce messages as
  `rd_kafka_produce` did not support setting message headers
* create helper class `RDKafkaUnsafeProducerMessage` that helps
  configuring the varidadic argument array for `rd_kafka_produceva`
* add new test asserting that both producing and consuming messages with
  message headers works
Modifications:

* no copying of `KafkaProducerMessage` headers and values -> build
  scoped accessor helper that recursively accesses all underlying
  pointers of the `KafkaProducerMessage`'s `headers: [KafkaHeader]`
* only use `rd_kafka_produceva` when `message.headers.isEmpty == false`
@felixschlegel felixschlegel force-pushed the fs-kafka-message-header branch from b393742 to bb403b2 Compare August 24, 2023 13:27
@FranzBusch FranzBusch merged commit 8592c61 into swift-server:main Aug 24, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants