Skip to content

Commit d596b58

Browse files
Sharu95bradenneal1
authored andcommitted
add validate_config function for msk module (#176)
1 parent 06f0450 commit d596b58

File tree

2 files changed

+15
-16
lines changed

2 files changed

+15
-16
lines changed

kafka/conn.py

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,6 @@ class SSLWantWriteError(Exception):
6868
gssapi = None
6969
GSSError = None
7070

71-
# needed for AWS_MSK_IAM authentication:
72-
try:
73-
from botocore.session import Session as BotoSession
74-
except ImportError:
75-
# no botocore available, will disable AWS_MSK_IAM mechanism
76-
BotoSession = None
77-
7871
AFI_NAMES = {
7972
socket.AF_UNSPEC: "unspecified",
8073
socket.AF_INET: "IPv4",
@@ -113,7 +106,7 @@ class BrokerConnection:
113106
will be applied to the backoff resulting in a random range between
114107
20% below and 20% above the computed value. Default: 1000.
115108
connection_timeout_ms (int): Connection timeout in milliseconds.
116-
Default: None, which defaults it to the same value as
109+
Default: None, which defaults it to the same value as
117110
request_timeout_ms.
118111
request_timeout_ms (int): Client request timeout in milliseconds.
119112
Default: 30000.
@@ -235,7 +228,7 @@ def __init__(self, host, port, afi, **configs):
235228
for key in self.config:
236229
if key in configs:
237230
self.config[key] = configs[key]
238-
231+
239232
if self.config['connection_timeout_ms'] is None:
240233
self.config['connection_timeout_ms'] = self.config['request_timeout_ms']
241234

@@ -253,19 +246,15 @@ def __init__(self, host, port, afi, **configs):
253246
assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
254247
'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))
255248

256-
257249
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
258250
assert ssl_available, "Python wasn't built with SSL support"
259251

260-
if self.config['sasl_mechanism'] == 'AWS_MSK_IAM':
261-
assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package'
262-
assert self.config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL'
263-
264252
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
265253
assert self.config['sasl_mechanism'] in sasl.MECHANISMS, (
266254
'sasl_mechanism must be one of {}'.format(', '.join(sasl.MECHANISMS.keys()))
267255
)
268256
sasl.MECHANISMS[self.config['sasl_mechanism']].validate_config(self)
257+
269258
# This is not a general lock / this class is not generally thread-safe yet
270259
# However, to avoid pushing responsibility for maintaining
271260
# per-connection locks to the upstream client, we will use this lock to

kafka/sasl/msk.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,20 @@
1010
from kafka.protocol.types import Int32
1111
import kafka.errors as Errors
1212

13-
from botocore.session import Session as BotoSession # importing it in advance is not an option apparently...
13+
# needed for AWS_MSK_IAM authentication:
14+
try:
15+
from botocore.session import Session as BotoSession
16+
except ImportError:
17+
# no botocore available, will disable AWS_MSK_IAM mechanism
18+
BotoSession = None
19+
1420
from typing import Optional
1521

1622

23+
def validate_config(conn):
24+
assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package'
25+
assert conn.config.get('security_protocol') == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL'
26+
1727
def try_authenticate(self, future):
1828

1929
session = BotoSession()
@@ -25,7 +35,7 @@ def try_authenticate(self, future):
2535
region=session.get_config_variable('region'),
2636
token=credentials.token,
2737
)
28-
38+
2939
msg = client.first_message()
3040
size = Int32.encode(len(msg))
3141

0 commit comments

Comments
 (0)