@@ -171,7 +171,7 @@ class BrokerConnection(object):
171
171
Default: None
172
172
api_version_auto_timeout_ms (int): number of milliseconds to throw a
173
173
timeout exception from the constructor when checking the broker
174
- api version. Only applies if api_version is None
174
+ api version. Only applies if api_version is None. Default: 2000.
175
175
selector (selectors.BaseSelector): Provide a specific selector
176
176
implementation to use for I/O multiplexing.
177
177
Default: selectors.DefaultSelector
@@ -217,6 +217,7 @@ class BrokerConnection(object):
217
217
'ssl_password' : None ,
218
218
'ssl_ciphers' : None ,
219
219
'api_version' : None ,
220
+ 'api_version_auto_timeout_ms' : 2000 ,
220
221
'selector' : selectors .DefaultSelector ,
221
222
'state_change_callback' : lambda node_id , sock , conn : True ,
222
223
'metrics' : None ,
@@ -548,14 +549,14 @@ def _try_api_versions_check(self):
548
549
# ((0, 10), ApiVersionsRequest[0]()),
549
550
request = ApiVersionsRequest [0 ]()
550
551
future = Future ()
551
- response = self ._send (request , blocking = True )
552
+ response = self ._send (request , blocking = True , request_timeout_ms = ( self . config [ 'api_version_auto_timeout_ms' ] * 0.8 ) )
552
553
response .add_callback (self ._handle_api_versions_response , future )
553
554
response .add_errback (self ._handle_api_versions_failure , future )
554
555
self ._api_versions_future = future
555
556
elif self ._check_version_idx < len (self .VERSION_CHECKS ):
556
557
version , request = self .VERSION_CHECKS [self ._check_version_idx ]
557
558
future = Future ()
558
- response = self ._send (request , blocking = True )
559
+ response = self ._send (request , blocking = True , request_timeout_ms = ( self . config [ 'api_version_auto_timeout_ms' ] * 0.8 ) )
559
560
response .add_callback (self ._handle_check_version_response , future , version )
560
561
response .add_errback (self ._handle_check_version_failure , future )
561
562
self ._api_versions_future = future
0 commit comments