@@ -134,18 +134,30 @@ def test_reset_offsets_if_needed(fetcher, topic, mocker):
134
134
135
135
136
136
def test__reset_offsets_async (fetcher , mocker ):
137
- tp = TopicPartition ("topic" , 0 )
137
+ tp0 = TopicPartition ("topic" , 0 )
138
+ tp1 = TopicPartition ("topic" , 1 )
138
139
fetcher ._subscriptions .subscribe (topics = ["topic" ])
139
- fetcher ._subscriptions .assign_from_subscribed ([tp ])
140
- fetcher ._subscriptions .request_offset_reset (tp )
141
- fetched_offsets = {tp : OffsetAndTimestamp (1001 , None , - 1 )}
140
+ fetcher ._subscriptions .assign_from_subscribed ([tp0 , tp1 ])
141
+ fetcher ._subscriptions .request_offset_reset (tp0 )
142
+ fetcher ._subscriptions .request_offset_reset (tp1 )
143
+ mocker .patch .object (fetcher ._client .cluster , "leader_for_partition" , side_effect = [0 , 1 ])
142
144
mocker .patch .object (fetcher ._client , 'ready' , return_value = True )
143
- mocker .patch .object (fetcher , '_send_list_offsets_request' ,
144
- return_value = Future ().success ((fetched_offsets , set ())))
145
- mocker .patch .object (fetcher ._client .cluster , "leader_for_partition" , return_value = 0 )
146
- fetcher ._reset_offsets_async ({tp : OffsetResetStrategy .EARLIEST })
147
- assert not fetcher ._subscriptions .assignment [tp ].awaiting_reset
148
- assert fetcher ._subscriptions .assignment [tp ].position .offset == 1001
145
+ mocker .patch .object (
146
+ fetcher ,
147
+ '_send_list_offsets_request' ,
148
+ side_effect = [
149
+ Future ().success (({tp0 : OffsetAndTimestamp (1001 , None , - 1 )}, set ())),
150
+ Future ().success (({tp1 : OffsetAndTimestamp (1002 , None , - 1 )}, set ())),
151
+ ]
152
+ )
153
+ fetcher ._reset_offsets_async ({
154
+ tp0 : OffsetResetStrategy .EARLIEST ,
155
+ tp1 : OffsetResetStrategy .EARLIEST ,
156
+ })
157
+ assert not fetcher ._subscriptions .assignment [tp0 ].awaiting_reset
158
+ assert not fetcher ._subscriptions .assignment [tp1 ].awaiting_reset
159
+ assert fetcher ._subscriptions .assignment [tp0 ].position .offset == 1001
160
+ assert fetcher ._subscriptions .assignment [tp1 ].position .offset == 1002
149
161
150
162
151
163
def test__send_list_offsets_requests (fetcher , mocker ):
0 commit comments