Skip to content

Commit 5f3b966

Browse files
committed
Use kafka.util.Timer
1 parent 8dfdc96 commit 5f3b966

File tree

6 files changed

+78
-93
lines changed

6 files changed

+78
-93
lines changed

kafka/client_async.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from kafka.metrics.stats.rate import TimeUnit
2828
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
2929
from kafka.protocol.metadata import MetadataRequest
30-
from kafka.util import Dict, WeakMethod, ensure_valid_topic_name, timeout_ms_fn
30+
from kafka.util import Dict, Timer, WeakMethod, ensure_valid_topic_name
3131
# Although this looks unused, it actually monkey-patches socket.socketpair()
3232
# and should be left in as long as we're using socket.socketpair() in this file
3333
from kafka.vendor import socketpair # noqa: F401
@@ -646,11 +646,11 @@ def poll(self, timeout_ms=None, future=None):
646646
if not isinstance(timeout_ms, (int, float, type(None))):
647647
raise TypeError('Invalid type for timeout: %s' % type(timeout_ms))
648648

649-
begin = time.time()
650649
if timeout_ms is not None:
651-
timeout_at = begin + (timeout_ms / 1000)
650+
timer = Timer(timeout_ms)
652651
else:
653-
timeout_at = begin + (self.config['request_timeout_ms'] / 1000)
652+
timer = Timer(self.config['request_timeout_ms'])
653+
654654
# Loop for futures, break after first loop if None
655655
responses = []
656656
while True:
@@ -675,12 +675,11 @@ def poll(self, timeout_ms=None, future=None):
675675
if future is not None and future.is_done:
676676
timeout = 0
677677
else:
678-
user_timeout_ms = 1000 * max(0, timeout_at - time.time())
679678
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
680679
request_timeout_ms = self._next_ifr_request_timeout_ms()
681-
log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", user_timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms)
680+
log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", timer.timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms)
682681
timeout = min(
683-
user_timeout_ms,
682+
timer.timeout_ms,
684683
metadata_timeout_ms,
685684
idle_connection_timeout_ms,
686685
request_timeout_ms)
@@ -698,7 +697,7 @@ def poll(self, timeout_ms=None, future=None):
698697
break
699698
elif future.is_done:
700699
break
701-
elif timeout_ms is not None and time.time() >= timeout_at:
700+
elif timeout_ms is not None and timer.expired:
702701
break
703702

704703
return responses
@@ -1175,16 +1174,16 @@ def await_ready(self, node_id, timeout_ms=30000):
11751174
This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
11761175
care.
11771176
"""
1178-
inner_timeout_ms = timeout_ms_fn(timeout_ms, None)
1177+
timer = Timer(timeout_ms)
11791178
self.poll(timeout_ms=0)
11801179
if self.is_ready(node_id):
11811180
return True
11821181

1183-
while not self.is_ready(node_id) and inner_timeout_ms() > 0:
1182+
while not self.is_ready(node_id) and not timer.expired:
11841183
if self.connection_failed(node_id):
11851184
raise Errors.KafkaConnectionError("Connection to %s failed." % (node_id,))
11861185
self.maybe_connect(node_id)
1187-
self.poll(timeout_ms=inner_timeout_ms())
1186+
self.poll(timeout_ms=timer.timeout_ms)
11881187
return self.is_ready(node_id)
11891188

11901189
def send_and_receive(self, node_id, request):

kafka/consumer/fetcher.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from kafka.record import MemoryRecords
2020
from kafka.serializer import Deserializer
2121
from kafka.structs import TopicPartition, OffsetAndMetadata, OffsetAndTimestamp
22-
from kafka.util import timeout_ms_fn
22+
from kafka.util import Timer
2323

2424
log = logging.getLogger(__name__)
2525

@@ -230,15 +230,15 @@ def _fetch_offsets_by_times(self, timestamps, timeout_ms=None):
230230
if not timestamps:
231231
return {}
232232

233-
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout fetching offsets')
233+
timer = Timer(timeout_ms, "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
234234
timestamps = copy.copy(timestamps)
235235
fetched_offsets = dict()
236236
while True:
237237
if not timestamps:
238238
return {}
239239

240240
future = self._send_list_offsets_requests(timestamps)
241-
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
241+
self._client.poll(future=future, timeout_ms=timer.timeout_ms)
242242

243243
# Timeout w/o future completion
244244
if not future.is_done:
@@ -256,12 +256,17 @@ def _fetch_offsets_by_times(self, timestamps, timeout_ms=None):
256256

257257
if future.exception.invalid_metadata or self._client.cluster.need_update:
258258
refresh_future = self._client.cluster.request_update()
259-
self._client.poll(future=refresh_future, timeout_ms=inner_timeout_ms())
259+
self._client.poll(future=refresh_future, timeout_ms=timer.timeout_ms)
260260

261261
if not future.is_done:
262262
break
263263
else:
264-
time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
264+
if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
265+
time.sleep(self.config['retry_backoff_ms'] / 1000)
266+
else:
267+
time.sleep(timer.timeout_ms / 1000)
268+
269+
timer.maybe_raise()
265270

266271
raise Errors.KafkaTimeoutError(
267272
"Failed to get offsets by timestamps in %s ms" % (timeout_ms,))

kafka/coordinator/base.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from kafka.metrics.stats import Avg, Count, Max, Rate
1717
from kafka.protocol.find_coordinator import FindCoordinatorRequest
1818
from kafka.protocol.group import HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID
19-
from kafka.util import timeout_ms_fn, Timer
19+
from kafka.util import Timer
2020

2121
log = logging.getLogger('kafka.coordinator')
2222

@@ -293,7 +293,10 @@ def ensure_coordinator_ready(self, timeout_ms=None):
293293
if not metadata_update.is_done:
294294
return False
295295
else:
296-
time.sleep(min(timer.timeout_ms, self.config['retry_backoff_ms']) / 1000)
296+
if timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
297+
time.sleep(self.config['retry_backoff_ms'] / 1000)
298+
else:
299+
time.sleep(timer.timeout_ms / 1000)
297300
else:
298301
raise future.exception # pylint: disable-msg=raising-bad-type
299302
if timer.expired:
@@ -458,7 +461,8 @@ def join_group(self, timeout_ms=None):
458461
while not self.coordinator_unknown():
459462
if not self._client.in_flight_request_count(self.coordinator_id):
460463
break
461-
self._client.poll(timeout_ms=min(timer.timeout_ms, 200))
464+
poll_timeout_ms = 200 if timer.timeout_ms is None or timer.timeout_ms > 200 else timer.timeout_ms
465+
self._client.poll(timeout_ms=poll_timeout_ms)
462466
if timer.expired:
463467
return False
464468
else:
@@ -491,7 +495,10 @@ def join_group(self, timeout_ms=None):
491495
elif timer.expired:
492496
return False
493497
else:
494-
time.sleep(min(timer.timeout_ms, self.config['retry_backoff_ms']) / 1000)
498+
if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
499+
time.sleep(self.config['retry_backoff_ms'] / 1000)
500+
else:
501+
time.sleep(timer.timeout_ms / 1000)
495502

496503
def _send_join_group_request(self):
497504
"""Join the group and return the assignment for the next generation.

kafka/coordinator/consumer.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from kafka.metrics.stats import Avg, Count, Max, Rate
2020
from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
2121
from kafka.structs import OffsetAndMetadata, TopicPartition
22-
from kafka.util import timeout_ms_fn, Timer, WeakMethod
22+
from kafka.util import Timer, WeakMethod
2323

2424

2525
log = logging.getLogger(__name__)
@@ -405,7 +405,7 @@ def refresh_committed_offsets_if_needed(self, timeout_ms=None):
405405
except Errors.KafkaTimeoutError:
406406
return False
407407
for partition, offset in six.iteritems(offsets):
408-
log.debug("Setting offset for partition %s to the committed offset %s", partition, offset.offset);
408+
log.debug("Setting offset for partition %s to the committed offset %s", partition, offset.offset)
409409
self._subscription.seek(partition, offset.offset)
410410
return True
411411

@@ -424,32 +424,35 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
424424
if not partitions:
425425
return {}
426426

427-
inner_timeout_ms = timeout_ms_fn(timeout_ms, None)
427+
future_key = frozenset(partitions)
428+
timer = Timer(timeout_ms)
428429
while True:
429-
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
430+
self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms)
430431

431432
# contact coordinator to fetch committed offsets
432-
future_key = frozenset(partitions)
433433
if future_key in self._offset_fetch_futures:
434434
future = self._offset_fetch_futures[future_key]
435435
else:
436436
future = self._send_offset_fetch_request(partitions)
437437
self._offset_fetch_futures[future_key] = future
438438

439-
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
439+
self._client.poll(future=future, timeout_ms=timer.timeout_ms)
440440

441-
if not future.is_done:
442-
raise Errors.KafkaTimeoutError()
443-
else:
441+
if future.is_done:
444442
del self._offset_fetch_futures[future_key]
445443

446-
if future.succeeded():
447-
return future.value
444+
if future.succeeded():
445+
return future.value
448446

449-
if not future.retriable():
450-
raise future.exception # pylint: disable-msg=raising-bad-type
447+
elif not future.retriable():
448+
raise future.exception # pylint: disable-msg=raising-bad-type
451449

452-
time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
450+
# future failed but is retriable, or is not done yet
451+
if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
452+
time.sleep(self.config['retry_backoff_ms'] / 1000)
453+
else:
454+
time.sleep(timer.timeout_ms / 1000)
455+
timer.maybe_raise()
453456

454457
def close(self, autocommit=True, timeout_ms=None):
455458
"""Close the coordinator, leave the current group,
@@ -540,23 +543,26 @@ def commit_offsets_sync(self, offsets, timeout_ms=None):
540543
if not offsets:
541544
return
542545

543-
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout in coordinator.poll')
546+
timer = Timer(timeout_ms)
544547
while True:
545-
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
548+
self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms)
546549

547550
future = self._send_offset_commit_request(offsets)
548-
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
549-
550-
if not future.is_done:
551-
raise Errors.KafkaTimeoutError()
551+
self._client.poll(future=future, timeout_ms=timer.timeout_ms)
552552

553-
if future.succeeded():
554-
return future.value
553+
if future.is_done:
554+
if future.succeeded():
555+
return future.value
555556

556-
if not future.retriable():
557-
raise future.exception # pylint: disable-msg=raising-bad-type
557+
elif not future.retriable():
558+
raise future.exception # pylint: disable-msg=raising-bad-type
558559

559-
time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
560+
# future failed but is retriable, or it is still pending
561+
if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
562+
time.sleep(self.config['retry_backoff_ms'] / 1000)
563+
else:
564+
time.sleep(timer.timeout_ms / 1000)
565+
timer.maybe_raise()
560566

561567
def _maybe_auto_commit_offsets_sync(self, timeout_ms=None):
562568
if self.config['enable_auto_commit']:

kafka/producer/kafka.py

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import logging
66
import socket
77
import threading
8-
import time
98
import warnings
109
import weakref
1110

@@ -24,7 +23,7 @@
2423
from kafka.record.legacy_records import LegacyRecordBatchBuilder
2524
from kafka.serializer import Serializer
2625
from kafka.structs import TopicPartition
27-
from kafka.util import ensure_valid_topic_name
26+
from kafka.util import Timer, ensure_valid_topic_name
2827

2928

3029
log = logging.getLogger(__name__)
@@ -664,8 +663,7 @@ def __getattr__(self, name):
664663

665664
def partitions_for(self, topic):
666665
"""Returns set of all known partitions for the topic."""
667-
max_wait = self.config['max_block_ms'] / 1000
668-
return self._wait_on_metadata(topic, max_wait)
666+
return self._wait_on_metadata(topic, self.config['max_block_ms'])
669667

670668
@classmethod
671669
def max_usable_produce_magic(cls, api_version):
@@ -835,14 +833,11 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
835833
assert not (value is None and key is None), 'Need at least one: key or value'
836834
ensure_valid_topic_name(topic)
837835
key_bytes = value_bytes = None
836+
timer = Timer(self.config['max_block_ms'], "Failed to assign partition for message in max_block_ms.")
838837
try:
839838
assigned_partition = None
840-
elapsed = 0.0
841-
begin = time.time()
842-
timeout = self.config['max_block_ms'] / 1000
843-
while assigned_partition is None and elapsed < timeout:
844-
elapsed = time.time() - begin
845-
self._wait_on_metadata(topic, timeout - elapsed)
839+
while assigned_partition is None and not timer.expired:
840+
self._wait_on_metadata(topic, timer.timeout_ms)
846841

847842
key_bytes = self._serialize(
848843
self.config['key_serializer'],
@@ -856,7 +851,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
856851
assigned_partition = self._partition(topic, partition, key, value,
857852
key_bytes, value_bytes)
858853
if assigned_partition is None:
859-
raise Errors.KafkaTimeoutError("Failed to assign partition for message after %s secs." % timeout)
854+
raise Errors.KafkaTimeoutError("Failed to assign partition for message after %s secs." % (timer.elapsed_ms / 1000,))
860855
else:
861856
partition = assigned_partition
862857

@@ -931,7 +926,7 @@ def _ensure_valid_record_size(self, size):
931926
" the maximum request size you have configured with the"
932927
" max_request_size configuration" % (size,))
933928

934-
def _wait_on_metadata(self, topic, max_wait):
929+
def _wait_on_metadata(self, topic, max_wait_ms):
935930
"""
936931
Wait for cluster metadata including partitions for the given topic to
937932
be available.
@@ -949,36 +944,29 @@ def _wait_on_metadata(self, topic, max_wait):
949944
"""
950945
# add topic to metadata topic list if it is not there already.
951946
self._sender.add_topic(topic)
952-
begin = time.time()
953-
elapsed = 0.0
947+
timer = Timer(max_wait_ms, "Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,))
954948
metadata_event = None
955949
while True:
956950
partitions = self._metadata.partitions_for_topic(topic)
957951
if partitions is not None:
958952
return partitions
959-
960-
if elapsed >= max_wait:
961-
raise Errors.KafkaTimeoutError(
962-
"Failed to update metadata after %.1f secs." % (max_wait,))
963-
953+
timer.maybe_raise()
964954
if not metadata_event:
965955
metadata_event = threading.Event()
966956

967957
log.debug("%s: Requesting metadata update for topic %s", str(self), topic)
968-
969958
metadata_event.clear()
970959
future = self._metadata.request_update()
971960
future.add_both(lambda e, *args: e.set(), metadata_event)
972961
self._sender.wakeup()
973-
metadata_event.wait(max_wait - elapsed)
962+
metadata_event.wait(timer.timeout_ms / 1000)
974963
if not metadata_event.is_set():
975964
raise Errors.KafkaTimeoutError(
976-
"Failed to update metadata after %.1f secs." % (max_wait,))
965+
"Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,))
977966
elif topic in self._metadata.unauthorized_topics:
978967
raise Errors.TopicAuthorizationFailedError(set([topic]))
979968
else:
980-
elapsed = time.time() - begin
981-
log.debug("%s: _wait_on_metadata woke after %s secs.", str(self), elapsed)
969+
log.debug("%s: _wait_on_metadata woke after %s secs.", str(self), timer.elapsed_ms / 1000)
982970

983971
def _serialize(self, f, topic, data):
984972
if not f:

0 commit comments

Comments
 (0)