Closed
Description
We recently switch to leveraging the Confluent terraform provider for generating the configuration data for our applications into our secret store, which then loads directly into our apps via environment variables.
Before the change our BOOTSTRAP_SERVERS
env var looked like this:
pkc-wtf25.us-east-2.aws.confluent.cloud:9092
After the change the output from the terraform resource for bootstrap_endpoint looks like this:
SASL_SSL://pkc-wtf25.us-east-2.aws.confluent.cloud:9092
Our Java applications seem to have taken this change and not even noticed it, however the python apps leveraging kafka-python are running into an issue where the get_ip_port_afi
function doesn't recognize that there might be a prefix of XXX://
on the broker.host
.
[2025-05-01 17:04:55,456] ERROR [mysdk.app.container.kafka.kafka_listener.listen:90] Error while consuming message: invalid literal for int() with base 10: '//pkc-wtf25.us-east-2.aws.confluent.cloud'
Traceback (most recent call last):
File "/app/.venv/lib/python3.12/site-packages/mysdk/app/container/kafka/kafka_listener.py", line 44, in listen
topic_to_msgs: dict[str, list[ConsumerRecord]] = consumer.poll(
^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/consumer/group.py", line 693, in poll
records = self._poll_once(inner_timeout_ms(), max_records, update_offsets=update_offsets)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/consumer/group.py", line 713, in _poll_once
if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/coordinator/consumer.py", line 279, in poll
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
File "/app/.venv/lib/python3.12/site-packages/kafka/coordinator/base.py", line 269, in ensure_coordinator_ready
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 667, in poll
metadata_timeout_ms = self._maybe_refresh_metadata()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 965, in _maybe_refresh_metadata
if not self._init_connect(node_id):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 443, in _init_connect
host, port, afi = get_ip_port_afi(broker.host)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/conn.py", line 1483, in get_ip_port_afi
port = int(port)
Metadata
Metadata
Assignees
Labels
No labels