Skip to content

Commit 87c7f9d

Browse files
committed
Merge pull request #88 from rdiomar/rdiomar_changes
Various changes/fixes, including:

* Allow customizing socket timeouts
* Read the correct number of bytes from kafka
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn
* SimpleConsumer flow changes
* Fix some error handling
* Add optional upper limit to consumer fetch buffer size
* Add and fix unit and integration tests
2 parents 354fcdb + a0c7141 commit 87c7f9d

File tree

8 files changed

+613
-379
lines changed

8 files changed

+613
-379
lines changed

kafka/client.py

Lines changed: 21 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,12 @@
33
from functools import partial
44
from itertools import count
55
import logging
6-
import socket
76
import time
87

9-
from kafka.common import ErrorMapping, TopicAndPartition
10-
from kafka.common import ConnectionError, FailedPayloadsException
8+
from kafka.common import (
9+
ErrorMapping, TopicAndPartition, ConnectionError,
10+
FailedPayloadsException
11+
)
1112
from kafka.conn import KafkaConnection
1213
from kafka.protocol import KafkaProtocol
1314

@@ -19,12 +20,12 @@ class KafkaClient(object):
1920
CLIENT_ID = "kafka-python"
2021
ID_GEN = count()
2122

22-
def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
23+
def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
2324
# We need one connection to bootstrap
24-
self.bufsize = bufsize
2525
self.client_id = client_id
26+
self.timeout = timeout
2627
self.conns = { # (host, port) -> KafkaConnection
27-
(host, port): KafkaConnection(host, port, bufsize)
28+
(host, port): KafkaConnection(host, port, timeout=timeout)
2829
}
2930
self.brokers = {} # broker_id -> BrokerMetadata
3031
self.topics_to_brokers = {} # topic_id -> broker_id
@@ -41,7 +42,7 @@ def _get_conn_for_broker(self, broker):
4142
"""
4243
if (broker.host, broker.port) not in self.conns:
4344
self.conns[(broker.host, broker.port)] = \
44-
KafkaConnection(broker.host, broker.port, self.bufsize)
45+
KafkaConnection(broker.host, broker.port, timeout=self.timeout)
4546

4647
return self.conns[(broker.host, broker.port)]
4748

@@ -165,14 +166,24 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
165166
request = encoder_fn(client_id=self.client_id,
166167
correlation_id=requestId, payloads=payloads)
167168

169+
failed = False
168170
# Send the request, recv the response
169171
try:
170172
conn.send(requestId, request)
171173
if decoder_fn is None:
172174
continue
173-
response = conn.recv(requestId)
174-
except ConnectionError, e: # ignore BufferUnderflow for now
175-
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
175+
try:
176+
response = conn.recv(requestId)
177+
except ConnectionError, e:
178+
log.warning("Could not receive response to request [%s] "
179+
"from server %s: %s", request, conn, e)
180+
failed = True
181+
except ConnectionError, e:
182+
log.warning("Could not send request [%s] to server %s: %s",
183+
request, conn, e)
184+
failed = True
185+
186+
if failed:
176187
failed_payloads += payloads
177188
self.topics_to_brokers = {} # reset metadata
178189
continue

kafka/conn.py

Lines changed: 34 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,6 @@
44
import struct
55
from threading import local
66

7-
from kafka.common import BufferUnderflowError
87
from kafka.common import ConnectionError
98

109
log = logging.getLogger("kafka")
@@ -19,14 +18,14 @@ class KafkaConnection(local):
1918
we can do something in here to facilitate multiplexed requests/responses
2019
since the Kafka API includes a correlation id.
2120
"""
22-
def __init__(self, host, port, bufsize=4096):
21+
def __init__(self, host, port, timeout=10):
2322
super(KafkaConnection, self).__init__()
2423
self.host = host
2524
self.port = port
26-
self.bufsize = bufsize
2725
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2826
self._sock.connect((host, port))
29-
self._sock.settimeout(10)
27+
self.timeout = timeout
28+
self._sock.settimeout(self.timeout)
3029
self._dirty = False
3130

3231
def __str__(self):
@@ -36,44 +35,31 @@ def __str__(self):
3635
# Private API #
3736
###################
3837

39-
def _consume_response(self):
40-
"""
41-
Fully consume the response iterator
42-
"""
43-
return "".join(self._consume_response_iter())
44-
45-
def _consume_response_iter(self):
46-
"""
47-
This method handles the response header and error messages. It
48-
then returns an iterator for the chunks of the response
49-
"""
50-
log.debug("Handling response from Kafka")
51-
52-
# Read the size off of the header
53-
resp = self._sock.recv(4)
54-
if resp == "":
55-
self._raise_connection_error()
56-
(size,) = struct.unpack('>i', resp)
57-
58-
messagesize = size - 4
59-
log.debug("About to read %d bytes from Kafka", messagesize)
60-
61-
# Read the remainder of the response
62-
total = 0
63-
while total < messagesize:
64-
resp = self._sock.recv(self.bufsize)
65-
log.debug("Read %d bytes from Kafka", len(resp))
66-
if resp == "":
67-
raise BufferUnderflowError(
68-
"Not enough data to read this response")
69-
70-
total += len(resp)
71-
yield resp
72-
7338
def _raise_connection_error(self):
7439
self._dirty = True
7540
raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
7641

42+
def _read_bytes(self, num_bytes):
43+
bytes_left = num_bytes
44+
resp = ''
45+
log.debug("About to read %d bytes from Kafka", num_bytes)
46+
if self._dirty:
47+
self.reinit()
48+
while bytes_left:
49+
try:
50+
data = self._sock.recv(bytes_left)
51+
except socket.error:
52+
log.exception('Unable to receive data from Kafka')
53+
self._raise_connection_error()
54+
if data == '':
55+
log.error("Not enough data to read this response")
56+
self._raise_connection_error()
57+
bytes_left -= len(data)
58+
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
59+
resp += data
60+
61+
return resp
62+
7763
##################
7864
# Public API #
7965
##################
@@ -89,7 +75,7 @@ def send(self, request_id, payload):
8975
sent = self._sock.sendall(payload)
9076
if sent is not None:
9177
self._raise_connection_error()
92-
except socket.error:
78+
except socket.error, e:
9379
log.exception('Unable to send payload to Kafka')
9480
self._raise_connection_error()
9581

@@ -98,8 +84,14 @@ def recv(self, request_id):
9884
Get a response from Kafka
9985
"""
10086
log.debug("Reading response %d from Kafka" % request_id)
101-
self.data = self._consume_response()
102-
return self.data
87+
# Read the size off of the header
88+
resp = self._read_bytes(4)
89+
90+
(size,) = struct.unpack('>i', resp)
91+
92+
# Read the remainder of the response
93+
resp = self._read_bytes(size)
94+
return str(resp)
10395

10496
def copy(self):
10597
"""
@@ -124,5 +116,5 @@ def reinit(self):
124116
self.close()
125117
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
126118
self._sock.connect((self.host, self.port))
127-
self._sock.settimeout(10)
119+
self._sock.settimeout(self.timeout)
128120
self._dirty = False

0 commit comments

Comments (0)