Skip to content

Confluent's provided bootstrap endpoint from terraform causes parsing/casting error on broker.host #2606

Closed
@gregswift-pwell

Description

@gregswift-pwell

We recently switch to leveraging the Confluent terraform provider for generating the configuration data for our applications into our secret store, which then loads directly into our apps via environment variables.

Before the change our BOOTSTRAP_SERVERS env var looked like this:

pkc-wtf25.us-east-2.aws.confluent.cloud:9092

After the change the output from the terraform resource for bootstrap_endpoint looks like this:

SASL_SSL://pkc-wtf25.us-east-2.aws.confluent.cloud:9092

Our Java applications seem to have taken this change and not even noticed it, however the python apps leveraging kafka-python are running into an issue where the get_ip_port_afi function doesn't recognize that there might be a prefix of XXX:// on the broker.host.

[2025-05-01 17:04:55,456] ERROR [mysdk.app.container.kafka.kafka_listener.listen:90] Error while consuming message: invalid literal for int() with base 10: '//pkc-wtf25.us-east-2.aws.confluent.cloud'
Traceback (most recent call last):
  File "/app/.venv/lib/python3.12/site-packages/mysdk/app/container/kafka/kafka_listener.py", line 44, in listen
    topic_to_msgs: dict[str, list[ConsumerRecord]] = consumer.poll(
                                                     ^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/consumer/group.py", line 693, in poll
    records = self._poll_once(inner_timeout_ms(), max_records, update_offsets=update_offsets)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/consumer/group.py", line 713, in _poll_once
    if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/coordinator/consumer.py", line 279, in poll
    self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
  File "/app/.venv/lib/python3.12/site-packages/kafka/coordinator/base.py", line 269, in ensure_coordinator_ready
    self._client.poll(future=future, timeout_ms=inner_timeout_ms())
  File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 667, in poll
    metadata_timeout_ms = self._maybe_refresh_metadata()
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 965, in _maybe_refresh_metadata
    if not self._init_connect(node_id):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 443, in _init_connect
    host, port, afi = get_ip_port_afi(broker.host)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/conn.py", line 1483, in get_ip_port_afi
    port = int(port)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions