@@ -14,7 +14,7 @@ public final class KafkaTransactionalProducer: Service, Sendable {
14
14
///
15
15
/// This creates a producer without listening for events.
16
16
///
17
- /// - Parameter config: The ``KafkaProducerConfiguration `` for configuring the ``KafkaProducer``.
17
+ /// - Parameter config: The ``KafkaTransactionalProducerConfiguration `` for configuring the ``KafkaProducer``.
18
18
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
19
19
/// - Parameter logger: A logger.
20
20
/// - Returns: The newly created ``KafkaProducer``.
@@ -57,15 +57,18 @@ public final class KafkaTransactionalProducer: Service, Sendable {
57
57
return ( transactionalProducer, events)
58
58
}
59
59
60
- //
61
- public func withTransaction( _ body: @Sendable ( KafkaTransaction) async throws -> Void ) async throws {
60
+ /// Begins Kafka transaction, provides it to given closure
61
+ /// Commits transaction unless closure throws
62
+ /// - Parameters:
63
+ /// - function: revieve KafkaTransaction and use fills it with produced messages and offsets
64
+ public func withTransaction( _ function: @Sendable ( KafkaTransaction) async throws -> Void ) async throws {
62
65
let transaction = try KafkaTransaction (
63
66
client: try producer. client ( ) ,
64
67
producer: self . producer
65
68
)
66
69
67
70
do { // need to think here a little bit how to abort transaction
68
- try await body ( transaction)
71
+ try await function ( transaction)
69
72
try await transaction. commit ( )
70
73
} catch { // FIXME: maybe catch AbortTransaction?
71
74
do {
0 commit comments