Skip to content

Commit f79344c

Browse files
committed
Allow customizing socket timeouts.
Previously, if you try to consume a message with a timeout greater than 10 seconds, but you don't receive data in those 10 seconds, a socket.timeout exception is raised. This allows a higher socket timeout to be set, or even None for no timeout.
1 parent a306687 commit f79344c

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

kafka/client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ class KafkaClient(object):
1919
CLIENT_ID = "kafka-python"
2020
ID_GEN = count()
2121

22-
def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
22+
def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10):
2323
# We need one connection to bootstrap
2424
self.bufsize = bufsize
2525
self.client_id = client_id
26+
self.timeout = timeout
2627
self.conns = { # (host, port) -> KafkaConnection
27-
(host, port): KafkaConnection(host, port, bufsize)
28+
(host, port): KafkaConnection(host, port, bufsize, timeout=timeout)
2829
}
2930
self.brokers = {} # broker_id -> BrokerMetadata
3031
self.topics_to_brokers = {} # topic_id -> broker_id
@@ -41,7 +42,7 @@ def _get_conn_for_broker(self, broker):
4142
"""
4243
if (broker.host, broker.port) not in self.conns:
4344
self.conns[(broker.host, broker.port)] = \
44-
KafkaConnection(broker.host, broker.port, self.bufsize)
45+
KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout)
4546

4647
return self.conns[(broker.host, broker.port)]
4748

kafka/conn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ class KafkaConnection(local):
1919
we can do something in here to facilitate multiplexed requests/responses
2020
since the Kafka API includes a correlation id.
2121
"""
22-
def __init__(self, host, port, bufsize=4096):
22+
def __init__(self, host, port, bufsize=4098, timeout=10):
2323
super(KafkaConnection, self).__init__()
2424
self.host = host
2525
self.port = port
2626
self.bufsize = bufsize
2727
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2828
self._sock.connect((host, port))
29-
self._sock.settimeout(10)
29+
self._sock.settimeout(timeout)
3030
self._dirty = False
3131

3232
def __str__(self):

0 commit comments

Comments
 (0)