Skip to content

Commit c13f1d8

Browse files
committed
Update kafka-src to latest 0.8
Fix a broken test (100k was too much to send in one batch)
1 parent d71af27 commit c13f1d8

File tree

4 files changed

+18
-6
lines changed

4 files changed

+18
-6
lines changed

kafka-src

Submodule kafka-src updated 208 files

kafka/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
116116
for payload in payloads:
117117
payloads_by_broker[self._get_leader_for_partition(payload.topic, payload.partition)].append(payload)
118118
original_keys.append((payload.topic, payload.partition))
119-
119+
120120
# Accumulate the responses in a dictionary
121121
acc = {}
122122

kafka/conn.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ def _consume_response_iter(self):
7171

7272
def send(self, requestId, payload):
7373
"Send a request to Kafka"
74+
log.debug("About to send %d bytes to Kafka" % len(payload))
7475
sent = self._sock.sendall(payload)
75-
if sent == 0:
76+
if sent != None:
7677
raise RuntimeError("Kafka went away")
7778
self.data = self._consume_response()
7879

test/integration.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,14 +225,25 @@ def test_produce_mixed(self):
225225

226226

227227
def test_produce_100k_gzipped(self):
228-
produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
229-
create_gzip_message(["Gzipped %d" % i for i in range(100000)])
228+
req1 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
229+
create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
230230
])
231231

232-
for resp in self.client.send_produce_request([produce]):
232+
for resp in self.client.send_produce_request([req1]):
233233
self.assertEquals(resp.error, 0)
234234
self.assertEquals(resp.offset, 0)
235235

236+
(offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)])
237+
self.assertEquals(offset.offsets[0], 50000)
238+
239+
req2 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
240+
create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)])
241+
])
242+
243+
for resp in self.client.send_produce_request([req2]):
244+
self.assertEquals(resp.error, 0)
245+
self.assertEquals(resp.offset, 50000)
246+
236247
(offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)])
237248
self.assertEquals(offset.offsets[0], 100000)
238249

0 commit comments

Comments
 (0)