Skip to content

Commit d47055c

Browse files
committed
metadata string
1 parent 7473305 commit d47055c

File tree

6 files changed

+13
-13
lines changed

6 files changed

+13
-13
lines changed

kafka/consumer/fetcher.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ def _append(self, drained, part, max_records, update_offsets):
385385
drained[tp].append(record)
386386

387387
if update_offsets:
388-
self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, b'', leader_epoch)
388+
self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', leader_epoch)
389389
return len(part_records)
390390

391391
else:
@@ -449,7 +449,7 @@ def _message_generator(self):
449449
self._subscriptions.assignment[tp].position.offset)
450450
continue
451451

452-
self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, b'', -1)
452+
self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, '', -1)
453453
yield msg
454454

455455
self._next_partition_records = None
@@ -705,7 +705,7 @@ def _create_fetch_requests(self):
705705
"Advance position for partition %s from %s to %s (last record batch location plus one)"
706706
" to correct for deleted compacted messages and/or transactional control records",
707707
partition, self._subscriptions.assignment[partition].position.offset, next_offset_from_batch_header)
708-
self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, b'', -1)
708+
self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, '', -1)
709709

710710
position = self._subscriptions.assignment[partition].position
711711

kafka/consumer/group.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,7 @@ def _message_generator_v2(self):
11441144
log.debug("Not returning fetched records for partition %s"
11451145
" since it is no longer fetchable", tp)
11461146
break
1147-
self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, b'', -1)
1147+
self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, '', -1)
11481148
yield record
11491149

11501150
def _message_generator(self):

kafka/consumer/subscription_state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ def await_reset(self, strategy):
404404
self.has_valid_position = False
405405

406406
def seek(self, offset):
407-
self._position = OffsetAndMetadata(offset, b'', -1)
407+
self._position = OffsetAndMetadata(offset, '', -1)
408408
self.awaiting_reset = False
409409
self.reset_strategy = None
410410
self.has_valid_position = True

kafka/structs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@
4848
"""The Kafka offset commit API
4949
5050
The Kafka offset commit API allows users to provide additional metadata
51-
(in the form of raw bytes) when an offset is committed. This can be useful
51+
(in the form of a string) when an offset is committed. This can be useful
5252
(for example) to store information about which node made the commit,
5353
what time the commit was made, etc.
5454
5555
Keyword Arguments:
5656
offset (int): The offset to be committed
57-
metadata (bytes): Non-null metadata
57+
metadata (str): Non-null metadata
5858
leader_epoch (int): The last known epoch from the leader / broker
5959
"""
6060
OffsetAndMetadata = namedtuple("OffsetAndMetadata",

test/test_coordinator.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,13 +230,13 @@ def test_need_rejoin(coordinator):
230230
def test_refresh_committed_offsets_if_needed(mocker, coordinator):
231231
mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
232232
return_value = {
233-
TopicPartition('foobar', 0): OffsetAndMetadata(123, b'', -1),
234-
TopicPartition('foobar', 1): OffsetAndMetadata(234, b'', -1)})
233+
TopicPartition('foobar', 0): OffsetAndMetadata(123, '', -1),
234+
TopicPartition('foobar', 1): OffsetAndMetadata(234, '', -1)})
235235
coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)])
236236
assert coordinator._subscription.needs_fetch_committed_offsets is True
237237
coordinator.refresh_committed_offsets_if_needed()
238238
assignment = coordinator._subscription.assignment
239-
assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'', -1)
239+
assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, '', -1)
240240
assert TopicPartition('foobar', 1) not in assignment
241241
assert coordinator._subscription.needs_fetch_committed_offsets is False
242242

@@ -303,8 +303,8 @@ def test_close(mocker, coordinator):
303303
@pytest.fixture
304304
def offsets():
305305
return {
306-
TopicPartition('foobar', 0): OffsetAndMetadata(123, b'', -1),
307-
TopicPartition('foobar', 1): OffsetAndMetadata(234, b'', -1),
306+
TopicPartition('foobar', 0): OffsetAndMetadata(123, '', -1),
307+
TopicPartition('foobar', 1): OffsetAndMetadata(234, '', -1),
308308
}
309309

310310

test/test_fetcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def test_update_fetch_positions(fetcher, topic, mocker):
138138
fetcher._reset_offset.reset_mock()
139139
fetcher._subscriptions.need_offset_reset(partition)
140140
fetcher._subscriptions.assignment[partition].awaiting_reset = False
141-
fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'', -1)
141+
fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, '', -1)
142142
mocker.patch.object(fetcher._subscriptions, 'seek')
143143
fetcher.update_fetch_positions([partition])
144144
assert fetcher._reset_offset.call_count == 0

0 commit comments

Comments
 (0)