
Commit f9e0264

Author: Tincu Gabriel
Add log_start_offset to message protocol parsing (#2020)
This is in preparation for adding `zstd` support.
1 parent 5d4b3ec · commit f9e0264

4 files changed: +107 −15 lines

kafka/producer/future.py

Lines changed: 3 additions & 3 deletions
```diff
@@ -38,7 +38,7 @@ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, seri
         produce_future.add_errback(self.failure)
 
     def _produce_success(self, offset_and_timestamp):
-        offset, produce_timestamp_ms = offset_and_timestamp
+        offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp
 
         # Unpacking from args tuple is minor speed optimization
         (relative_offset, timestamp_ms, checksum,
@@ -51,7 +51,7 @@ def _produce_success(self, offset_and_timestamp):
         if offset != -1 and relative_offset is not None:
             offset += relative_offset
         tp = self._produce_future.topic_partition
-        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
                                   checksum, serialized_key_size,
                                   serialized_value_size, serialized_header_size)
         self.success(metadata)
@@ -67,5 +67,5 @@ def get(self, timeout=None):
 
 
 RecordMetadata = collections.namedtuple(
-    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
+    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
     'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
```
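With this change, `RecordMetadata` exposes the partition's log start offset to callers. A minimal sketch of reading it; the broker address and topic name are placeholders, not part of the commit:

```python
# Hedged sketch: reading the new log_start_offset field from RecordMetadata.
# 'localhost:9092' and 'my-topic' are illustrative placeholders.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
metadata = producer.send('my-topic', b'payload').get(timeout=10)

# The namedtuple now carries log_start_offset; expect None when the broker's
# produce response version is below 5 and never returns the field.
print(metadata.offset, metadata.timestamp, metadata.log_start_offset)
```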

kafka/producer/record_accumulator.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -68,16 +68,16 @@ def try_append(self, timestamp_ms, key, value, headers):
                                    sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
-    def done(self, base_offset=None, timestamp_ms=None, exception=None):
+    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
         level = logging.DEBUG if exception is None else logging.WARNING
         log.log(level, "Produced messages to topic-partition %s with base offset"
-                " %s and error %s.", self.topic_partition, base_offset,
-                exception)  # trace
+                " %s log start offset %s and error %s.", self.topic_partition, base_offset,
+                log_start_offset, global_error)  # trace
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
-            self.produce_future.success((base_offset, timestamp_ms))
+            self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
         else:
             self.produce_future.failure(exception)
```
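Because `_produce_success()` in future.py (above) unpacks three values, anything resolving these futures must now supply a 3-tuple. A toy sketch of that contract, with invented numbers (ours, not library code):

```python
# Toy illustration of the new success-tuple contract between
# ProducerBatch.done() and FutureRecordMetadata._produce_success().
def produce_success(offset_and_timestamp):
    # Mirrors the unpack in kafka/producer/future.py after this commit.
    offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp
    return offset, produce_timestamp_ms, log_start_offset

print(produce_success((42, 1587000000000, 7)))  # new 3-tuple: fine
try:
    produce_success((42, 1587000000000))        # old 2-tuple would now break
except ValueError as exc:
    print('old-style tuple rejected:', exc)
```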

kafka/producer/sender.py

Lines changed: 23 additions & 6 deletions
```diff
@@ -195,15 +195,22 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
 
             for topic, partitions in response.topics:
                 for partition_info in partitions:
+                    global_error = None
+                    log_start_offset = None
                     if response.API_VERSION < 2:
                         partition, error_code, offset = partition_info
                         ts = None
-                    else:
+                    elif 2 <= response.API_VERSION <= 4:
                         partition, error_code, offset, ts = partition_info
+                    elif 5 <= response.API_VERSION <= 7:
+                        partition, error_code, offset, ts, log_start_offset = partition_info
+                    else:
+                        # the ignored element is record_errors, a list of (batch_index: int, error_message: str)
+                        partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
                     tp = TopicPartition(topic, partition)
                     error = Errors.for_code(error_code)
                     batch = batches_by_partition[tp]
-                    self._complete_batch(batch, error, offset, ts)
+                    self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)
 
             if response.API_VERSION > 0:
                 self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
```
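For reference, a hedged sketch of the tuple shapes `partition_info` can take per response version; the values are invented, and only v8 adds the record_errors list and summary error_message:

```python
# Invented sample tuples matching each ProduceResponse version family.
v0_info = (0, 0, 123)                              # partition, error_code, offset
v2_info = (0, 0, 123, 1587000000000)               # + timestamp
v5_info = (0, 0, 123, 1587000000000, 5)            # + log_start_offset
v8_info = (0, 0, 123, 1587000000000, 5, [], None)  # + record_errors, error_message

for info in (v0_info, v2_info, v5_info, v8_info):
    partition, error_code, offset = info[:3]
    ts = info[3] if len(info) > 3 else None
    log_start_offset = info[4] if len(info) > 4 else None
    global_error = info[6] if len(info) > 6 else None
    print(partition, offset, ts, log_start_offset, global_error)
```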
```diff
@@ -213,14 +220,16 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
             for batch in batches:
                 self._complete_batch(batch, None, -1, None)
 
-    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
         """Complete or retry the given batch of records.
 
         Arguments:
             batch (RecordBatch): The record batch
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
             timestamp_ms (int, optional): The timestamp returned by the broker for this batch
+            log_start_offset (int, optional): The start offset of the log at the time this produce response was created
+            global_error (str, optional): The summarising error message
         """
         # Standardize no-error to None
         if error is Errors.NoError:
```
```diff
@@ -232,15 +241,15 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
                         " retrying (%d attempts left). Error: %s",
                         batch.topic_partition,
                         self.config['retries'] - batch.attempts - 1,
-                        error)
+                        global_error or error)
             self._accumulator.reenqueue(batch)
             self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
         else:
             if error is Errors.TopicAuthorizationFailedError:
                 error = error(batch.topic_partition.topic)
 
             # tell the user the result of their request
-            batch.done(base_offset, timestamp_ms, error)
+            batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
             self._accumulator.deallocate(batch)
             if error is not None:
                 self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
```
```diff
@@ -293,7 +302,15 @@ def _produce_request(self, node_id, acks, timeout, batches):
             produce_records_by_partition[topic][partition] = buf
 
         kwargs = {}
-        if self.config['api_version'] >= (0, 11):
+        if self.config['api_version'] >= (2, 1):
+            version = 7
+        elif self.config['api_version'] >= (2, 0):
+            version = 6
+        elif self.config['api_version'] >= (1, 1):
+            version = 5
+        elif self.config['api_version'] >= (1, 0):
+            version = 4
+        elif self.config['api_version'] >= (0, 11):
             version = 3
             kwargs = dict(transactional_id=None)
         elif self.config['api_version'] >= (0, 10):
```
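Read as a table, the new ladder maps the configured broker `api_version` to the produce wire version, topping out at v7 (the zstd-capable version) even though v8 schemas are defined in kafka/protocol/produce.py below. A standalone sketch; the helper name is ours, not the library's:

```python
def pick_produce_version(api_version):
    # Illustrative helper mirroring the elif ladder in _produce_request.
    if api_version >= (2, 1):
        return 7
    elif api_version >= (2, 0):
        return 6
    elif api_version >= (1, 1):
        return 5
    elif api_version >= (1, 0):
        return 4
    elif api_version >= (0, 11):
        return 3
    raise NotImplementedError('older branches elided, as in the diff')

assert pick_produce_version((2, 3)) == 7
assert pick_produce_version((1, 0)) == 4
```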

kafka/protocol/produce.py

Lines changed: 77 additions & 2 deletions
```diff
@@ -78,6 +78,50 @@ class ProduceResponse_v5(Response):
     )
 
 
+class ProduceResponse_v6(Response):
+    """
+    The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+    """
+    API_KEY = 0
+    API_VERSION = 6
+    SCHEMA = ProduceResponse_v5.SCHEMA
+
+
+class ProduceResponse_v7(Response):
+    """
+    V7 bumped up to indicate ZStandard capability (see KIP-110).
+    """
+    API_KEY = 0
+    API_VERSION = 7
+    SCHEMA = ProduceResponse_v6.SCHEMA
+
+
+class ProduceResponse_v8(Response):
+    """
+    V8 adds two new fields to the partition response: record_errors and error_message
+    (see KIP-467).
+    """
+    API_KEY = 0
+    API_VERSION = 8
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offset', Int64),
+                ('timestamp', Int64),
+                ('log_start_offset', Int64),
+                ('record_errors', Array(
+                    ('batch_index', Int32),
+                    ('batch_index_error_message', String('utf-8'))
+                )),
+                ('error_message', String('utf-8')))))),
+        ('throttle_time_ms', Int32)
+    )
+
+
 class ProduceRequest(Request):
     API_KEY = 0
 
```
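Per KIP-467, record_errors and error_message nest inside each partition entry alongside log_start_offset. A hedged round-trip sketch with invented field values; it assumes the kafka.protocol Struct base class accepts keyword fields and exposes encode()/decode() as in kafka-python 2.x:

```python
# Hedged round-trip sketch for the v8 response schema; all values invented.
from kafka.protocol.produce import ProduceResponse_v8

resp = ProduceResponse_v8(
    topics=[('my-topic', [
        # partition, error_code, offset, timestamp, log_start_offset,
        # record_errors, error_message
        (0, 0, 123, 1587000000000, 5, [], None),
    ])],
    throttle_time_ms=0)

raw = resp.encode()
decoded = ProduceResponse_v8.decode(raw)
print(decoded.throttle_time_ms)  # -> 0
```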

```diff
@@ -106,6 +150,7 @@ class ProduceRequest_v1(ProduceRequest):
     RESPONSE_TYPE = ProduceResponse_v1
     SCHEMA = ProduceRequest_v0.SCHEMA
 
+
 class ProduceRequest_v2(ProduceRequest):
     API_VERSION = 2
     RESPONSE_TYPE = ProduceResponse_v2
```
```diff
@@ -147,11 +192,41 @@ class ProduceRequest_v5(ProduceRequest):
     SCHEMA = ProduceRequest_v4.SCHEMA
 
 
+class ProduceRequest_v6(ProduceRequest):
+    """
+    The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+    """
+    API_VERSION = 6
+    RESPONSE_TYPE = ProduceResponse_v6
+    SCHEMA = ProduceRequest_v5.SCHEMA
+
+
+class ProduceRequest_v7(ProduceRequest):
+    """
+    V7 bumped up to indicate ZStandard capability (see KIP-110).
+    """
+    API_VERSION = 7
+    RESPONSE_TYPE = ProduceResponse_v7
+    SCHEMA = ProduceRequest_v6.SCHEMA
+
+
+class ProduceRequest_v8(ProduceRequest):
+    """
+    V8 adds two new fields, record_errors and error_message, to PartitionResponse
+    (see KIP-467).
+    """
+    API_VERSION = 8
+    RESPONSE_TYPE = ProduceResponse_v8
+    SCHEMA = ProduceRequest_v7.SCHEMA
+
+
 ProduceRequest = [
     ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
-    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
+    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5,
+    ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8,
 ]
 ProduceResponse = [
     ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
-    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
+    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5,
+    ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8,
 ]
```
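These registry lists are indexed by API version, which is how the sender resolves `ProduceRequest[version]` at runtime. A quick illustration:

```python
# The registry lists map API version number to schema class by index.
from kafka.protocol.produce import ProduceRequest, ProduceResponse

req_cls = ProduceRequest[7]                      # ProduceRequest_v7
assert req_cls.API_VERSION == 7
assert req_cls.RESPONSE_TYPE is ProduceResponse[7]
```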
