
Timeout idle connections via connections_max_idle_ms #1068

Merged: 2 commits, Apr 10, 2017
95 changes: 94 additions & 1 deletion kafka/client_async.py
@@ -135,6 +135,7 @@ class KafkaClient(object):
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
'request_timeout_ms': 40000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
@@ -194,6 +195,7 @@ def __init__(self, **configs):
self._wake_r.setblocking(False)
self._wake_lock = threading.Lock()
self._selector.register(self._wake_r, selectors.EVENT_READ)
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
self._closed = False
self._sensors = None
if self.config['metrics']:
@@ -291,6 +293,8 @@ def _conn_state_change(self, node_id, conn):
if self._sensors:
self._sensors.connection_created.record()

self._idle_expiry_manager.update(node_id)

if 'bootstrap' in self._conns and node_id != 'bootstrap':
bootstrap = self._conns.pop('bootstrap')
# XXX: make conn.close() require error to cause refresh
@@ -308,7 +312,13 @@ def _conn_state_change(self, node_id, conn):
pass
if self._sensors:
self._sensors.connection_closed.record()
if self._refresh_on_disconnects and not self._closed:

idle_disconnect = False
if self._idle_expiry_manager.is_expired(node_id):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)

if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()

@@ -514,10 +524,12 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
if future and future.is_done:
timeout = 0
else:
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
timeout = min(
timeout_ms,
metadata_timeout_ms,
self._delayed_tasks.next_at() * 1000,
idle_connection_timeout_ms,
self.config['request_timeout_ms'])
timeout = max(0, timeout / 1000.0) # avoid negative timeouts

@@ -572,6 +584,8 @@ def _poll(self, timeout, sleep=True):
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
continue

self._idle_expiry_manager.update(conn.node_id)

# Accumulate as many responses as the connection has pending
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -601,6 +615,7 @@ def _poll(self, timeout, sleep=True):

if self._sensors:
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
self._maybe_close_oldest_connection()
return responses

def in_flight_request_count(self, node_id=None):
@@ -846,6 +861,14 @@ def _clear_wake_fd(self):
except socket.error:
break

def _maybe_close_oldest_connection(self):
expired_connection = self._idle_expiry_manager.poll_expired_connection()
if expired_connection:
conn_id, ts = expired_connection
idle_ms = (time.time() - ts) * 1000
log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms)
self.close(node_id=conn_id)


class DelayedTaskQueue(object):
# see https://docs.python.org/2/library/heapq.html
@@ -920,6 +943,76 @@ def pop_ready(self):
return ready_tasks


# OrderedDict requires python2.7+
try:
from collections import OrderedDict
except ImportError:
# If we don't have OrderedDict, we'll fall back to dict with O(n) priority reads
OrderedDict = dict


class IdleConnectionManager(object):
def __init__(self, connections_max_idle_ms):
if connections_max_idle_ms > 0:
self.connections_max_idle = connections_max_idle_ms / 1000
else:
self.connections_max_idle = float('inf')
self.next_idle_close_check_time = None
self.update_next_idle_close_check_time(time.time())
self.lru_connections = OrderedDict()

def update(self, conn_id):
# order should reflect last-update
if conn_id in self.lru_connections:
del self.lru_connections[conn_id]
self.lru_connections[conn_id] = time.time()

def remove(self, conn_id):
if conn_id in self.lru_connections:
del self.lru_connections[conn_id]

def is_expired(self, conn_id):
if conn_id not in self.lru_connections:
return None

Contributor:
Should this be False instead of None, to be consistent with the bool return type?

Owner Author:
I chose None here to be "falsey" but distinguishable from False, since the connection in this case is not "unexpired" but rather "unknown".

Contributor:
That can be confusing, since the other return is a bool. In which case would a connection not be managed by this, so that None is returned?
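
To make the distinction concrete, here is a minimal caller-side sketch (not part of this diff); the helper name describe_idle_state is hypothetical.

# Minimal sketch, not part of this diff: distinguishing the three outcomes
# of is_expired(). The helper name describe_idle_state is hypothetical.
def describe_idle_state(manager, conn_id):
    expired = manager.is_expired(conn_id)
    if expired is None:
        return 'untracked'  # the manager has never seen this connection
    elif expired:
        return 'expired'    # idle longer than connections_max_idle_ms
    else:
        return 'active'     # tracked and still within the idle limit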

return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle

def next_check_ms(self):
now = time.time()
if not self.lru_connections:
return float('inf')
elif self.next_idle_close_check_time <= now:
return 0
else:
return int((self.next_idle_close_check_time - now) * 1000)

def update_next_idle_close_check_time(self, ts):
self.next_idle_close_check_time = ts + self.connections_max_idle

def poll_expired_connection(self):

Contributor (@jianbin-wei, Apr 10, 2017):
Personally, I would return a generator (using yield) over all expired connections instead of returning None. The caller would then iterate through them and close each one.

Owner Author:
We could do that, but I chose to stay fairly close to the Java client logic just to be safe. I believe this implementation works, and I'm also skeptical that there's much performance to be gained given how infrequently connection timeouts are likely to occur.

Contributor:
I didn't check the Java client implementation. If that's the case, I would also try to stay close to the Java client's behavior. It's not about performance gains but about convenience.
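
For reference, a rough sketch of the generator-based alternative discussed above (not the code merged here); it assumes the same lru_connections and connections_max_idle attributes used by IdleConnectionManager.

# Rough sketch of the reviewer's suggestion, not what this PR implements:
# yield every expired connection so the caller can close them in one loop.
def poll_expired_connections(self):
    now = time.time()
    for conn_id, ts in list(self.lru_connections.items()):
        if now >= ts + self.connections_max_idle:
            yield conn_id, ts

# A caller could then do, for example:
#     for conn_id, ts in self._idle_expiry_manager.poll_expired_connections():
#         self.close(node_id=conn_id)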

if time.time() < self.next_idle_close_check_time:
return None

if not len(self.lru_connections):

Contributor:
Redundant len() check: if not self.lru_connections:

Owner Author:
True.

return None

oldest_conn_id = None
oldest_ts = None
if OrderedDict is dict:
for conn_id, ts in self.lru_connections.items():
if oldest_conn_id is None or ts < oldest_ts:
oldest_conn_id = conn_id
oldest_ts = ts
else:
(oldest_conn_id, oldest_ts) = next(iter(self.lru_connections.items()))

self.update_next_idle_close_check_time(oldest_ts)

if time.time() >= oldest_ts + self.connections_max_idle:
return (oldest_conn_id, oldest_ts)
else:
return None


class KafkaClientMetrics(object):
def __init__(self, metrics, metric_group_prefix, conns):
self.metrics = metrics
6 changes: 4 additions & 2 deletions kafka/conn.py
@@ -177,6 +177,8 @@ def __init__(self, host, port, afi, **configs):
if key in configs:
self.config[key] = configs[key]

self.node_id = self.config.pop('node_id')

if self.config['receive_buffer_bytes'] is not None:
self.config['socket_options'].append(
(socket.SOL_SOCKET, socket.SO_RCVBUF,
@@ -214,7 +216,7 @@ def __init__(self, host, port, afi, **configs):
if self.config['metrics']:
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
self.config['metric_group_prefix'],
self.config['node_id'])
self.node_id)

def connect(self):
"""Attempt to connect and return ConnectionState"""
@@ -904,7 +906,7 @@ def connect():

def __repr__(self):
return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
self.config['node_id'], self.hostname, self.host, self.port)
self.node_id, self.hostname, self.host, self.port)


class BrokerConnectionMetrics(object):
2 changes: 1 addition & 1 deletion kafka/producer/kafka.py
@@ -266,7 +266,7 @@ class KafkaProducer(object):
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
'buffer_memory': 33554432,
'connections_max_idle_ms': 600000, # not implemented yet
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet

Contributor:
Should the "not implemented yet" comment be removed? 😉

Contributor:
I think there's a similar comment in the consumer that also needs updating.

Owner Author:
Yes -- this is fixed up in master.

'max_block_ms': 60000,
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
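
Following up on the producer config change above, a brief usage sketch (not part of this diff): the setting is passed like any other client option, assuming the producer forwards it to the underlying KafkaClient. The broker address is a placeholder.

# Usage sketch, not part of this diff: configuring the idle timeout on the
# public producer client. The bootstrap address is a placeholder.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',      # placeholder address
    connections_max_idle_ms=9 * 60 * 1000,   # close sockets idle for 9 minutes
)
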
38 changes: 36 additions & 2 deletions test/test_client_async.py
@@ -1,3 +1,5 @@
from __future__ import absolute_import, division

# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
@@ -10,7 +12,7 @@

import pytest

from kafka.client_async import KafkaClient
from kafka.client_async import KafkaClient, IdleConnectionManager
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
@@ -319,7 +321,10 @@ def client(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')

cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
cli = KafkaClient(request_timeout_ms=9999999,
reconnect_backoff_ms=2222,
connections_max_idle_ms=float('inf'),
api_version=(0, 9))

tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
tasks.return_value = 9999999
@@ -395,3 +400,32 @@ def test_schedule():

def test_unschedule():
pass


def test_idle_connection_manager(mocker):
t = mocker.patch.object(time, 'time')
t.return_value = 0

idle = IdleConnectionManager(100)
assert idle.next_check_ms() == float('inf')

idle.update('foo')
assert not idle.is_expired('foo')
assert idle.poll_expired_connection() is None
assert idle.next_check_ms() == 100

t.return_value = 90 / 1000
assert not idle.is_expired('foo')
assert idle.poll_expired_connection() is None
assert idle.next_check_ms() == 10

t.return_value = 100 / 1000
assert idle.is_expired('foo')
assert idle.next_check_ms() == 0

conn_id, conn_ts = idle.poll_expired_connection()
assert conn_id == 'foo'
assert conn_ts == 0

idle.remove('foo')
assert idle.next_check_ms() == float('inf')