Skip to content

Commit 66069b9

Browse files
committed
Test _reset_offsets_async with multiple partitions => KeyError
1 parent 827832a commit 66069b9

File tree

1 file changed

+19
-10
lines changed

1 file changed

+19
-10
lines changed

test/test_fetcher.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,18 +134,27 @@ def test_reset_offsets_if_needed(fetcher, topic, mocker):
134134

135135

136136
def test__reset_offsets_async(fetcher, mocker):
137-
tp = TopicPartition("topic", 0)
137+
tp0 = TopicPartition("topic", 0)
138+
tp1 = TopicPartition("topic", 1)
138139
fetcher._subscriptions.subscribe(topics=["topic"])
139-
fetcher._subscriptions.assign_from_subscribed([tp])
140-
fetcher._subscriptions.request_offset_reset(tp)
141-
fetched_offsets = {tp: OffsetAndTimestamp(1001, None, -1)}
140+
fetcher._subscriptions.assign_from_subscribed([tp0, tp1])
141+
fetcher._subscriptions.request_offset_reset(tp0)
142+
fetcher._subscriptions.request_offset_reset(tp1)
143+
mocker.patch.object(fetcher._client.cluster, "leader_for_partition", side_effect=[0, 1])
142144
mocker.patch.object(fetcher._client, 'ready', return_value=True)
143-
mocker.patch.object(fetcher, '_send_list_offsets_request',
144-
return_value=Future().success((fetched_offsets, set())))
145-
mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
146-
fetcher._reset_offsets_async({tp: OffsetResetStrategy.EARLIEST})
147-
assert not fetcher._subscriptions.assignment[tp].awaiting_reset
148-
assert fetcher._subscriptions.assignment[tp].position.offset == 1001
145+
future1 = Future()
146+
future2 = Future()
147+
mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=[future1, future2])
148+
fetcher._reset_offsets_async({
149+
tp0: OffsetResetStrategy.EARLIEST,
150+
tp1: OffsetResetStrategy.EARLIEST,
151+
})
152+
future1.success(({tp0: OffsetAndTimestamp(1001, None, -1)}, set())),
153+
future2.success(({tp1: OffsetAndTimestamp(1002, None, -1)}, set())),
154+
assert not fetcher._subscriptions.assignment[tp0].awaiting_reset
155+
assert not fetcher._subscriptions.assignment[tp1].awaiting_reset
156+
assert fetcher._subscriptions.assignment[tp0].position.offset == 1001
157+
assert fetcher._subscriptions.assignment[tp1].position.offset == 1002
149158

150159

151160
def test__send_list_offsets_requests(fetcher, mocker):

0 commit comments

Comments
 (0)