@@ -79,8 +79,17 @@ def test_send_fetches(fetcher, topic, mocker):
79
79
])])
80
80
]
81
81
82
- mocker .patch .object (fetcher , '_create_fetch_requests' ,
83
- return_value = dict (enumerate (fetch_requests )))
82
+ def build_fetch_offsets (request ):
83
+ fetch_offsets = {}
84
+ for topic , partitions in request .topics :
85
+ for partition_data in partitions :
86
+ partition , offset = partition_data [:2 ]
87
+ fetch_offsets [TopicPartition (topic , partition )] = offset
88
+ return fetch_offsets
89
+
90
+ mocker .patch .object (
91
+ fetcher , '_create_fetch_requests' ,
92
+ return_value = (dict (enumerate (map (lambda r : (r , build_fetch_offsets (r )), fetch_requests )))))
84
93
85
94
mocker .patch .object (fetcher ._client , 'ready' , return_value = True )
86
95
mocker .patch .object (fetcher ._client , 'send' )
@@ -100,8 +109,8 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
100
109
fetcher ._client ._api_versions = BROKER_API_VERSIONS [api_version ]
101
110
mocker .patch .object (fetcher ._client .cluster , "leader_for_partition" , return_value = 0 )
102
111
by_node = fetcher ._create_fetch_requests ()
103
- requests = by_node .values ()
104
- assert set ([r .API_VERSION for r in requests ]) == set ([fetch_version ])
112
+ requests_and_offsets = by_node .values ()
113
+ assert set ([r .API_VERSION for ( r , _offsets ) in requests_and_offsets ]) == set ([fetch_version ])
105
114
106
115
107
116
def test_update_fetch_positions (fetcher , topic , mocker ):
@@ -345,19 +354,15 @@ def test_fetched_records(fetcher, topic, mocker):
345
354
assert partial is False
346
355
347
356
348
- @pytest .mark .parametrize (("fetch_request " , "fetch_response" , "num_partitions" ), [
357
+ @pytest .mark .parametrize (("fetch_offsets " , "fetch_response" , "num_partitions" ), [
349
358
(
350
- FetchRequest [0 ](
351
- - 1 , 100 , 100 ,
352
- [('foo' , [(0 , 0 , 1000 ),])]),
359
+ {TopicPartition ('foo' , 0 ): 0 },
353
360
FetchResponse [0 ](
354
361
[("foo" , [(0 , 0 , 1000 , [(0 , b'xxx' ),])]),]),
355
362
1 ,
356
363
),
357
364
(
358
- FetchRequest [1 ](
359
- - 1 , 100 , 100 ,
360
- [('foo' , [(0 , 0 , 1000 ), (1 , 0 , 1000 ),])]),
365
+ {TopicPartition ('foo' , 0 ): 0 , TopicPartition ('foo' , 1 ): 0 },
361
366
FetchResponse [1 ](
362
367
0 ,
363
368
[("foo" , [
@@ -367,41 +372,33 @@ def test_fetched_records(fetcher, topic, mocker):
367
372
2 ,
368
373
),
369
374
(
370
- FetchRequest [2 ](
371
- - 1 , 100 , 100 ,
372
- [('foo' , [(0 , 0 , 1000 ),])]),
375
+ {TopicPartition ('foo' , 0 ): 0 },
373
376
FetchResponse [2 ](
374
377
0 , [("foo" , [(0 , 0 , 1000 , [(0 , b'xxx' ),])]),]),
375
378
1 ,
376
379
),
377
380
(
378
- FetchRequest [3 ](
379
- - 1 , 100 , 100 , 10000 ,
380
- [('foo' , [(0 , 0 , 1000 ),])]),
381
+ {TopicPartition ('foo' , 0 ): 0 },
381
382
FetchResponse [3 ](
382
383
0 , [("foo" , [(0 , 0 , 1000 , [(0 , b'xxx' ),])]),]),
383
384
1 ,
384
385
),
385
386
(
386
- FetchRequest [4 ](
387
- - 1 , 100 , 100 , 10000 , 0 ,
388
- [('foo' , [(0 , 0 , 1000 ),])]),
387
+ {TopicPartition ('foo' , 0 ): 0 },
389
388
FetchResponse [4 ](
390
389
0 , [("foo" , [(0 , 0 , 1000 , 0 , [], [(0 , b'xxx' ),])]),]),
391
390
1 ,
392
391
),
393
392
(
394
393
# This may only be used in broker-broker api calls
395
- FetchRequest [5 ](
396
- - 1 , 100 , 100 , 10000 , 0 ,
397
- [('foo' , [(0 , 0 , 1000 ),])]),
394
+ {TopicPartition ('foo' , 0 ): 0 },
398
395
FetchResponse [5 ](
399
396
0 , [("foo" , [(0 , 0 , 1000 , 0 , 0 , [], [(0 , b'xxx' ),])]),]),
400
397
1 ,
401
398
),
402
399
])
403
- def test__handle_fetch_response (fetcher , fetch_request , fetch_response , num_partitions ):
404
- fetcher ._handle_fetch_response (fetch_request , time .time (), fetch_response )
400
+ def test__handle_fetch_response (fetcher , fetch_offsets , fetch_response , num_partitions ):
401
+ fetcher ._handle_fetch_response (0 , fetch_offsets , time .time (), fetch_response )
405
402
assert len (fetcher ._completed_fetches ) == num_partitions
406
403
407
404
0 commit comments