-
Notifications
You must be signed in to change notification settings - Fork 27
Kafka Message Header API #120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Message Header API #120
Conversation
2dbf00b
to
bb8244c
Compare
Sources/Kafka/KafkaHeader.swift
Outdated
public var key: String | ||
|
||
/// The value associated with the header. | ||
public var value: ByteBuffer? // TODO: KafkaContiguousBytes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, we would do the same KafkaContiguousBytes
dance here as in KafkaProducerMessage
.
However, the problem is that KafkaHeader
is used in an array, which means the entire array would have to be generic over one specific KafkaContiguousBytes
type which feels wrong too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am fine to leave this with ByteBuffer
for now. Restricting the Array to single type isn't great.
self._internal[index].vtype = RD_KAFKA_VTYPE_HEADER | ||
|
||
// Copy name C String (strdup) as we do not own the headers: [KafkaHeader] array | ||
guard let headerNameCPointer = header.key.withCString(strdup) else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not really happy with doing the strdup
here. However, we need to somehow need to pass a char *
pointer which we get from a header
entry in the headers: [KafkaHeader]
array (hence accessing the pointer inside of the closure is weird).
I had a couple of ideas to reduce copying here:
- write C glue code that takes the
headers: [KafkaHeader]
array and does essentially whatRDKakfaUnsafeProducerMessage
does, just in a more enclosed way - use copy-on-write / a String pool to avoid unnecessary copies of the String pointer (if a message header name occurs more than once we can avoid the additional copy with our String pool)
let valueCopyBuffer = UnsafeMutableRawPointer.allocate(byteCount: value.readableBytes, alignment: 1) | ||
value.withUnsafeReadableBytes { valueBuffer in | ||
if let baseAddress = valueBuffer.baseAddress { | ||
valueCopyBuffer.copyMemory(from: baseAddress, byteCount: valueBuffer.count) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also want to avoid copying here, see comment about strdup
48515b3
to
2f158e3
Compare
if kafkaHeaders.isEmpty { | ||
// Base case: we have read all kafkaHeaders and now invoke the accessor closure | ||
// that can safely access the pointers in cHeaders | ||
return try body(cHeaders) | ||
} else { | ||
guard let kafkaHeader = kafkaHeaders.popLast() else { | ||
fatalError("kafkaHeaders should not be nil") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we swap this around so that we only guard. Then we don't need the fatalError
at all
2f158e3
to
b393742
Compare
Motivation: Be able to attach headers to a `KafkaProducerMessage` and read out headers attached to a `KafkaConsumerMessage`. Modifications: * create new type `struct KafkaHeader` representing a key-value pair of `String` key and `ByteBuffer` value * add property `headers: [KafkaHeader]` to `KafkaConsumerMessage` and `KafkaConsumerMessage` * use `rd_kafka_produceva` (varidadic arguments) to produce messages as `rd_kafka_produce` did not support setting message headers * create helper class `RDKafkaUnsafeProducerMessage` that helps configuring the varidadic argument array for `rd_kafka_produceva` * add new test asserting that both producing and consuming messages with message headers works
Modifications: * no copying of `KafkaProducerMessage` headers and values -> build scoped accessor helper that recursively accesses all underlying pointers of the `KafkaProducerMessage`'s `headers: [KafkaHeader]` * only use `rd_kafka_produceva` when `message.headers.isEmpty == false`
b393742
to
bb403b2
Compare
Motivation:
Be able to attach headers to a
KafkaProducerMessage
and read outheaders attached to a
KafkaConsumerMessage
.Modifications:
struct KafkaHeader
representing a key-value pair ofString
key andByteBuffer
valueheaders: [KafkaHeader]
toKafkaConsumerMessage
andKafkaConsumerMessage
rd_kafka_produceva
(varidadic arguments) to produce messages asrd_kafka_produce
did not support setting message headersRDKafkaUnsafeProducerMessage
that helpsconfiguring the varidadic argument array for
rd_kafka_produceva
message headers works