Skip to content

Commit 8e2ed3e

Browse files
pt2phamdpkp
authored andcommitted
Support SASL OAuthBearer Authentication (#1750)
1 parent d032844 commit 8e2ed3e

File tree

7 files changed

+117
-6
lines changed

7 files changed

+117
-6
lines changed

kafka/admin/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ class KafkaAdminClient(object):
133133
Default: None
134134
sasl_kerberos_service_name (str): Service name to include in GSSAPI
135135
sasl mechanism handshake. Default: 'kafka'
136+
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
137+
instance. (See kafka.oauth.abstract). Default: None
136138
137139
"""
138140
DEFAULT_CONFIG = {
@@ -166,6 +168,7 @@ class KafkaAdminClient(object):
166168
'sasl_plain_username': None,
167169
'sasl_plain_password': None,
168170
'sasl_kerberos_service_name': 'kafka',
171+
'sasl_oauth_token_provider': None,
169172

170173
# metrics configs
171174
'metric_reporters': [],

kafka/client_async.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ class KafkaClient(object):
151151
sasl mechanism handshake. Default: 'kafka'
152152
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
153153
sasl mechanism handshake. Default: one of bootstrap servers
154+
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
155+
instance. (See kafka.oauth.abstract). Default: None
154156
"""
155157

156158
DEFAULT_CONFIG = {
@@ -188,7 +190,8 @@ class KafkaClient(object):
188190
'sasl_plain_username': None,
189191
'sasl_plain_password': None,
190192
'sasl_kerberos_service_name': 'kafka',
191-
'sasl_kerberos_domain_name': None
193+
'sasl_kerberos_domain_name': None,
194+
'sasl_oauth_token_provider': None
192195
}
193196

194197
def __init__(self, **configs):

kafka/conn.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import kafka.errors as Errors
2626
from kafka.future import Future
2727
from kafka.metrics.stats import Avg, Count, Max, Rate
28+
from kafka.oauth.abstract import AbstractTokenProvider
2829
from kafka.protocol.admin import SaslHandShakeRequest
2930
from kafka.protocol.commit import OffsetFetchRequest
3031
from kafka.protocol.metadata import MetadataRequest
@@ -184,6 +185,8 @@ class BrokerConnection(object):
184185
sasl mechanism handshake. Default: 'kafka'
185186
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
186187
sasl mechanism handshake. Default: one of bootstrap servers
188+
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
189+
instance. (See kafka.oauth.abstract). Default: None
187190
"""
188191

189192
DEFAULT_CONFIG = {
@@ -216,10 +219,11 @@ class BrokerConnection(object):
216219
'sasl_plain_username': None,
217220
'sasl_plain_password': None,
218221
'sasl_kerberos_service_name': 'kafka',
219-
'sasl_kerberos_domain_name': None
222+
'sasl_kerberos_domain_name': None,
223+
'sasl_oauth_token_provider': None
220224
}
221225
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
222-
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
226+
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER')
223227

224228
def __init__(self, host, port, afi, **configs):
225229
self.host = host
@@ -263,7 +267,10 @@ def __init__(self, host, port, afi, **configs):
263267
if self.config['sasl_mechanism'] == 'GSSAPI':
264268
assert gssapi is not None, 'GSSAPI lib not available'
265269
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
266-
270+
if self.config['sasl_mechanism'] == 'OAUTHBEARER':
271+
token_provider = self.config['sasl_oauth_token_provider']
272+
assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
273+
assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()'
267274
# This is not a general lock / this class is not generally thread-safe yet
268275
# However, to avoid pushing responsibility for maintaining
269276
# per-connection locks to the upstream client, we will use this lock to
@@ -537,6 +544,8 @@ def _handle_sasl_handshake_response(self, future, response):
537544
return self._try_authenticate_plain(future)
538545
elif self.config['sasl_mechanism'] == 'GSSAPI':
539546
return self._try_authenticate_gssapi(future)
547+
elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
548+
return self._try_authenticate_oauth(future)
540549
else:
541550
return future.failure(
542551
Errors.UnsupportedSaslMechanismError(
@@ -660,6 +669,51 @@ def _try_authenticate_gssapi(self, future):
660669
log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
661670
return future.success(True)
662671

672+
def _try_authenticate_oauth(self, future):
673+
data = b''
674+
675+
msg = bytes(self._build_oauth_client_request().encode("utf-8"))
676+
size = Int32.encode(len(msg))
677+
try:
678+
# Send SASL OAuthBearer request with OAuth token
679+
self._send_bytes_blocking(size + msg)
680+
681+
# The server will send a zero sized message (that is Int32(0)) on success.
682+
# The connection is closed on failure
683+
data = self._recv_bytes_blocking(4)
684+
685+
except ConnectionError as e:
686+
log.exception("%s: Error receiving reply from server", self)
687+
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
688+
self.close(error=error)
689+
return future.failure(error)
690+
691+
if data != b'\x00\x00\x00\x00':
692+
error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
693+
return future.failure(error)
694+
695+
log.info('%s: Authenticated via OAuth', self)
696+
return future.success(True)
697+
698+
def _build_oauth_client_request(self):
699+
token_provider = self.config['sasl_oauth_token_provider']
700+
return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions())
701+
702+
def _token_extensions(self):
703+
"""
704+
Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER
705+
initial request.
706+
"""
707+
token_provider = self.config['sasl_oauth_token_provider']
708+
709+
# Only run if the #extensions() method is implemented by the clients Token Provider class
710+
# Builds up a string separated by \x01 via a dict of key value pairs
711+
if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0:
712+
msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()])
713+
return "\x01" + msg
714+
else:
715+
return ""
716+
663717
def blacked_out(self):
664718
"""
665719
Return true if we are disconnected from the given node and can't

kafka/consumer/group.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ class KafkaConsumer(six.Iterator):
240240
sasl mechanism handshake. Default: 'kafka'
241241
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
242242
sasl mechanism handshake. Default: one of bootstrap servers
243+
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
244+
instance. (See kafka.oauth.abstract). Default: None
243245
244246
Note:
245247
Configuration parameters are described in more detail at
@@ -299,7 +301,8 @@ class KafkaConsumer(six.Iterator):
299301
'sasl_plain_username': None,
300302
'sasl_plain_password': None,
301303
'sasl_kerberos_service_name': 'kafka',
302-
'sasl_kerberos_domain_name': None
304+
'sasl_kerberos_domain_name': None,
305+
'sasl_oauth_token_provider': None
303306
}
304307
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
305308

kafka/oauth/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.oauth.abstract import AbstractTokenProvider

kafka/oauth/abstract.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from __future__ import absolute_import
2+
3+
import abc
4+
5+
# This statement is compatible with both Python 2.7 & 3+
6+
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
7+
8+
class AbstractTokenProvider(ABC):
9+
"""
10+
A Token Provider must be used for the SASL OAuthBearer protocol.
11+
12+
The implementation should ensure token reuse so that multiple
13+
calls at connect time do not create multiple tokens. The implementation
14+
should also periodically refresh the token in order to guarantee
15+
that each call returns an unexpired token. A timeout error should
16+
be returned after a short period of inactivity so that the
17+
broker can log debugging info and retry.
18+
19+
Token Providers MUST implement the token() method
20+
"""
21+
22+
def __init__(self, **config):
23+
pass
24+
25+
@abc.abstractmethod
26+
def token(self):
27+
"""
28+
Returns a (str) ID/Access Token to be sent to the Kafka
29+
client.
30+
"""
31+
pass
32+
33+
def extensions(self):
34+
"""
35+
This is an OPTIONAL method that may be implemented.
36+
37+
Returns a map of key-value pairs that can
38+
be sent with the SASL/OAUTHBEARER initial client request. If
39+
not implemented, the values are ignored. This feature is only available
40+
in Kafka >= 2.1.0.
41+
"""
42+
return {}

kafka/producer/kafka.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,8 @@ class KafkaProducer(object):
277277
sasl mechanism handshake. Default: 'kafka'
278278
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
279279
sasl mechanism handshake. Default: one of bootstrap servers
280+
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
281+
instance. (See kafka.oauth.abstract). Default: None
280282
281283
Note:
282284
Configuration parameters are described in more detail at
@@ -328,7 +330,8 @@ class KafkaProducer(object):
328330
'sasl_plain_username': None,
329331
'sasl_plain_password': None,
330332
'sasl_kerberos_service_name': 'kafka',
331-
'sasl_kerberos_domain_name': None
333+
'sasl_kerberos_domain_name': None,
334+
'sasl_oauth_token_provider': None
332335
}
333336

334337
_COMPRESSORS = {

0 commit comments

Comments
 (0)