
Commit 3cc4e46

* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn, since it is not actually enforced there

Notes: This commit changes behavior slightly by raising a BufferUnderflowError, rather than a ConnectionError, when no data is received for the message size. Since bufsize is not enforced at the socket level but is used by the consumer when creating fetch requests, it moves to the consumer until a better solution is implemented. (A sketch of the read-exact pattern follows below.)
1 parent 604c588 commit 3cc4e46
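
For background, socket.recv(n) is allowed to return fewer than n bytes, so the only safe way to read a length-prefixed message is to loop until the full count has arrived. A minimal standalone sketch of that read-exact pattern, assuming a connected sock (read_exact is an illustrative name, and IOError stands in for kafka-python's BufferUnderflowError to keep the sketch dependency-free):

def read_exact(sock, num_bytes):
    # Loop because a single recv() may legally return a short read.
    chunks = []
    bytes_left = num_bytes
    while bytes_left:
        data = sock.recv(bytes_left)
        if not data:  # peer closed the connection mid-message
            raise IOError("%d bytes still unread" % bytes_left)
        chunks.append(data)
        bytes_left -= len(data)
    return b"".join(chunks)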

File tree

3 files changed (+30 / -35 lines)

kafka/client.py

Lines changed: 3 additions & 4 deletions
@@ -19,13 +19,12 @@ class KafkaClient(object):
     CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
-    def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10):
+    def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
         # We need one connection to bootstrap
-        self.bufsize = bufsize
         self.client_id = client_id
         self.timeout = timeout
         self.conns = {  # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, bufsize, timeout=timeout)
+            (host, port): KafkaConnection(host, port, timeout=timeout)
         }
         self.brokers = {}  # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
@@ -42,7 +41,7 @@ def _get_conn_for_broker(self, broker):
         """
         if (broker.host, broker.port) not in self.conns:
             self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout)
+                KafkaConnection(broker.host, broker.port, timeout=self.timeout)
 
         return self.conns[(broker.host, broker.port)]
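
With bufsize gone from the constructor, callers create the client from host, port, and the remaining keyword arguments. A hedged usage sketch (the broker address is a placeholder):

from kafka.client import KafkaClient

# bufsize is no longer accepted; per-fetch sizing now lives in the consumer.
client = KafkaClient("localhost", 9092, timeout=10)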

kafka/conn.py

Lines changed: 22 additions & 29 deletions
@@ -19,11 +19,10 @@ class KafkaConnection(local):
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
-    def __init__(self, host, port, bufsize=4098, timeout=10):
+    def __init__(self, host, port, timeout=10):
         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
-        self.bufsize = bufsize
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((host, port))
         self._sock.settimeout(timeout)
@@ -36,41 +35,35 @@ def __str__(self):
     # Private API #
     ###################
 
-    def _consume_response(self):
-        """
-        Fully consumer the response iterator
-        """
-        data = ""
-        for chunk in self._consume_response_iter():
-            data += chunk
-        return data
+    def _read_bytes(self, num_bytes):
+        bytes_left = num_bytes
+        resp = ''
+        log.debug("About to read %d bytes from Kafka", num_bytes)
+
+        while bytes_left:
+            data = self._sock.recv(bytes_left)
+            if data == '':
+                raise BufferUnderflowError("Not enough data to read this response")
+            bytes_left -= len(data)
+            log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
+            resp += data
+
+        return resp
 
-    def _consume_response_iter(self):
+    def _consume_response(self):
         """
         This method handles the response header and error messages. It
-        then returns an iterator for the chunks of the response
+        then returns the response
         """
-        log.debug("Handling response from Kafka")
-
+        log.debug("Expecting response from Kafka")
         # Read the size off of the header
-        resp = self._sock.recv(4)
-        if resp == "":
-            self._raise_connection_error()
-        (size,) = struct.unpack('>i', resp)
+        resp = self._read_bytes(4)
 
-        log.debug("About to read %d bytes from Kafka", size)
+        (size,) = struct.unpack('>i', resp)
 
         # Read the remainder of the response
-        total = 0
-        while total < size:
-            resp = self._sock.recv(self.bufsize)
-            log.debug("Read %d bytes from Kafka", len(resp))
-            if resp == "":
-                raise BufferUnderflowError(
-                    "Not enough data to read this response")
-
-            total += len(resp)
-            yield resp
+        resp = self._read_bytes(size)
+        return str(resp)
 
     def _raise_connection_error(self):
         self._dirty = True
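
The crux of the conn.py change: the old loop recv'd up to bufsize bytes at a time and could yield chunks that did not line up with message boundaries, while the new _read_bytes() asks for exactly what is still missing and loops until it arrives. A small self-contained demonstration of the short-read behavior that motivates the loop (uses a Unix socketpair; not part of the commit):

import socket

a, b = socket.socketpair()
a.send(b"hello ")
a.send(b"world")

# A single recv(11) may return only part of the 11 bytes sent above,
# so we loop on the remainder -- the same pattern as _read_bytes().
chunks = []
remaining = 11
while remaining:
    data = b.recv(remaining)
    if not data:
        break
    chunks.append(data)
    remaining -= len(data)

assert b"".join(chunks) == b"hello world"
a.close()
b.close()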

kafka/consumer.py

Lines changed: 5 additions & 2 deletions
@@ -22,6 +22,7 @@
 FETCH_DEFAULT_BLOCK_TIMEOUT = 1
 FETCH_MAX_WAIT_TIME = 100
 FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
 
 
 class FetchContext(object):
@@ -216,8 +217,10 @@ class SimpleConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 fetch_size_bytes=FETCH_MIN_BYTES):
+                 fetch_size_bytes=FETCH_MIN_BYTES,
+                 buffer_size=FETCH_BUFFER_SIZE_BYTES):
 
+        self.buffer_size = buffer_size
         self.partition_info = False  # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
@@ -364,7 +367,7 @@ def __iter_partition__(self, partition, offset):
             # use MaxBytes = client's bufsize since we're only
             # fetching one topic + partition
             req = FetchRequest(
-                self.topic, partition, offset, self.client.bufsize)
+                self.topic, partition, offset, self.buffer_size)
 
             (resp,) = self.client.send_fetch_request(
                 [req],
