Skip to content

Commit a87f922

Browse files
authored
KIP-98: Add offsets support to transactional KafkaProducer (#2590)
1 parent 315f9d8 commit a87f922

File tree

4 files changed

+324
-18
lines changed

4 files changed

+324
-18
lines changed

kafka/producer/kafka.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,36 @@ def begin_transaction(self):
686686
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
687687
self._transaction_manager.begin_transaction()
688688

689+
def send_offsets_to_transaction(self, offsets, consumer_group_id):
690+
"""
691+
Sends a list of consumed offsets to the consumer group coordinator, and also marks
692+
those offsets as part of the current transaction. These offsets will be considered
693+
consumed only if the transaction is committed successfully.
694+
695+
This method should be used when you need to batch consumed and produced messages
696+
together, typically in a consume-transform-produce pattern.
697+
698+
Arguments:
699+
offsets ({TopicPartition: OffsetAndMetadata}): map of topic-partition -> offsets to commit
700+
as part of current transaction.
701+
consumer_group_id (str): Name of consumer group for offsets commit.
702+
703+
Raises:
704+
IllegalStateError: if no transactional_id, or transaction has not been started.
705+
ProducerFencedError: fatal error indicating another producer with the same transactional_id is active.
706+
UnsupportedVersionError: fatal error indicating the broker does not support transactions (i.e. if < 0.11).
707+
UnsupportedForMessageFormatError: fatal error indicating the message format used for the offsets
708+
topic on the broker does not support transactions.
709+
AuthorizationError: fatal error indicating that the configured transactional_id is not authorized.
710+
KafkaErro:r if the producer has encountered a previous fatal or abortable error, or for any
711+
other unexpected error
712+
"""
713+
if not self._transaction_manager:
714+
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
715+
result = self._transaction_manager.send_offsets_to_transaction(offsets, consumer_group_id)
716+
self._sender.wakeup()
717+
result.wait()
718+
689719
def commit_transaction(self):
690720
""" Commits the ongoing transaction.
691721

kafka/producer/transaction_manager.py

Lines changed: 182 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
from kafka.vendor.enum34 import IntEnum
1717

1818
import kafka.errors as Errors
19+
from kafka.protocol.add_offsets_to_txn import AddOffsetsToTxnRequest
1920
from kafka.protocol.add_partitions_to_txn import AddPartitionsToTxnRequest
2021
from kafka.protocol.end_txn import EndTxnRequest
2122
from kafka.protocol.find_coordinator import FindCoordinatorRequest
2223
from kafka.protocol.init_producer_id import InitProducerIdRequest
24+
from kafka.protocol.txn_offset_commit import TxnOffsetCommitRequest
2325
from kafka.structs import TopicPartition
2426

2527

@@ -115,6 +117,7 @@ def __init__(self, transactional_id=None, transaction_timeout_ms=0, retry_backof
115117
self._new_partitions_in_transaction = set()
116118
self._pending_partitions_in_transaction = set()
117119
self._partitions_in_transaction = set()
120+
self._pending_txn_offset_commits = dict()
118121

119122
self._current_state = TransactionState.UNINITIALIZED
120123
self._last_error = None
@@ -138,7 +141,7 @@ def initialize_transactions(self):
138141
self._transition_to(TransactionState.INITIALIZING)
139142
self.set_producer_id_and_epoch(ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH))
140143
self._sequence_numbers.clear()
141-
handler = InitProducerIdHandler(self, self.transactional_id, self.transaction_timeout_ms)
144+
handler = InitProducerIdHandler(self, self.transaction_timeout_ms)
142145
self._enqueue_request(handler)
143146
return handler.result
144147

@@ -169,10 +172,22 @@ def begin_abort(self):
169172
def _begin_completing_transaction(self, committed):
170173
if self._new_partitions_in_transaction:
171174
self._enqueue_request(self._add_partitions_to_transaction_handler())
172-
handler = EndTxnHandler(self, self.transactional_id, self.producer_id_and_epoch.producer_id, self.producer_id_and_epoch.epoch, committed)
175+
handler = EndTxnHandler(self, committed)
173176
self._enqueue_request(handler)
174177
return handler.result
175178

179+
def send_offsets_to_transaction(self, offsets, consumer_group_id):
180+
with self._lock:
181+
self._ensure_transactional()
182+
self._maybe_fail_with_error()
183+
if self._current_state != TransactionState.IN_TRANSACTION:
184+
raise Errors.KafkaError("Cannot send offsets to transaction because the producer is not in an active transaction")
185+
186+
log.debug("Begin adding offsets %s for consumer group %s to transaction", offsets, consumer_group_id)
187+
handler = AddOffsetsToTxnHandler(self, consumer_group_id, offsets)
188+
self._enqueue_request(handler)
189+
return handler.result
190+
176191
def maybe_add_partition_to_transaction(self, topic_partition):
177192
with self._lock:
178193
self._fail_if_not_ready_for_send()
@@ -389,6 +404,10 @@ def _test_transaction_contains_partition(self, tp):
389404
with self._lock:
390405
return tp in self._partitions_in_transaction
391406

407+
# visible for testing
408+
def _test_has_pending_offset_commits(self):
409+
return bool(self._pending_txn_offset_commits)
410+
392411
# visible for testing
393412
def _test_has_ongoing_transaction(self):
394413
with self._lock:
@@ -473,7 +492,7 @@ def _add_partitions_to_transaction_handler(self):
473492
with self._lock:
474493
self._pending_partitions_in_transaction.update(self._new_partitions_in_transaction)
475494
self._new_partitions_in_transaction.clear()
476-
return AddPartitionsToTxnHandler(self, self.transactional_id, self.producer_id_and_epoch.producer_id, self.producer_id_and_epoch.epoch, self._pending_partitions_in_transaction)
495+
return AddPartitionsToTxnHandler(self, self._pending_partitions_in_transaction)
477496

478497

479498
class TransactionalRequestResult(object):
@@ -518,6 +537,18 @@ def __init__(self, transaction_manager, result=None):
518537
self._result = result or TransactionalRequestResult()
519538
self._is_retry = False
520539

540+
@property
541+
def transactional_id(self):
542+
return self.transaction_manager.transactional_id
543+
544+
@property
545+
def producer_id(self):
546+
return self.transaction_manager.producer_id_and_epoch.producer_id
547+
548+
@property
549+
def producer_epoch(self):
550+
return self.transaction_manager.producer_id_and_epoch.epoch
551+
521552
def fatal_error(self, exc):
522553
self.transaction_manager._transition_to_fatal_error(exc)
523554
self._result.done(error=exc)
@@ -585,16 +616,15 @@ def priority(self):
585616

586617

587618
class InitProducerIdHandler(TxnRequestHandler):
588-
def __init__(self, transaction_manager, transactional_id, transaction_timeout_ms):
619+
def __init__(self, transaction_manager, transaction_timeout_ms):
589620
super(InitProducerIdHandler, self).__init__(transaction_manager)
590621

591-
self.transactional_id = transactional_id
592622
if transaction_manager._api_version >= (2, 0):
593623
version = 1
594624
else:
595625
version = 0
596626
self.request = InitProducerIdRequest[version](
597-
transactional_id=transactional_id,
627+
transactional_id=self.transactional_id,
598628
transaction_timeout_ms=transaction_timeout_ms)
599629

600630
@property
@@ -619,10 +649,9 @@ def handle_response(self, response):
619649
self.fatal_error(Errors.KafkaError("Unexpected error in InitProducerIdResponse: %s" % (error())))
620650

621651
class AddPartitionsToTxnHandler(TxnRequestHandler):
622-
def __init__(self, transaction_manager, transactional_id, producer_id, producer_epoch, topic_partitions):
652+
def __init__(self, transaction_manager, topic_partitions):
623653
super(AddPartitionsToTxnHandler, self).__init__(transaction_manager)
624654

625-
self.transactional_id = transactional_id
626655
if transaction_manager._api_version >= (2, 7):
627656
version = 2
628657
elif transaction_manager._api_version >= (2, 0):
@@ -633,9 +662,9 @@ def __init__(self, transaction_manager, transactional_id, producer_id, producer_
633662
for tp in topic_partitions:
634663
topic_data[tp.topic].append(tp.partition)
635664
self.request = AddPartitionsToTxnRequest[version](
636-
transactional_id=transactional_id,
637-
producer_id=producer_id,
638-
producer_epoch=producer_epoch,
665+
transactional_id=self.transactional_id,
666+
producer_id=self.producer_id,
667+
producer_epoch=self.producer_epoch,
639668
topics=list(topic_data.items()))
640669

641670
@property
@@ -771,20 +800,19 @@ def handle_response(self, response):
771800

772801

773802
class EndTxnHandler(TxnRequestHandler):
774-
def __init__(self, transaction_manager, transactional_id, producer_id, producer_epoch, committed):
803+
def __init__(self, transaction_manager, committed):
775804
super(EndTxnHandler, self).__init__(transaction_manager)
776805

777-
self.transactional_id = transactional_id
778806
if self.transaction_manager._api_version >= (2, 7):
779807
version = 2
780808
elif self.transaction_manager._api_version >= (2, 0):
781809
version = 1
782810
else:
783811
version = 0
784812
self.request = EndTxnRequest[version](
785-
transactional_id=transactional_id,
786-
producer_id=producer_id,
787-
producer_epoch=producer_epoch,
813+
transactional_id=self.transactional_id,
814+
producer_id=self.producer_id,
815+
producer_epoch=self.producer_epoch,
788816
committed=committed)
789817

790818
@property
@@ -810,3 +838,141 @@ def handle_response(self, response):
810838
self.fatal_error(error())
811839
else:
812840
self.fatal_error(Errors.KafkaError("Unhandled error in EndTxnResponse: %s" % (error())))
841+
842+
843+
class AddOffsetsToTxnHandler(TxnRequestHandler):
844+
def __init__(self, transaction_manager, consumer_group_id, offsets):
845+
super(AddOffsetsToTxnHandler, self).__init__(transaction_manager)
846+
847+
self.consumer_group_id = consumer_group_id
848+
self.offsets = offsets
849+
if self.transaction_manager._api_version >= (2, 7):
850+
version = 2
851+
elif self.transaction_manager._api_version >= (2, 0):
852+
version = 1
853+
else:
854+
version = 0
855+
self.request = AddOffsetsToTxnRequest[version](
856+
transactional_id=self.transactional_id,
857+
producer_id=self.producer_id,
858+
producer_epoch=self.producer_epoch,
859+
group_id=consumer_group_id)
860+
861+
@property
862+
def priority(self):
863+
return Priority.ADD_PARTITIONS_OR_OFFSETS
864+
865+
def handle_response(self, response):
866+
error = Errors.for_code(response.error_code)
867+
868+
if error is Errors.NoError:
869+
log.debug("Successfully added partition for consumer group %s to transaction", self.consumer_group_id)
870+
871+
# note the result is not completed until the TxnOffsetCommit returns
872+
for tp, offset in six.iteritems(self.offsets):
873+
self.transaction_manager._pending_txn_offset_commits[tp] = offset
874+
handler = TxnOffsetCommitHandler(self.transaction_manager, self.consumer_group_id,
875+
self.transaction_manager._pending_txn_offset_commits, self._result)
876+
self.transaction_manager._enqueue_request(handler)
877+
self.transaction_manager._transaction_started = True
878+
elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
879+
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
880+
self.reenqueue()
881+
elif error in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
882+
self.reenqueue()
883+
elif error is Errors.InvalidProducerEpochError:
884+
self.fatal_error(error())
885+
elif error is Errors.TransactionalIdAuthorizationFailedError:
886+
self.fatal_error(error())
887+
elif error is Errors.GroupAuthorizationFailedError:
888+
self.abortable_error(Errors.GroupAuthorizationError(self.consumer_group_id))
889+
else:
890+
self.fatal_error(Errors.KafkaError("Unexpected error in AddOffsetsToTxnResponse: %s" % (error())))
891+
892+
893+
class TxnOffsetCommitHandler(TxnRequestHandler):
894+
def __init__(self, transaction_manager, consumer_group_id, offsets, result):
895+
super(TxnOffsetCommitHandler, self).__init__(transaction_manager, result=result)
896+
897+
self.consumer_group_id = consumer_group_id
898+
self.offsets = offsets
899+
self.request = self._build_request()
900+
901+
def _build_request(self):
902+
if self.transaction_manager._api_version >= (2, 1):
903+
version = 2
904+
elif self.transaction_manager._api_version >= (2, 0):
905+
version = 1
906+
else:
907+
version = 0
908+
909+
topic_data = collections.defaultdict(list)
910+
for tp, offset in six.iteritems(self.offsets):
911+
if version >= 2:
912+
partition_data = (tp.partition, offset.offset, offset.leader_epoch, offset.metadata)
913+
else:
914+
partition_data = (tp.partition, offset.offset, offset.metadata)
915+
topic_data[tp.topic].append(partition_data)
916+
917+
return TxnOffsetCommitRequest[version](
918+
transactional_id=self.transactional_id,
919+
group_id=self.consumer_group_id,
920+
producer_id=self.producer_id,
921+
producer_epoch=self.producer_epoch,
922+
topics=list(topic_data.items()))
923+
924+
@property
925+
def priority(self):
926+
return Priority.ADD_PARTITIONS_OR_OFFSETS
927+
928+
@property
929+
def coordinator_type(self):
930+
return 'group'
931+
932+
@property
933+
def coordinator_key(self):
934+
return self.consumer_group_id
935+
936+
def handle_response(self, response):
937+
lookup_coordinator = False
938+
retriable_failure = False
939+
940+
errors = {TopicPartition(topic, partition): Errors.for_code(error_code)
941+
for topic, partition_data in response.topics
942+
for partition, error_code in partition_data}
943+
944+
for tp, error in six.iteritems(errors):
945+
if error is Errors.NoError:
946+
log.debug("Successfully added offsets for %s from consumer group %s to transaction.",
947+
tp, self.consumer_group_id)
948+
del self.transaction_manager._pending_txn_offset_commits[tp]
949+
elif error in (errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError):
950+
retriable_failure = True
951+
lookup_coordinator = True
952+
elif error is Errors.UnknownTopicOrPartitionError:
953+
retriable_failure = True
954+
elif error is Errors.GroupAuthorizationFailedError:
955+
self.abortable_error(Errors.GroupAuthorizationError(self.consumer_group_id))
956+
return
957+
elif error in (Errors.TransactionalIdAuthorizationFailedError,
958+
Errors.InvalidProducerEpochError,
959+
Errors.UnsupportedForMessageFormatError):
960+
self.fatal_error(error())
961+
return
962+
else:
963+
self.fatal_error(Errors.KafkaError("Unexpected error in TxnOffsetCommitResponse: %s" % (error())))
964+
return
965+
966+
if lookup_coordinator:
967+
self.transaction_manager._lookup_coordinator('group', self.consumer_group_id)
968+
969+
if not retriable_failure:
970+
# all attempted partitions were either successful, or there was a fatal failure.
971+
# either way, we are not retrying, so complete the request.
972+
self.result.done()
973+
974+
# retry the commits which failed with a retriable error.
975+
elif self.transaction_manager._pending_txn_offset_commits:
976+
self.offsets = self.transaction_manager._pending_txn_offset_commits
977+
self.request = self._build_request()
978+
self.reenqueue()

0 commit comments

Comments
 (0)