
Commit 7da4901: "address some review comments"

Parent: 4f9520f

11 files changed, +354 -95 lines

Package.swift

Lines changed: 0 additions & 2 deletions

@@ -44,7 +44,6 @@ let package = Package(
         .package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"),
         .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0-alpha.1"),
         .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
-        .package(url: "https://github.com/ordo-one/package-concurrency-helpers", .upToNextMajor(from: "1.0.0")),
         // The zstd Swift package produces warnings that we cannot resolve:
         // https://github.com/facebook/zstd/issues/3328
         .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
@@ -74,7 +73,6 @@ let package = Package(
             name: "SwiftKafka",
             dependencies: [
                 "Crdkafka",
-                .product(name: "ConcurrencyHelpers", package: "package-concurrency-helpers", moduleAliases: ["ConcurrencyHelpers" : "BlockingCallWrapper"]),
                 .product(name: "NIOCore", package: "swift-nio"),
                 .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
                 .product(name: "Logging", package: "swift-log"),

Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

Lines changed: 40 additions & 2 deletions

@@ -36,8 +36,6 @@ public struct KafkaProducerConfiguration {
     /// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
     /// Default: `false`
     public var enableIdempotence: Bool = false
-
-    public var transactionalId: String?
 
     /// Producer queue options.
     public var queue: KafkaConfiguration.QueueOptions = .init()
@@ -99,6 +97,8 @@ public struct KafkaProducerConfiguration {
     /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
     /// Default: `.plaintext`
     public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
+
+    internal var transactionalId: String?
 
     public init() {}
 }
@@ -112,6 +112,9 @@ extension KafkaProducerConfiguration {
         resultDict["enable.idempotence"] = String(self.enableIdempotence)
         if let transactionalId {
             resultDict["transactional.id"] = transactionalId
+            resultDict["transaction.timeout.ms"] = "60000"
+            resultDict["message.timeout.ms"] = "60000"
+
         }
         resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
         resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
@@ -191,3 +194,38 @@ extension KafkaConfiguration {
         }
     }
 }
+
+// FIXME: should we really duplicate `KafkaProducerConfiguration`
+// FIXME: after public api updated?
+public struct KafkaTransactionalProducerConfiguration {
+    var transactionalId: String
+    var transactionsTimeout: Duration
+
+    var producerConfiguration: KafkaProducerConfiguration {
+        set {
+            self.producerConfiguration_ = newValue
+        }
+        get {
+            var conf = self.producerConfiguration_
+            conf.transactionalId = self.transactionalId
+            conf.enableIdempotence = true
+            conf.maxInFlightRequestsPerConnection = min(conf.maxInFlightRequestsPerConnection, 5)
+            return conf
+        }
+    }
+
+    private var producerConfiguration_: KafkaProducerConfiguration = .init()
+
+    public init(transactionalId: String, transactionsTimeout: Duration = .kafkaUntilEndOfTransactionTimeout, producerConfiguration: KafkaProducerConfiguration = .init()) {
+        self.transactionalId = transactionalId
+        self.transactionsTimeout = transactionsTimeout
+        self.producerConfiguration = producerConfiguration
+    }
+}
+
+// MARK: - KafkaTransactionalProducerConfiguration + Hashable
+
+extension KafkaTransactionalProducerConfiguration: Hashable {}
+
+// MARK: - KafkaTransactionalProducerConfiguration + Sendable
+
+extension KafkaTransactionalProducerConfiguration: Sendable {}
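
The derived producerConfiguration above is what enforces the transactional invariants: the transactional.id is injected, idempotence is switched on, and max.in.flight.requests.per.connection is capped at 5. A minimal usage sketch, assuming module-internal access (several of these members are not public) and a hypothetical transactional ID; only the initializer and properties shown in this diff are used:

    // Build a transactional configuration; broker settings are left at the
    // defaults here and would normally be set on the wrapped
    // KafkaProducerConfiguration before passing it in.
    let config = KafkaTransactionalProducerConfiguration(
        transactionalId: "example-transactional-id",
        transactionsTimeout: .seconds(60)
    )

    // The computed getter re-applies the invariants on every read.
    let derived = config.producerConfiguration
    assert(derived.enableIdempotence)
    assert(derived.maxInFlightRequestsPerConnection <= 5)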

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 15 additions & 78 deletions

@@ -259,42 +259,10 @@ public final class KafkaProducer: Service, Sendable {
             return KafkaProducerMessageID(rawValue: newMessageID)
         }
     }
-
-    func initTransactions(timeout: Duration = .seconds(5)) async throws {
-        guard config.transactionalId != nil else {
-            throw KafkaError.config(
-                reason: "Could not initialize transactions because transactionalId is not set in config")
-        }
-        let client = try self.stateMachine.withLockedValue { try $0.initTransactions() }
-        try await client.initTransactions(timeout: timeout)
-    }
-
-    func beginTransaction() throws {
-        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
-        try client.beginTransaction()
-    }
-
-    func send(
-        offsets: RDKafkaTopicPartitionList,
-        forConsumer consumer: KafkaConsumer,
-        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
-        attempts: UInt64 = .max
-    ) async throws {
-        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
-        let consumerClient = try consumer.client()
-        try await consumerClient.withKafkaHandlePointer {
-            try await client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
-        }
-    }
-
-    func abortTransaction(
-        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
-        attempts: UInt64) async throws {
-        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
-        try await client.abortTransaction(attempts: attempts, timeout: timeout)
+
+    func client() throws -> RDKafkaClient {
+        try self.stateMachine.withLockedValue { try $0.client() }
     }
-
-
 }
 
 // MARK: - KafkaProducer + StateMachine
@@ -322,18 +290,6 @@ extension KafkaProducer {
             source: Producer.Source?,
             topicHandles: RDKafkaTopicHandles
         )
-        /// The ``KafkaProducer`` has started and is ready to use, transactions were initialized.
-        ///
-        /// - Parameter messageIDCounter:Used to incrementally assign unique IDs to messages.
-        /// - Parameter client: Client used for handling the connection to the Kafka cluster.
-        /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
-        /// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
-        case startedWithTransactions(
-            client: RDKafkaClient,
-            messageIDCounter: UInt,
-            source: Producer.Source?,
-            topicHandles: RDKafkaTopicHandles
-        )
         /// Producer is still running but the event asynchronous sequence was terminated.
         /// All incoming events will be dropped.
         ///
@@ -401,7 +357,7 @@
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
+            case .started(let client, _, let source, _):
                 return .pollAndYield(client: client, source: source)
             case .consumptionStopped(let client):
                 return .pollWithoutYield(client: client)
@@ -444,19 +400,6 @@ extension KafkaProducer {
                     newMessageID: newMessageID,
                     topicHandles: topicHandles
                 )
-            case .startedWithTransactions(let client, let messageIDCounter, let source, let topicHandles):
-                let newMessageID = messageIDCounter + 1
-                self.state = .startedWithTransactions(
-                    client: client,
-                    messageIDCounter: newMessageID,
-                    source: source,
-                    topicHandles: topicHandles
-                )
-                return .send(
-                    client: client,
-                    newMessageID: newMessageID,
-                    topicHandles: topicHandles
-                )
             case .consumptionStopped:
                 throw KafkaError.connectionClosed(reason: "Sequence consuming events was abruptly terminated, producer closed")
             case .finishing:
@@ -482,7 +425,7 @@
                 fatalError("\(#function) invoked while still in state \(self.state)")
             case .consumptionStopped:
                 fatalError("messageSequenceTerminated() must not be invoked more than once")
-            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
+            case .started(let client, _, let source, _):
                 self.state = .consumptionStopped(client: client)
                 return .finishSource(source: source)
             case .finishing(let client, let source):
@@ -502,34 +445,28 @@ extension KafkaProducer {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
+            case .started(let client, _, let source, _):
                 self.state = .finishing(client: client, source: source)
             case .consumptionStopped(let client):
                 self.state = .finishing(client: client, source: nil)
             case .finishing, .finished:
                 break
             }
         }
-
-        mutating func initTransactions() throws -> RDKafkaClient {
+
+        func client() throws -> RDKafkaClient {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, let messageIDCounter, let source, let topicHandles):
-                self.state = .startedWithTransactions(client: client, messageIDCounter: messageIDCounter, source: source, topicHandles: topicHandles)
+            case .started(let client, _, _, _):
                 return client
-            case .startedWithTransactions:
-                throw KafkaError.config(reason: "Transactions were already initialized")
-            case .consumptionStopped, .finishing, .finished:
-                throw KafkaError.connectionClosed(reason: "Producer is stopping or finished")
-            }
-        }
-
-        func transactionsClient() throws -> RDKafkaClient {
-            guard case let .startedWithTransactions(client, _, _, _) = self.state else {
-                throw KafkaError.transactionAborted(reason: "Transactions were not initialized or producer is being stopped")
+            case .consumptionStopped(let client):
+                return client
+            case .finishing(let client, _):
+                return client
+            case .finished:
+                throw KafkaError.connectionClosed(reason: "Client stopped")
             }
-            return client
         }
     }
 }
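
With the startedWithTransactions state gone, the producer no longer drives transactions itself; callers obtain the locked RDKafkaClient through the single client() accessor added above and invoke the transaction APIs on it directly, as the new types below do. A hedged sketch of that call pattern, using only names that appear in this commit (module-internal access is assumed and error handling is elided):

    // Fetch the client guarded by the producer's state machine and
    // initialize/begin a transaction on it, mirroring what
    // KafkaTransactionalProducer and KafkaTransaction do below.
    func startTransaction(on producer: KafkaProducer) async throws {
        let client = try producer.client()
        try await client.initTransactions(timeout: .seconds(60))
        try client.beginTransaction()
    }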
Lines changed: 43 additions & 0 deletions (new file; path not shown in this view)

@@ -0,0 +1,43 @@
+
+
+public final class KafkaTransaction {
+    let client: RDKafkaClient
+    let producer: KafkaProducer
+    let config: KafkaTransactionalProducerConfiguration
+
+    init(client: RDKafkaClient, producer: KafkaProducer, config: KafkaTransactionalProducerConfiguration) throws {
+        self.client = client
+        self.producer = producer
+        self.config = config
+
+        try client.beginTransaction()
+    }
+
+    deinit {
+    }
+
+    public func send(
+        offsets: RDKafkaTopicPartitionList,
+        forConsumer consumer: KafkaConsumer,
+        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
+        attempts: UInt64 = .max
+    ) async throws {
+        let consumerClient = try consumer.client()
+        try await consumerClient.withKafkaHandlePointer {
+            try await client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
+        }
+    }
+
+    @discardableResult
+    public func send<Key, Value>(_ message: KafkaProducerMessage<Key, Value>) throws -> KafkaProducerMessageID {
+        try self.producer.send(message)
+    }
+
+    func commit() async throws {
+        try await client.commitTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
+    }
+
+    func abort() async throws {
+        try await client.abortTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
+    }
+}
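
KafkaTransaction exposes two send overloads: one enqueues an outgoing record on the transactional producer, the other attaches a consumer's offsets to the same transaction so the input is only marked consumed if the transaction commits (the consume-transform-produce pattern). A sketch under the assumption that the message and offset list are constructed elsewhere, since their construction is outside this diff, and that String keys/values satisfy KafkaProducerMessage's generic constraints:

    // One exactly-once step: produce the output, then bind the input offsets
    // to the same transaction. Both calls are the KafkaTransaction methods
    // defined above.
    func processStep(
        in transaction: KafkaTransaction,
        message: KafkaProducerMessage<String, String>,
        offsets: RDKafkaTopicPartitionList,
        consumer: KafkaConsumer
    ) async throws {
        let messageID = try transaction.send(message)
        _ = messageID // can be correlated with delivery-report events
        try await transaction.send(offsets: offsets, forConsumer: consumer)
    }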
Lines changed: 83 additions & 0 deletions (new file; path not shown in this view)

@@ -0,0 +1,83 @@
+import Logging
+import ServiceLifecycle
+
+public final class KafkaTransactionalProducer: Service, Sendable {
+    let producer: KafkaProducer
+    let config: KafkaTransactionalProducerConfiguration
+
+    private init(producer: KafkaProducer, config: KafkaTransactionalProducerConfiguration) async throws {
+        self.producer = producer
+        self.config = config
+        let client = try producer.client()
+        try await client.initTransactions(timeout: config.transactionsTimeout)
+    }
+
+    /// Initialize a new ``KafkaTransactionalProducer``.
+    ///
+    /// This creates a producer without listening for events.
+    ///
+    /// - Parameter config: The ``KafkaTransactionalProducerConfiguration`` for configuring the ``KafkaTransactionalProducer``.
+    /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
+    /// - Parameter logger: A logger.
+    /// - Returns: The newly created ``KafkaTransactionalProducer``.
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    public convenience init(
+        config: KafkaTransactionalProducerConfiguration,
+        topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
+        logger: Logger
+    ) async throws {
+        let producer = try KafkaProducer(config: config.producerConfiguration, topicConfig: topicConfig, logger: logger)
+        try await self.init(producer: producer, config: config)
+    }
+
+    /// Initialize a new ``KafkaTransactionalProducer`` and a ``KafkaProducerEvents`` asynchronous sequence.
+    ///
+    /// Use the asynchronous sequence to consume events.
+    ///
+    /// - Important: When the asynchronous sequence is deinitialized, the producer will be shut down and no longer allow sending messages.
+    ///   Additionally, make sure to consume the asynchronous sequence, otherwise events will be buffered in memory indefinitely.
+    ///
+    /// - Parameter config: The ``KafkaTransactionalProducerConfiguration`` for configuring the ``KafkaTransactionalProducer``.
+    /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
+    /// - Parameter logger: A logger.
+    /// - Returns: A tuple containing the created ``KafkaTransactionalProducer`` and the ``KafkaProducerEvents``
+    ///   `AsyncSequence` used for receiving message events.
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    public static func makeTransactionalProducerWithEvents(
+        config: KafkaTransactionalProducerConfiguration,
+        topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
+        logger: Logger
+    ) async throws -> (KafkaTransactionalProducer, KafkaProducerEvents) {
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(config: config.producerConfiguration, topicConfig: topicConfig, logger: logger)
+
+        let transactionalProducer = try await KafkaTransactionalProducer(producer: producer, config: config)
+
+        return (transactionalProducer, events)
+    }
+
+    public func withTransaction(_ body: @Sendable (KafkaTransaction) async throws -> Void) async throws {
+        let transaction = try KafkaTransaction(
+            client: try producer.client(),
+            producer: producer,
+            config: config)
+
+        do { // need to think here a little bit how to abort transaction
+            try await body(transaction)
+            try await transaction.commit()
+        } catch { // FIXME: maybe catch AbortTransaction?
+            do {
+                try await transaction.abort()
+            } catch {
+                // FIXME: this leaves us in an inconsistent state
+                // should we emit fatalError(..)
+                // or propagate the error as an exception with an isFatal flag?
+            }
+            throw error
+        }
+    }
+
+    public func run() async throws {
+        try await producer.run()
+    }
+}
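
Putting the pieces together: withTransaction commits the closure's work or aborts it if the closure throws, while run() must be executing concurrently for the poll loop to make progress. A usage sketch under stated assumptions (the transactional ID and logger label are illustrative, broker settings are left at the configuration defaults, and in a real service run() would typically be owned by a swift-service-lifecycle ServiceGroup rather than a raw task group):

    import Logging

    func produceAtomically(_ messages: [KafkaProducerMessage<String, String>]) async throws {
        let config = KafkaTransactionalProducerConfiguration(transactionalId: "example-transactional-id")
        let producer = try await KafkaTransactionalProducer(
            config: config,
            logger: Logger(label: "transactional-producer")
        )

        try await withThrowingTaskGroup(of: Void.self) { group in
            // Keep the producer's event/poll loop running while it is in use.
            group.addTask { try await producer.run() }

            // Everything sent inside the closure is committed atomically;
            // a thrown error triggers transaction.abort() (see withTransaction above).
            try await producer.withTransaction { transaction in
                for message in messages {
                    try transaction.send(message)
                }
            }

            group.cancelAll()
        }
    }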
