@@ -579,6 +579,7 @@ def _handle_api_versions_response(self, future, response):
579
579
])
580
580
self ._api_version = self ._infer_broker_version_from_api_versions (self ._api_versions )
581
581
future .success (self ._api_version )
582
+ self .connect ()
582
583
583
584
def _handle_api_versions_failure (self , future , ex ):
584
585
future .failure (ex )
@@ -591,6 +592,7 @@ def _handle_check_version_response(self, future, version, _response):
591
592
self ._api_versions = BROKER_API_VERSIONS [version ]
592
593
self ._api_version = version
593
594
future .success (version )
595
+ self .connect ()
594
596
595
597
def _handle_check_version_failure (self , future , ex ):
596
598
future .failure (ex )
@@ -628,23 +630,28 @@ def _handle_sasl_handshake_response(self, future, response):
628
630
return future .failure (error_type (self ))
629
631
630
632
if self .config ['sasl_mechanism' ] not in response .enabled_mechanisms :
631
- return future .failure (
633
+ future .failure (
632
634
Errors .UnsupportedSaslMechanismError (
633
635
'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
634
636
% (self .config ['sasl_mechanism' ], response .enabled_mechanisms )))
635
637
elif self .config ['sasl_mechanism' ] == 'PLAIN' :
636
- return self ._try_authenticate_plain (future )
638
+ self ._try_authenticate_plain (future )
637
639
elif self .config ['sasl_mechanism' ] == 'GSSAPI' :
638
- return self ._try_authenticate_gssapi (future )
640
+ self ._try_authenticate_gssapi (future )
639
641
elif self .config ['sasl_mechanism' ] == 'OAUTHBEARER' :
640
- return self ._try_authenticate_oauth (future )
642
+ self ._try_authenticate_oauth (future )
641
643
elif self .config ['sasl_mechanism' ].startswith ("SCRAM-SHA-" ):
642
- return self ._try_authenticate_scram (future )
644
+ self ._try_authenticate_scram (future )
643
645
else :
644
- return future .failure (
646
+ future .failure (
645
647
Errors .UnsupportedSaslMechanismError (
646
648
'kafka-python does not support SASL mechanism %s' %
647
649
self .config ['sasl_mechanism' ]))
650
+ assert future .is_done , 'SASL future not complete after mechanism processing!'
651
+ if future .failed ():
652
+ self .close (error = future .exception )
653
+ else :
654
+ self .connect ()
648
655
649
656
def _send_bytes (self , data ):
650
657
"""Send some data via non-blocking IO
0 commit comments