
Commit 837df1e

dpkp and alfaix authored
AdminClient: support delete_records (#2535)
Co-authored-by: Ruslan <[email protected]>
Co-authored-by: Arsen Kitov <[email protected]>
1 parent f7c234d commit 837df1e

4 files changed (+220 -8 lines)

kafka/admin/client.py

Lines changed: 114 additions & 5 deletions
@@ -15,14 +15,13 @@
 from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
 import kafka.errors as Errors
 from kafka.errors import (
-    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
+    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, UnknownTopicOrPartitionError,
     UnrecognizedBrokerVersion, IllegalArgumentError)
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.admin import (
     CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
     ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
-    DeleteGroupsRequest, DescribeLogDirsRequest
-)
+    DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest)
 from kafka.protocol.commit import OffsetFetchRequest
 from kafka.protocol.find_coordinator import FindCoordinatorRequest
 from kafka.protocol.metadata import MetadataRequest
@@ -1116,8 +1115,118 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
                 .format(version))
         return self._send_request_to_controller(request)
 
-    # delete records protocol not yet implemented
-    # Note: send the request to the partition leaders
+    def _get_leader_for_partitions(self, partitions, timeout_ms=None):
+        """Finds ID of the leader node for every given topic partition.
+
+        Will raise UnknownTopicOrPartitionError if for some partition no leader can be found.
+
+        :param partitions: ``[TopicPartition]``: partitions for which to find leaders.
+        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
+            config.
+
+        :return: Dictionary with ``{leader_id -> {partitions}}``
+        """
+        timeout_ms = self._validate_timeout(timeout_ms)
+
+        partitions = set(partitions)
+        topics = set(tp.topic for tp in partitions)
+
+        response = self._get_cluster_metadata(topics=topics).to_object()
+
+        leader2partitions = defaultdict(list)
+        valid_partitions = set()
+        for topic in response.get("topics", ()):
+            for partition in topic.get("partitions", ()):
+                t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
+                if t2p in partitions:
+                    leader2partitions[partition["leader"]].append(t2p)
+                    valid_partitions.add(t2p)
+
+        if len(partitions) != len(valid_partitions):
+            unknown = set(partitions) - valid_partitions
+            raise UnknownTopicOrPartitionError(
+                "The following partitions are not known: %s"
+                % ", ".join(str(x) for x in unknown)
+            )
+
+        return leader2partitions
+
+    def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
+        """Delete records whose offset is smaller than the given offset of the corresponding partition.
+
+        :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
+            given partitions.
+        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
+            config.
+        :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
+            this node. No check is performed verifying that this is indeed the leader for all
+            listed partitions: use with caution.
+
+        :return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker.
+            See DeleteRecordsResponse for possible fields. error_code for all partitions is
+            guaranteed to be zero, otherwise an exception is raised.
+        """
+        timeout_ms = self._validate_timeout(timeout_ms)
+        responses = []
+        version = self._client.api_version(DeleteRecordsRequest, max_version=0)
+        if version is None:
+            raise IncompatibleBrokerVersion("Broker does not support DeleteRecordsRequest")
+
+        # We want to make as few requests as possible
+        # If a single node serves as a partition leader for multiple partitions (and/or
+        # topics), we can send all of those in a single request.
+        # For that we store {leader -> {partitions for leader}}, and do 1 request per leader
+        if partition_leader_id is None:
+            leader2partitions = self._get_leader_for_partitions(
+                set(records_to_delete), timeout_ms
+            )
+        else:
+            leader2partitions = {partition_leader_id: set(records_to_delete)}
+
+        for leader, partitions in leader2partitions.items():
+            topic2partitions = defaultdict(list)
+            for partition in partitions:
+                topic2partitions[partition.topic].append(partition)
+
+            request = DeleteRecordsRequest[version](
+                topics=[
+                    (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions])
+                    for topic, partitions in topic2partitions.items()
+                ],
+                timeout_ms=timeout_ms
+            )
+            future = self._send_request_to_node(leader, request)
+            self._wait_for_futures([future])
+
+            responses.append(future.value.to_object())
+
+        partition2result = {}
+        partition2error = {}
+        for response in responses:
+            for topic in response["topics"]:
+                for partition in topic["partitions"]:
+                    tp = TopicPartition(topic["name"], partition["partition_index"])
+                    partition2result[tp] = partition
+                    if partition["error_code"] != 0:
+                        partition2error[tp] = partition["error_code"]
+
+        if partition2error:
+            if len(partition2error) == 1:
+                key, error = next(iter(partition2error.items()))
+                raise Errors.for_code(error)(
+                    "Error deleting records from topic %s partition %s" % (key.topic, key.partition)
+                )
+            else:
+                raise Errors.BrokerResponseError(
+                    "The following errors occurred when trying to delete records: " +
+                    ", ".join(
+                        "%s(partition=%d): %s" %
+                        (partition.topic, partition.partition, Errors.for_code(error).__name__)
+                        for partition, error in partition2error.items()
+                    )
+                )
+
+        return partition2result
 
     # create delegation token protocol not yet implemented
     # Note: send the request to the least_loaded_node()
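
Taken together, the new method groups the requested partitions by leader, sends one DeleteRecordsRequest per leader node, and merges the responses. A minimal usage sketch; the bootstrap address and topic name are placeholders, not part of this commit:

    from kafka import KafkaAdminClient
    from kafka.structs import TopicPartition

    # Placeholder cluster and topic; assumes the topic has at least 2 partitions.
    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

    # Truncate partition 0 below offset 50; -1 truncates partition 1 up to its
    # current high watermark (the semantics exercised by the tests below).
    result = admin.delete_records(
        {TopicPartition("my-topic", 0): 50, TopicPartition("my-topic", 1): -1},
        timeout_ms=5000,
    )
    for tp, metadata in result.items():
        # error_code is guaranteed to be 0 here; failures raise instead.
        print(tp, "new low watermark:", metadata["low_watermark"])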

kafka/protocol/admin.py

Lines changed: 32 additions & 0 deletions
@@ -179,6 +179,38 @@ class DeleteTopicsRequest_v3(Request):
     ]
 
 
+class DeleteRecordsResponse_v0(Response):
+    API_KEY = 21
+    API_VERSION = 0
+    SCHEMA = Schema(
+        ('throttle_time_ms', Int32),
+        ('topics', Array(
+            ('name', String('utf-8')),
+            ('partitions', Array(
+                ('partition_index', Int32),
+                ('low_watermark', Int64),
+                ('error_code', Int16))))),
+    )
+
+
+class DeleteRecordsRequest_v0(Request):
+    API_KEY = 21
+    API_VERSION = 0
+    RESPONSE_TYPE = DeleteRecordsResponse_v0
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('name', String('utf-8')),
+            ('partitions', Array(
+                ('partition_index', Int32),
+                ('offset', Int64))))),
+        ('timeout_ms', Int32)
+    )
+
+
+DeleteRecordsResponse = [DeleteRecordsResponse_v0]
+DeleteRecordsRequest = [DeleteRecordsRequest_v0]
+
+
 class ListGroupsResponse_v0(Response):
     API_KEY = 16
     API_VERSION = 0
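
The new structs follow the usual kafka.protocol pattern: a Schema describes the wire layout, and instances are built with keyword arguments matching the schema fields. A small sketch with a made-up topic and offsets, showing how the request built in client.py above is shaped and serialized:

    from kafka.protocol.admin import DeleteRecordsRequest_v0

    # Truncate partition 0 below offset 42 and partition 1 below offset 7.
    request = DeleteRecordsRequest_v0(
        topics=[("example-topic", [(0, 42), (1, 7)])],
        timeout_ms=1000,
    )
    payload = request.encode()  # request body bytes; the client adds the header
    print(len(payload), "bytes, expecting", request.RESPONSE_TYPE.__name__)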

test/conftest.py

Lines changed: 2 additions & 2 deletions
@@ -73,11 +73,11 @@ def kafka_consumer_factory(kafka_broker, topic, request):
     """Return a KafkaConsumer factory fixture"""
     _consumer = [None]
 
-    def factory(**kafka_consumer_params):
+    def factory(topics=(topic,), **kafka_consumer_params):
         params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
         params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
         params.setdefault('auto_offset_reset', 'earliest')
-        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
         return _consumer[0]
 
     yield factory
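
The widened factory signature keeps the old single-topic default but lets a test subscribe to several topics, or pass topics=() and assign partitions manually, as the new integration tests do. A hypothetical test sketch, not part of this commit:

    from kafka.structs import TopicPartition

    def test_two_topics_example(kafka_consumer_factory, topic, topic2):
        # Subscribe to both fixture topics at once via the new parameter.
        consumer = kafka_consumer_factory(topics=(topic, topic2), group_id=None)
        consumer.poll(timeout_ms=500)

        # Or skip subscription and assign a single partition by hand.
        manual = kafka_consumer_factory(topics=(), group_id=None)
        manual.assign([TopicPartition(topic, 0)])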

test/test_admin_integration.py

Lines changed: 72 additions & 1 deletion
@@ -1,3 +1,4 @@
+from kafka.structs import TopicPartition
 import pytest
 
 from logging import info
@@ -7,7 +8,9 @@
 
 from kafka.admin import (
     ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
-from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)
+from kafka.errors import (
+    BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
+    GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError)
 
 
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -315,3 +318,71 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
     assert group1 not in consumergroups
     assert group2 in consumergroups
     assert group3 not in consumergroups
+
+@pytest.fixture(name="topic2")
+def _topic2(kafka_broker, request):
+    """Same as the `topic` fixture, but with a different name if you need two topics."""
+    topic_name = '%s_%s' % (request.node.name, random_string(10))
+    kafka_broker.create_topics([topic_name])
+    return topic_name
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
+def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic, topic2):
+    t0p0 = TopicPartition(topic, 0)
+    t0p1 = TopicPartition(topic, 1)
+    t0p2 = TopicPartition(topic, 2)
+    t1p0 = TopicPartition(topic2, 0)
+    t1p1 = TopicPartition(topic2, 1)
+    t1p2 = TopicPartition(topic2, 2)
+
+    partitions = (t0p0, t0p1, t0p2, t1p0, t1p1, t1p2)
+
+    for p in partitions:
+        send_messages(range(0, 100), partition=p.partition, topic=p.topic)
+
+    consumer1 = kafka_consumer_factory(group_id=None, topics=())
+    consumer1.assign(partitions)
+    for _ in range(600):
+        next(consumer1)
+
+    result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
+    assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition}
+    assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition}
+    assert result[t1p0] == {"low_watermark": 40, "error_code": 0, "partition_index": t1p0.partition}
+    assert result[t1p2] == {"low_watermark": 30, "error_code": 0, "partition_index": t1p2.partition}
+
+    consumer2 = kafka_consumer_factory(group_id=None, topics=())
+    consumer2.assign(partitions)
+    all_messages = consumer2.poll(max_records=600, timeout_ms=2000)
+    assert sum(len(x) for x in all_messages.values()) == 600 - 100 - 50 - 40 - 30
+    assert not consumer2.poll(max_records=1, timeout_ms=1000)  # ensure there are no delayed messages
+
+    assert not all_messages.get(t0p0, [])
+    assert [r.offset for r in all_messages[t0p1]] == list(range(50, 100))
+    assert [r.offset for r in all_messages[t0p2]] == list(range(100))
+
+    assert [r.offset for r in all_messages[t1p0]] == list(range(40, 100))
+    assert [r.offset for r in all_messages[t1p1]] == list(range(100))
+    assert [r.offset for r in all_messages[t1p2]] == list(range(30, 100))
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
+def test_delete_records_with_errors(kafka_admin_client, topic, send_messages):
+    sleep(1)  # sometimes the topic is not created yet...?
+    p0 = TopicPartition(topic, 0)
+    p1 = TopicPartition(topic, 1)
+    p2 = TopicPartition(topic, 2)
+    # verify that topic has been created
+    send_messages(range(0, 1), partition=p2.partition, topic=p2.topic)
+
+    with pytest.raises(UnknownTopicOrPartitionError):
+        kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
+    with pytest.raises(UnknownTopicOrPartitionError):
+        kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
+    with pytest.raises(OffsetOutOfRangeError):
+        kafka_admin_client.delete_records({p0: 1000})
+    with pytest.raises(BrokerResponseError):
+        kafka_admin_client.delete_records({p0: 1000, p1: 1000})
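
The error tests pin down the raising behavior of delete_records: a single failing partition raises the specific subclass looked up via Errors.for_code (here OffsetOutOfRangeError), while several failures are bundled into a generic BrokerResponseError. A handling sketch with placeholder address and topic; note the specific subclass must be caught before its BrokerResponseError base:

    from kafka import KafkaAdminClient
    from kafka.errors import BrokerResponseError, OffsetOutOfRangeError
    from kafka.structs import TopicPartition

    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")  # placeholder
    p0 = TopicPartition("my-topic", 0)  # placeholder partition

    try:
        admin.delete_records({p0: 1000000})  # offset beyond the high watermark
    except OffsetOutOfRangeError:
        # One bad partition: the specific error subclass is raised.
        print("truncation offset out of range for", p0)
    except BrokerResponseError as exc:
        # Several bad partitions: errors are aggregated into the base class.
        print("delete_records failed on multiple partitions:", exc)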