-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Support SASL OAuthBearer Authentication #1750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
2582245
971aa1b
7fc9cef
acfcd86
f925328
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
import kafka.errors as Errors | ||
from kafka.future import Future | ||
from kafka.metrics.stats import Avg, Count, Max, Rate | ||
from kafka.oauth.abstract import AbstractTokenProvider | ||
from kafka.protocol.admin import SaslHandShakeRequest | ||
from kafka.protocol.commit import OffsetFetchRequest | ||
from kafka.protocol.metadata import MetadataRequest | ||
|
@@ -179,6 +180,8 @@ class BrokerConnection(object): | |
sasl mechanism handshake. Default: 'kafka' | ||
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI | ||
sasl mechanism handshake. Default: one of bootstrap servers | ||
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider | ||
instance. (See kafka.oauth.abstract). Default: None | ||
""" | ||
|
||
DEFAULT_CONFIG = { | ||
|
@@ -210,10 +213,11 @@ class BrokerConnection(object): | |
'sasl_plain_username': None, | ||
'sasl_plain_password': None, | ||
'sasl_kerberos_service_name': 'kafka', | ||
'sasl_kerberos_domain_name': None | ||
'sasl_kerberos_domain_name': None, | ||
'sasl_oauth_token_provider': None | ||
} | ||
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') | ||
SASL_MECHANISMS = ('PLAIN', 'GSSAPI') | ||
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER') | ||
|
||
def __init__(self, host, port, afi, **configs): | ||
self.host = host | ||
|
@@ -257,7 +261,10 @@ def __init__(self, host, port, afi, **configs): | |
if self.config['sasl_mechanism'] == 'GSSAPI': | ||
assert gssapi is not None, 'GSSAPI lib not available' | ||
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' | ||
|
||
if self.config['sasl_mechanism'] == 'OAUTHBEARER': | ||
token_provider = self.config['sasl_oauth_token_provider'] | ||
assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' | ||
assert isinstance(token_provider, AbstractTokenProvider), 'sasl_oauth_token_provider must implement AbstractTokenProvider' | ||
# This is not a general lock / this class is not generally thread-safe yet | ||
# However, to avoid pushing responsibility for maintaining | ||
# per-connection locks to the upstream client, we will use this lock to | ||
|
@@ -536,6 +543,8 @@ def _handle_sasl_handshake_response(self, future, response): | |
return self._try_authenticate_plain(future) | ||
elif self.config['sasl_mechanism'] == 'GSSAPI': | ||
return self._try_authenticate_gssapi(future) | ||
elif self.config['sasl_mechanism'] == 'OAUTHBEARER': | ||
return self._try_authenticate_oauth(future) | ||
else: | ||
return future.failure( | ||
Errors.UnsupportedSaslMechanismError( | ||
|
@@ -659,6 +668,51 @@ def _try_authenticate_gssapi(self, future): | |
log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) | ||
return future.success(True) | ||
|
||
def _try_authenticate_oauth(self, future): | ||
data = b'' | ||
|
||
msg = bytes(self._build_oauth_client_request().encode("utf-8")) | ||
size = Int32.encode(len(msg)) | ||
try: | ||
# Send SASL OAuthBearer request with OAuth token | ||
self._send_bytes_blocking(size + msg) | ||
|
||
# The server will send a zero sized message (that is Int32(0)) on success. | ||
# The connection is closed on failure | ||
data = self._recv_bytes_blocking(4) | ||
|
||
except ConnectionError as e: | ||
log.exception("%s: Error receiving reply from server", self) | ||
error = Errors.KafkaConnectionError("%s: %s" % (self, e)) | ||
self.close(error=error) | ||
return future.failure(error) | ||
|
||
if data != b'\x00\x00\x00\x00': | ||
error = Errors.AuthenticationFailedError('Unrecognized response during authentication') | ||
return future.failure(error) | ||
|
||
log.info('%s: Authenticated via OAuth', self) | ||
return future.success(True) | ||
|
||
def _build_oauth_client_request(self): | ||
token_provider = self.config['sasl_oauth_token_provider'] | ||
return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is string There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I convert to bytes and encode the output of this function in |
||
|
||
def _token_extensions(self): | ||
""" | ||
Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER | ||
initial request. | ||
""" | ||
token_provider = self.config['sasl_oauth_token_provider'] | ||
|
||
# Only run if the #extensions() method is implemented by the clients Token Provider class | ||
# Builds up a string separated by \x01 via a dict of key value pairs | ||
if callable(getattr(token_provider, "extensions", None)): | ||
msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) | ||
return "\x01" + msg | ||
pt2pham marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
return "" | ||
|
||
def blacked_out(self): | ||
""" | ||
Return true if we are disconnected from the given node and can't | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from __future__ import absolute_import | ||
|
||
from kafka.oauth.abstract import AbstractTokenProvider |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from __future__ import absolute_import | ||
|
||
import abc | ||
|
||
# This statement is compatible with both Python 2.7 & 3+ | ||
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interesting -- haven't seen this before |
||
|
||
class AbstractTokenProvider(ABC): | ||
""" | ||
A Token Provider must be used for the SASL OAuthBearer protocol. | ||
|
||
The implementation shsould ensure token reuse so that multiple | ||
pt2pham marked this conversation as resolved.
Show resolved
Hide resolved
|
||
calls at connect time do not create multiple tokens. The implementation | ||
should also periodically refresh the token in order to guarantee | ||
that each call returns an unexpired token. A timeout error should | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this timeout error work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's up to the implementer but ideally if the Token Provider pings some OAuth server for a token, if it takes too long it shouldn't go on forever. This was something that I've read/done in other OAuth implementations for different Kafka client libraries. |
||
be returned after a short period of inactivity so that the | ||
broker can log debugging info and retry. | ||
|
||
Token Providers MUST be implemented from this ABC. | ||
pt2pham marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
An optional method that may be implemented if the user chooses is: | ||
#extensions() - Returns a map of key-value pairs that can | ||
pt2pham marked this conversation as resolved.
Show resolved
Hide resolved
|
||
be sent with the SASL/OAUTHBEARER initial client request. If | ||
not provided, the values are ignored. This feature is only available | ||
in Kafka >= 2.1.0. | ||
""" | ||
|
||
def __init__(self, **config): | ||
pass | ||
|
||
@abc.abstractmethod | ||
def token(self): | ||
""" | ||
Returns a (str) ID/Access Token to be sent to the Kafka | ||
client. | ||
""" | ||
pass | ||
|
Uh oh!
There was an error while loading. Please reload this page.