|
7 | 7 | from kafka.vendor.six.moves import urllib
|
8 | 8 |
|
9 | 9 |
|
| 10 | +def try_authenticate(self, future): |
| 11 | + session = BotoSession() |
| 12 | + credentials = session.get_credentials().get_frozen_credentials() |
| 13 | + client = AwsMskIamClient( |
| 14 | + host=self.host, |
| 15 | + access_key=credentials.access_key, |
| 16 | + secret_key=credentials.secret_key, |
| 17 | + region=session.get_config_variable('region'), |
| 18 | + token=credentials.token, |
| 19 | + ) |
| 20 | + |
| 21 | + msg = client.first_message() |
| 22 | + size = Int32.encode(len(msg)) |
| 23 | + |
| 24 | + err = None |
| 25 | + close = False |
| 26 | + with self._lock: |
| 27 | + if not self._can_send_recv(): |
| 28 | + err = Errors.NodeNotReadyError(str(self)) |
| 29 | + close = False |
| 30 | + else: |
| 31 | + try: |
| 32 | + self._send_bytes_blocking(size + msg) |
| 33 | + data = self._recv_bytes_blocking(4) |
| 34 | + data = self._recv_bytes_blocking(struct.unpack('4B', data)[-1]) |
| 35 | + except (ConnectionError, TimeoutError) as e: |
| 36 | + log.exception("%s: Error receiving reply from server", self) |
| 37 | + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) |
| 38 | + close = True |
| 39 | + |
| 40 | + if err is not None: |
| 41 | + if close: |
| 42 | + self.close(error=err) |
| 43 | + return future.failure(err) |
| 44 | + |
| 45 | + log.info('%s: Authenticated via AWS_MSK_IAM %s', self, data.decode('utf-8')) |
| 46 | + return future.success(True) |
| 47 | + |
10 | 48 | class AwsMskIamClient:
|
11 | 49 | UNRESERVED_CHARS = string.ascii_letters + string.digits + '-._~'
|
12 | 50 |
|
|
0 commit comments