Skip to content

Commit 1574768

Browse files
committed
Remove dependency on kerberso_sspi.
Implement SSPI authentication directly with pywin32.
1 parent fef796a commit 1574768

File tree

2 files changed

+116
-43
lines changed

2 files changed

+116
-43
lines changed

kafka/conn.py

Lines changed: 115 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,12 @@ class SSLWantWriteError(Exception):
8484

8585

8686
try:
87-
import kerberos_sspi
87+
import sspi
88+
import pywintypes
89+
import sspicon
90+
import win32security
8891
except ImportError:
89-
kerberos_sspi = None
92+
sspi = None
9093

9194
AFI_NAMES = {
9295
socket.AF_UNSPEC: "unspecified",
@@ -274,7 +277,7 @@ def __init__(self, host, port, afi, **configs):
274277
'sasl_plain_password required for PLAIN or SCRAM sasl'
275278
)
276279
if self.config['sasl_mechanism'] == 'GSSAPI':
277-
if gssapi is None and kerberos_sspi is None:
280+
if gssapi is None and sspi is None:
278281
raise AssertionError('No GSSAPI lib available')
279282
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
280283
if self.config['sasl_mechanism'] == 'OAUTHBEARER':
@@ -717,7 +720,7 @@ def _try_authenticate_gssapi(self, future):
717720
if gssapi is not None:
718721
return self._try_authenticate_gssapi_gss_implementation(future)
719722

720-
if kerberos_sspi is not None:
723+
if sspi is not None:
721724
return self._try_authenticate_gssapi_sspi_implementation(future)
722725

723726
def _try_authenticate_gssapi_gss_implementation(self, future):
@@ -799,78 +802,93 @@ def _try_authenticate_gssapi_gss_implementation(self, future):
799802
return future.success(True)
800803

801804
def _try_authenticate_gssapi_sspi_implementation(self, future):
805+
global log_sspi
806+
log_sspi = logging.getLogger("kafka.client.sspi")
802807
kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host
803-
service_principal_name = self.config['sasl_kerberos_service_name'] + '@' + kerberos_host_name
808+
service_principal_name = self.config['sasl_kerberos_service_name'] + '/' + kerberos_host_name
809+
scheme = "Kerberos" # Do not try with Negotiate that comes with a different protocol than SASL
810+
# https://docs.microsoft.com/en-us/windows/win32/secauthn/context-requirements
811+
flags = (
812+
sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication
813+
sspicon.ISC_REQ_INTEGRITY | # check for integrity
814+
sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages
815+
sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality
816+
)
804817

805818
err = None
806819
close = False
807-
808820
with self._lock:
809821
if not self._can_send_recv():
810822
err = Errors.NodeNotReadyError(str(self))
811823
close = False
812824
else:
825+
# Establish security context and negotiate protection level
826+
# For reference see RFC 4752, section 3
813827
try:
828+
log_sspi.debug("Create client security context")
829+
# instantiate sspi context
830+
client_ctx = sspi.ClientAuth(
831+
scheme,
832+
targetspn=service_principal_name,
833+
scflags=flags,
834+
)
835+
# Print some SSPI implementation
836+
log_sspi.info("Using %s SSPI Security Package (%s)", client_ctx.pkg_info["Name"], client_ctx.pkg_info["Comment"])
814837

815-
# Establish security context and negotiate protection level
816-
# For reference RFC 2222, section 7.2.1
817-
flags = \
818-
kerberos_sspi.GSS_C_CONF_FLAG | \
819-
kerberos_sspi.GSS_C_INTEG_FLAG | \
820-
kerberos_sspi.GSS_C_MUTUAL_FLAG | \
821-
kerberos_sspi.GSS_C_SEQUENCE_FLAG
822-
823-
# Create a security context.
824-
res, client_ctx = kerberos_sspi.authGSSClientInit(service_principal_name, gssflags=flags)
825-
assert res == kerberos_sspi.AUTH_GSS_COMPLETE
826-
827-
res = kerberos_sspi.AUTH_GSS_CONTINUE
828-
received_token = b""
829838
# Exchange tokens until authentication either succeeds or fails
830-
krb_round = 0
831-
while res == kerberos_sspi.AUTH_GSS_CONTINUE:
832-
krb_round += 1
833-
log.debug(f"Round {krb_round}")
834-
res = kerberos_sspi.authGSSClientStep(client_ctx, kerberos_sspi.encodestring(received_token))
835-
if res == -1:
836-
raise RuntimeError("Client Step Error", res)
837-
838-
output_token = client_ctx["response"] # get the binary data, not a base64 encoded version
839+
log_sspi.debug("Begining rounds...")
840+
received_token = None # no token to pass when initiating the first round
841+
while not client_ctx.authenticated:
842+
# calculate an output token from kafka token (or None on first iteration)
843+
# https://docs.microsoft.com/en-us/windows/win32/api/sspi/nf-sspi-initializesecuritycontexta
844+
# https://docs.microsoft.com/en-us/windows/win32/secauthn/initializesecuritycontext--kerberos
845+
# authorize method will wrap for us our token in sspi structures
846+
log_sspi.debug("Exchange a token")
847+
error, auth = client_ctx.authorize(received_token)
848+
if len(auth) > 0 and len(auth[0].Buffer):
849+
log_sspi.debug("Got token from context")
850+
# this buffer must be sent to the server whatever the result is
851+
output_token = auth[0].Buffer
852+
else:
853+
log_sspi.debug("Got no token, exchange finished")
854+
# seems to be the end of the loop
855+
output_token = None
839856

840857
# pass output token to kafka, or send empty response if the security
841858
# context is complete (output token is None in that case)
842-
if res != kerberos_sspi.AUTH_GSS_CONTINUE:
859+
if output_token is None:
860+
log_sspi.debug("Sending end of exchange to server")
843861
self._send_bytes_blocking(Int32.encode(0))
844862
else:
863+
log_sspi.debug("Sending token from local context to server")
845864
msg = output_token
846865
size = Int32.encode(len(msg))
847866
self._send_bytes_blocking(size + msg)
848867

849868
# The server will send a token back. Processing of this token either
850869
# establishes a security context, or it needs further token exchange.
851-
# The remote gssapi will be able to identify the needed next step.
870+
# The gssapi will be able to identify the needed next step.
852871
# The connection is closed on failure.
853872
header = self._recv_bytes_blocking(4)
854873
(token_size,) = struct.unpack('>i', header)
855874
received_token = self._recv_bytes_blocking(token_size)
875+
log_sspi.debug("Received token from server (size %s)", token_size)
856876

877+
sspi_amend_ctx_metadata(client_ctx)
857878
# Process the security layer negotiation token, sent by the server
858879
# once the security context is established.
859880

860881
# unwraps message containing supported protection levels and msg size
861-
kerberos_sspi.authGSSClientUnwrap(client_ctx, kerberos_sspi.encodestring(received_token))
862-
msg = client_ctx["response"]
882+
msg = sspi_gss_unwrap_step(client_ctx, received_token)
863883

864884
# Kafka currently doesn't support integrity or confidentiality security layers, so we
865885
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
866886
# by the server
867887
msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:]
868-
msg += service_principal_name.encode("utf-8")
869-
# add authorization identity to the response, GSS-wrap and send it
870-
871-
kerberos_sspi.authGSSClientWrap(client_ctx, kerberos_sspi.encodestring(msg), service_principal_name)
872-
msg = client_ctx["response"]
873888

889+
# add authorization identity to the response, GSS-wrap and send it
890+
msg = msg + service_principal_name.encode("utf-8")
891+
msg = sspi_gss_wrap_step(client_ctx, msg)
874892
size = Int32.encode(len(msg))
875893
self._send_bytes_blocking(size + msg)
876894

@@ -887,13 +905,13 @@ def _try_authenticate_gssapi_sspi_implementation(self, future):
887905
self.close(error=err)
888906
return future.failure(err)
889907

908+
# noinspection PyUnresolvedReferences
890909
log.info(
891-
'%s: Authenticated as %s to %s via Windows SSPI',
910+
'%s: Authenticated as %s to %s via SSPI/GSSAPI \\o/',
892911
self,
893-
kerberos_sspi.authGSSClientUserName(client_ctx),
894-
kerberos_sspi.authGSSServerTargetName(client_ctx), # incomplete API...
912+
client_ctx.initiator_name,
913+
client_ctx.service_name
895914
)
896-
897915
return future.success(True)
898916

899917

@@ -1648,3 +1666,58 @@ def dns_lookup(host, port, afi=socket.AF_UNSPEC):
16481666
' correct and resolvable?',
16491667
host, port, ex)
16501668
return []
1669+
1670+
1671+
# noinspection PyUnresolvedReferences
1672+
def sspi_gss_unwrap_step(sec_ctx, token):
1673+
"""
1674+
GSSAPI's unwrap with SSPI.
1675+
"""
1676+
buffer = win32security.PySecBufferDescType()
1677+
# Stream is a token coming from the other side
1678+
buffer.append(win32security.PySecBufferType(len(token), sspicon.SECBUFFER_STREAM))
1679+
buffer[0].Buffer = token
1680+
# Will receive the clear, or just unwrapped text if no encryption was used.
1681+
# Will be resized.
1682+
buffer.append(win32security.PySecBufferType(0, sspicon.SECBUFFER_DATA))
1683+
1684+
pfQOP = sec_ctx.ctxt.DecryptMessage(buffer, sec_ctx._get_next_seq_num())
1685+
if pfQOP == sspicon.SECQOP_WRAP_NO_ENCRYPT:
1686+
log.debug("Received token was not encrypted")
1687+
r = buffer[1].Buffer
1688+
return r
1689+
1690+
1691+
def sspi_gss_wrap_step(sec_ctx, msg, encrypt=False):
1692+
"""
1693+
GSSAPI's wrap with SSPI.
1694+
"""
1695+
1696+
size_info = sec_ctx.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
1697+
trailer_size = size_info['SecurityTrailer']
1698+
block_size = size_info['BlockSize']
1699+
1700+
buffer = win32security.PySecBufferDescType()
1701+
1702+
buffer.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA))
1703+
buffer[0].Buffer = msg
1704+
1705+
# Will receive the token that forms the beginning of the msg
1706+
buffer.append(win32security.PySecBufferType(trailer_size, sspicon.SECBUFFER_TOKEN))
1707+
1708+
buffer.append(win32security.PySecBufferType(block_size, sspicon.SECBUFFER_PADDING))
1709+
1710+
fQOP = 0 if encrypt else sspicon.SECQOP_WRAP_NO_ENCRYPT
1711+
sec_ctx.ctxt.EncryptMessage(fQOP, buffer, sec_ctx._get_next_seq_num())
1712+
# Sec token, then data, then padding
1713+
r = buffer[1].Buffer + buffer[0].Buffer + buffer[2].Buffer
1714+
return r
1715+
1716+
1717+
def sspi_amend_ctx_metadata(sec_ctx):
1718+
"""Adds initiator and service names in the security context for ease of use"""
1719+
if not sec_ctx.authenticated:
1720+
raise ValueError("Sec context is not completly authenticated")
1721+
1722+
names = sec_ctx.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_NATIVE_NAMES)
1723+
sec_ctx.initiator_name, sec_ctx.service_name = names

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def run(cls):
4242
"snappy": ["python-snappy"],
4343
"zstd": ["python-zstandard"],
4444
"gssapi": ["gssapi"],
45-
"gssapi_sspi": ["kerberos-sspi"],
45+
"gssapi_sspi": ["pywin32"],
4646
},
4747
cmdclass={"test": Tox},
4848
packages=find_packages(exclude=['test']),

0 commit comments

Comments
 (0)