Skip to content

Commit c5cbe84

Browse files
committed
fixup reset_producer_id is_transactional() check
1 parent a87f922 commit c5cbe84

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

kafka/producer/transaction_manager.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ def is_valid(self):
4747
def match(self, batch):
4848
return self.producer_id == batch.producer_id and self.epoch == batch.producer_epoch
4949

50+
def __eq__(self, other):
51+
return isinstance(other, ProducerIdAndEpoch) and self.producer_id == other.producer_id and self.epoch == other.epoch
52+
5053
def __str__(self):
5154
return "ProducerIdAndEpoch(producer_id={}, epoch={})".format(self.producer_id, self.epoch)
5255

@@ -304,7 +307,7 @@ def reset_producer_id(self):
304307
it's best to return the produce error to the user and let them abort the transaction and close the producer explicitly.
305308
"""
306309
with self._lock:
307-
if self.is_transactional:
310+
if self.is_transactional():
308311
raise Errors.IllegalStateError(
309312
"Cannot reset producer state for a transactional producer."
310313
" You must either abort the ongoing transaction or"

test/test_producer.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import pytest
88

99
from kafka import KafkaProducer
10+
from kafka.cluster import ClusterMetadata
11+
from kafka.producer.transaction_manager import TransactionManager, ProducerIdAndEpoch
12+
1013

1114
@pytest.mark.skipif(platform.python_implementation() != 'CPython',
1215
reason='Test relies on CPython-specific gc policies')
@@ -20,4 +23,17 @@ def test_kafka_producer_gc_cleanup():
2023
assert threading.active_count() == threads
2124

2225

26+
def test_idempotent_producer_reset_producer_id():
27+
transaction_manager = TransactionManager(
28+
transactional_id=None,
29+
transaction_timeout_ms=1000,
30+
retry_backoff_ms=100,
31+
api_version=(0, 11),
32+
metadata=ClusterMetadata(),
33+
)
2334

35+
test_producer_id_and_epoch = ProducerIdAndEpoch(123, 456)
36+
transaction_manager.set_producer_id_and_epoch(test_producer_id_and_epoch)
37+
assert transaction_manager.producer_id_and_epoch == test_producer_id_and_epoch
38+
transaction_manager.reset_producer_id()
39+
assert transaction_manager.producer_id_and_epoch == ProducerIdAndEpoch(-1, -1)

0 commit comments

Comments
 (0)