Skip to content

Commit de43eee

Browse files
dpkp, Laityned, Arfey
authored
Change loglevel of cancelled errors to info (#2467)
Co-authored-by: Laityned <[email protected]> Co-authored-by: misha.gavela <[email protected]>
1 parent 411e62f commit de43eee

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

kafka/consumer/fetcher.py

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -125,7 +125,7 @@ def send_fetches(self):
125125
log.debug("Sending FetchRequest to node %s", node_id)
126126
future = self._client.send(node_id, request, wakeup=False)
127127
future.add_callback(self._handle_fetch_response, request, time.time())
128-
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
128+
future.add_errback(self._handle_fetch_error, node_id)
129129
futures.append(future)
130130
self._fetch_futures.extend(futures)
131131
self._clean_done_fetch_futures()
@@ -778,6 +778,14 @@ def _handle_fetch_response(self, request, send_time, response):
778778
self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
779779
self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
780780

781+
def _handle_fetch_error(self, node_id, exception):
    """Log a failed fetch to *node_id*.

    A Cancelled error is an expected, benign outcome (e.g. an in-flight
    request dropped during shutdown/rebalance), so it is logged at INFO;
    any other exception is logged at ERROR.
    """
    level = logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR
    log.log(level, 'Fetch to node %s failed: %s', node_id, exception)
788+
781789
def _parse_fetched_data(self, completed_fetch):
782790
tp = completed_fetch.topic_partition
783791
fetch_offset = completed_fetch.fetched_offset

test/test_fetcher.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# pylint: skip-file
22
from __future__ import absolute_import
3+
import logging
34

45
import pytest
56

@@ -12,6 +13,7 @@
1213
CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
1314
)
1415
from kafka.consumer.subscription_state import SubscriptionState
16+
import kafka.errors as Errors
1517
from kafka.future import Future
1618
from kafka.metrics import Metrics
1719
from kafka.protocol.fetch import FetchRequest, FetchResponse
@@ -378,6 +380,22 @@ def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_part
378380
assert len(fetcher._completed_fetches) == num_partitions
379381

380382

383+
@pytest.mark.parametrize(("exception", "log_level"), [
    (Errors.Cancelled(), logging.INFO),
    (Errors.KafkaError(), logging.ERROR),
])
def test__handle_fetch_error(fetcher, caplog, exception, log_level):
    # Cancelled fetch failures should be logged at INFO; all other
    # errors at ERROR. Exactly one record is expected either way.
    fetcher._handle_fetch_error(3, exception)
    assert len(caplog.records) == 1
    record = caplog.records[0]
    assert record.levelname == logging.getLevelName(log_level)
398+
381399
def test__unpack_message_set(fetcher):
382400
fetcher.config['check_crcs'] = False
383401
tp = TopicPartition('foo', 0)

0 commit comments

Comments
 (0)