Skip to content

Commit 7ed8a79

Browse files
dpkp88manpreet
authored andcommitted
Fix fetch_max_bytes=1 consumer integration test
1 parent f092aaa commit 7ed8a79

File tree

1 file changed

+10
-10
lines changed

1 file changed

+10
-10
lines changed

test/test_consumer_integration.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -752,20 +752,20 @@ def test_kafka_consumer_max_bytes_one_msg(self):
752752
self.send_messages(0, range(100, 200))
753753

754754
# Start a consumer. FetchResponse_v3 should always include at least 1
755-
# full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time
755+
# full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
756+
# But 0.11.0.0 returns 1 MessageSet at a time when the messages are
757+
# stored in the new v2 format by the broker.
758+
#
759+
# DP Note: This is a strange test. The consumer shouldn't care
760+
# how many messages are included in a FetchResponse, as long as it is
761+
# non-zero. I would not mind if we deleted this test. It caused
762+
# a minor headache when testing 0.11.0.0.
756763
group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
757764
consumer = self.kafka_consumer(
758765
group_id=group,
759766
auto_offset_reset='earliest',
767+
consumer_timeout_ms=5000,
760768
fetch_max_bytes=1)
761-
fetched_msgs = []
762-
# A bit hacky, but we need this in order for message count to be exact
763-
consumer._coordinator.ensure_active_group()
764-
for i in range(10):
765-
poll_res = consumer.poll(timeout_ms=2000)
766-
print(poll_res)
767-
for partition, msgs in six.iteritems(poll_res):
768-
for msg in msgs:
769-
fetched_msgs.append(msg)
770769

770+
fetched_msgs = [next(consumer) for i in range(10)]
771771
self.assertEqual(len(fetched_msgs), 10)

0 commit comments

Comments
 (0)