-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Timeout idle connections via connections_max_idle_ms #1068
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -135,6 +135,7 @@ class KafkaClient(object): | |
'bootstrap_servers': 'localhost', | ||
'client_id': 'kafka-python-' + __version__, | ||
'request_timeout_ms': 40000, | ||
'connections_max_idle_ms': 9 * 60 * 1000, | ||
'reconnect_backoff_ms': 50, | ||
'max_in_flight_requests_per_connection': 5, | ||
'receive_buffer_bytes': None, | ||
|
@@ -194,6 +195,7 @@ def __init__(self, **configs): | |
self._wake_r.setblocking(False) | ||
self._wake_lock = threading.Lock() | ||
self._selector.register(self._wake_r, selectors.EVENT_READ) | ||
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) | ||
self._closed = False | ||
self._sensors = None | ||
if self.config['metrics']: | ||
|
@@ -291,6 +293,8 @@ def _conn_state_change(self, node_id, conn): | |
if self._sensors: | ||
self._sensors.connection_created.record() | ||
|
||
self._idle_expiry_manager.update(node_id) | ||
|
||
if 'bootstrap' in self._conns and node_id != 'bootstrap': | ||
bootstrap = self._conns.pop('bootstrap') | ||
# XXX: make conn.close() require error to cause refresh | ||
|
@@ -308,7 +312,13 @@ def _conn_state_change(self, node_id, conn): | |
pass | ||
if self._sensors: | ||
self._sensors.connection_closed.record() | ||
if self._refresh_on_disconnects and not self._closed: | ||
|
||
idle_disconnect = False | ||
if self._idle_expiry_manager.is_expired(node_id): | ||
idle_disconnect = True | ||
self._idle_expiry_manager.remove(node_id) | ||
|
||
if self._refresh_on_disconnects and not self._closed and not idle_disconnect: | ||
log.warning("Node %s connection failed -- refreshing metadata", node_id) | ||
self.cluster.request_update() | ||
|
||
|
@@ -514,10 +524,12 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): | |
if future and future.is_done: | ||
timeout = 0 | ||
else: | ||
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() | ||
timeout = min( | ||
timeout_ms, | ||
metadata_timeout_ms, | ||
self._delayed_tasks.next_at() * 1000, | ||
idle_connection_timeout_ms, | ||
self.config['request_timeout_ms']) | ||
timeout = max(0, timeout / 1000.0) # avoid negative timeouts | ||
|
||
|
@@ -572,6 +584,8 @@ def _poll(self, timeout, sleep=True): | |
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests')) | ||
continue | ||
|
||
self._idle_expiry_manager.update(conn.node_id) | ||
|
||
# Accumulate as many responses as the connection has pending | ||
while conn.in_flight_requests: | ||
response = conn.recv() # Note: conn.recv runs callbacks / errbacks | ||
|
@@ -601,6 +615,7 @@ def _poll(self, timeout, sleep=True): | |
|
||
if self._sensors: | ||
self._sensors.io_time.record((time.time() - end_select) * 1000000000) | ||
self._maybe_close_oldest_connection() | ||
return responses | ||
|
||
def in_flight_request_count(self, node_id=None): | ||
|
@@ -846,6 +861,14 @@ def _clear_wake_fd(self): | |
except socket.error: | ||
break | ||
|
||
def _maybe_close_oldest_connection(self): | ||
expired_connection = self._idle_expiry_manager.poll_expired_connection() | ||
if expired_connection: | ||
conn_id, ts = expired_connection | ||
idle_ms = (time.time() - ts) * 1000 | ||
log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) | ||
self.close(node_id=conn_id) | ||
|
||
|
||
class DelayedTaskQueue(object): | ||
# see https://docs.python.org/2/library/heapq.html | ||
|
@@ -920,6 +943,76 @@ def pop_ready(self): | |
return ready_tasks | ||
|
||
|
||
# OrderedDict requires python2.7+ | ||
try: | ||
from collections import OrderedDict | ||
except ImportError: | ||
# If we dont have OrderedDict, we'll fallback to dict with O(n) priority reads | ||
OrderedDict = dict | ||
|
||
|
||
class IdleConnectionManager(object): | ||
def __init__(self, connections_max_idle_ms): | ||
if connections_max_idle_ms > 0: | ||
self.connections_max_idle = connections_max_idle_ms / 1000 | ||
else: | ||
self.connections_max_idle = float('inf') | ||
self.next_idle_close_check_time = None | ||
self.update_next_idle_close_check_time(time.time()) | ||
self.lru_connections = OrderedDict() | ||
|
||
def update(self, conn_id): | ||
# order should reflect last-update | ||
if conn_id in self.lru_connections: | ||
del self.lru_connections[conn_id] | ||
self.lru_connections[conn_id] = time.time() | ||
|
||
def remove(self, conn_id): | ||
if conn_id in self.lru_connections: | ||
del self.lru_connections[conn_id] | ||
|
||
def is_expired(self, conn_id): | ||
if conn_id not in self.lru_connections: | ||
return None | ||
return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle | ||
|
||
def next_check_ms(self): | ||
now = time.time() | ||
if not self.lru_connections: | ||
return float('inf') | ||
elif self.next_idle_close_check_time <= now: | ||
return 0 | ||
else: | ||
return int((self.next_idle_close_check_time - now) * 1000) | ||
|
||
def update_next_idle_close_check_time(self, ts): | ||
self.next_idle_close_check_time = ts + self.connections_max_idle | ||
|
||
def poll_expired_connection(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Personally, I would return a generator iterator using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we could do that, but I chose to stay fairly close to the java client logic just to be safe. I believe this implementation works, and I'm also skeptical that there's much performance optimization to be gained given the relative infrequency with which connection timeouts are likely to occur. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't check Java client implementation. If that is the case, I would also try to be close to Java' client behavior. It is not to gain performance but convenient. |
||
if time.time() < self.next_idle_close_check_time: | ||
return None | ||
|
||
if not len(self.lru_connections): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. redundant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. true |
||
return None | ||
|
||
oldest_conn_id = None | ||
oldest_ts = None | ||
if OrderedDict is dict: | ||
for conn_id, ts in self.lru_connections.items(): | ||
if oldest_conn_id is None or ts < oldest_ts: | ||
oldest_conn_id = conn_id | ||
oldest_ts = ts | ||
else: | ||
(oldest_conn_id, oldest_ts) = next(iter(self.lru_connections.items())) | ||
|
||
self.update_next_idle_close_check_time(oldest_ts) | ||
|
||
if time.time() >= oldest_ts + self.connections_max_idle: | ||
return (oldest_conn_id, oldest_ts) | ||
else: | ||
return None | ||
|
||
|
||
class KafkaClientMetrics(object): | ||
def __init__(self, metrics, metric_group_prefix, conns): | ||
self.metrics = metrics | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -266,7 +266,7 @@ class KafkaProducer(object): | |
'linger_ms': 0, | ||
'partitioner': DefaultPartitioner(), | ||
'buffer_memory': 33554432, | ||
'connections_max_idle_ms': 600000, # not implemented yet | ||
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should remove the "not implemented yet" comment? 😉 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there's a similar comment in the consumer that also needs updating There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes -- this is fixed up in master |
||
'max_block_ms': 60000, | ||
'max_request_size': 1048576, | ||
'metadata_max_age_ms': 300000, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be False instead of None to be consistent for the return type
bool
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose None here to be "falsey" but distinguishable from False since the connection in this case is not "unexpired" but rather "unknown".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That can be confusing as the next return is bool type.
In which case that one connection would not be managed by this and None is returned?