Skip to content

Fix sasl gssapi plugin: do not rely on client_ctx.complete in auth_bytes() #2631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions kafka/sasl/gssapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ def __init__(self, **config):
raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration')
self._is_done = False
self._is_authenticated = False
self.gssapi_name = None
if config.get('sasl_kerberos_name', None) is not None:
self.auth_id = str(config['sasl_kerberos_name'])
if isinstance(config['sasl_kerberos_name'], gssapi.Name):
self.gssapi_name = config['sasl_kerberos_name']
else:
kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '')
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name
if isinstance(config.get('sasl_kerberos_name', None), gssapi.Name):
self.gssapi_name = config['sasl_kerberos_name']
else:
if self.gssapi_name is None:
self.gssapi_name = gssapi.Name(self.auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate')
self._next_token = self._client_ctx.step(None)
Expand All @@ -43,9 +44,8 @@ def auth_bytes(self):
# so mark is_done after the final auth_bytes are provided
# in practice we'll still receive a response when using SaslAuthenticate
# but not when using the prior unframed approach.
if self._client_ctx.complete:
if self._is_authenticated:
self._is_done = True
self._is_authenticated = True
return self._next_token or b''

def receive(self, auth_bytes):
Expand Down Expand Up @@ -74,6 +74,13 @@ def receive(self, auth_bytes):
]
# add authorization identity to the response, and GSS-wrap
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message
# We need to identify the last token in auth_bytes();
# we can't rely on client_ctx.complete because it becomes True after generating
# the second-to-last token (after calling .step(auth_bytes) for the final time)
# We could introduce an additional state variable (i.e., self._final_token),
# but instead we just set _is_authenticated. Since the plugin interface does
# not read is_authenticated() until after is_done() is True, this should be fine.
self._is_authenticated = True

def is_done(self):
return self._is_done
Expand Down
42 changes: 42 additions & 0 deletions test/sasl/test_gssapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import absolute_import

try:
from unittest import mock
except ImportError:
import mock

from kafka.sasl import get_sasl_mechanism
import kafka.sasl.gssapi


def test_gssapi():
config = {
'sasl_kerberos_domain_name': 'foo',
'sasl_kerberos_service_name': 'bar',
}
client_ctx = mock.Mock()
client_ctx.step.side_effect = [b'init', b'exchange', b'complete', b'xxxx']
client_ctx.complete = False
def mocked_message_wrapper(msg, *args):
wrapped = mock.Mock()
type(wrapped).message = mock.PropertyMock(return_value=msg)
return wrapped
client_ctx.unwrap.side_effect = mocked_message_wrapper
client_ctx.wrap.side_effect = mocked_message_wrapper
kafka.sasl.gssapi.gssapi = mock.Mock()
kafka.sasl.gssapi.gssapi.SecurityContext.return_value = client_ctx
gssapi = get_sasl_mechanism('GSSAPI')(**config)
assert isinstance(gssapi, kafka.sasl.gssapi.SaslMechanismGSSAPI)
client_ctx.step.assert_called_with(None)

while not gssapi.is_done():
send_token = gssapi.auth_bytes()
receive_token = send_token # not realistic, but enough for testing
if send_token == b'\x00cbar@foo': # final wrapped message
receive_token = b'' # final message gets an empty response
gssapi.receive(receive_token)
if client_ctx.step.call_count == 3:
client_ctx.complete = True

assert gssapi.is_done()
assert gssapi.is_authenticated()
Loading