 import logging
 import time
 from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue

 from kafka.common import (
     ErrorMapping, FetchRequest,
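
Note: the import change above keeps the two queue types distinct. The standard-library Queue.Queue now buffers fetched messages inside SimpleConsumer, while multiprocessing.Queue (aliased MPQueue) carries messages from child consumer processes to the parent. A minimal sketch of the distinction, using the same names as the diff (queue sizes here are illustrative only):

    from Queue import Queue                       # thread-safe, in-process buffer
    from multiprocessing import Queue as MPQueue  # crosses process boundaries

    local_buffer = Queue(4096)    # role of SimpleConsumer.queue: pre-fetched messages
    shared_queue = MPQueue(1024)  # role of MultiProcessConsumer.queue: filled by child processes
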
@@ -227,6 +227,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
+        self.queue = Queue(buffer_size)

         super(SimpleConsumer, self).__init__(
             client, group, topic,
@@ -292,122 +293,75 @@ def get_messages(self, count=1, block=True, timeout=0.1):

         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified time (in seconds)
+                 until count messages are fetched. If None, it will block forever.
         """
         messages = []
-        iterator = self.__iter__()
-
-        # HACK: This splits the timeout between available partitions
         if timeout:
-            timeout = timeout * 1.0 / len(self.offsets)
+            max_time = time.time() + timeout

-        with FetchContext(self, block, timeout):
-            while count > 0:
-                try:
-                    messages.append(next(iterator))
-                except StopIteration:
-                    break
+        while count > 0 and (timeout is None or timeout > 0):
+            message = self.get_message(block, timeout)
+            if message:
+                messages.append(message)
                 count -= 1
+            else:
+                # Ran out of messages for the last request. If we're not blocking, break.
+                if not block:
+                    break
+            if timeout:
+                timeout = max_time - time.time()

         return messages

-    def __iter__(self):
-        """
-        Create an iterate per partition. Iterate through them calling next()
-        until they are all exhausted.
-        """
-        iters = {}
-        for partition, offset in self.offsets.items():
-            iters[partition] = self.__iter_partition__(partition, offset)
-
-        if len(iters) == 0:
-            return
-
-        while True:
-            if len(iters) == 0:
-                break
-
-            for partition, it in iters.items():
-                try:
-                    if self.partition_info:
-                        yield (partition, it.next())
-                    else:
-                        yield it.next()
-                except StopIteration:
-                    log.debug("Done iterating over partition %s" % partition)
-                    del iters[partition]
-
-                    # skip auto-commit since we didn't yield anything
-                    continue
-
-                # Count, check and commit messages if necessary
-                self.count_since_commit += 1
-                self._auto_commit()
-
-    def __iter_partition__(self, partition, offset):
-        """
-        Iterate over the messages in a partition. Create a FetchRequest
-        to get back a batch of messages, yield them one at a time.
-        After a batch is exhausted, start a new batch unless we've reached
-        the end of this partition.
-        """
-
-        # The offset that is stored in the consumer is the offset that
-        # we have consumed. In subsequent iterations, we are supposed to
-        # fetch the next message (that is from the next offset)
-        # However, for the 0th message, the offset should be as-is.
-        # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
-        # problematic, since 0 is offset of a message which we have not yet
-        # consumed.
-        if self.fetch_started[partition]:
-            offset += 1
-
-        fetch_size = self.fetch_min_bytes
+    def get_message(self, block=True, timeout=0.1):
+        if self.queue.empty():
+            with FetchContext(self, block, timeout):
+                self._fetch()
+        try:
+            return self.queue.get_nowait()
+        except Empty:
+            return None

+    def __iter__(self):
         while True:
-            # use MaxBytes = client's bufsize since we're only
-            # fetching one topic + partition
-            req = FetchRequest(
-                self.topic, partition, offset, self.buffer_size)
-
-            (resp,) = self.client.send_fetch_request(
-                [req],
-                max_wait_time=self.fetch_max_wait_time,
-                min_bytes=fetch_size)
-
-            assert resp.topic == self.topic
-            assert resp.partition == partition
+            message = self.get_message(True, 100)
+            if message:
+                yield message
+            else:
+                # In case we did not receive any message, give up the CPU for
+                # a while before we try again
+                time.sleep(0.1)

-            next_offset = None
+    def _fetch(self):
+        requests = []
+        partitions = self.offsets.keys()
+        for partition in partitions:
+            requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size))
+        responses = self.client.send_fetch_request(
+            requests,
+            max_wait_time=int(self.fetch_max_wait_time),
+            min_bytes=self.fetch_min_bytes)
+        for resp in responses:
+            partition = resp.partition
             try:
                 for message in resp.messages:
-                    next_offset = message.offset
-
-                    # update the offset before the message is yielded. This
-                    # is so that the consumer state is not lost in certain
-                    # cases.
-                    #
-                    # For eg: the message is yielded and consumed by the
-                    # caller, but the caller does not come back into the
-                    # generator again. The message will be consumed but the
-                    # status will not be updated in the consumer
-                    self.fetch_started[partition] = True
-                    self.offsets[partition] = message.offset
-                    yield message
+                    self.offsets[partition] = message.offset + 1
+                    # Count, check and commit messages if necessary
+                    self.count_since_commit += 1
+                    self._auto_commit()
+                    if self.partition_info:
+                        self.queue.put((partition, message))
+                    else:
+                        self.queue.put(message)
             except ConsumerFetchSizeTooSmall, e:
-                fetch_size *= 1.5
-                log.warn(
-                    "Fetch size too small, increasing to %d (1.5x) and retry",
-                    fetch_size)
-                continue
+                self.buffer_size *= 2
+                log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size)
             except ConsumerNoMoreData, e:
                 log.debug("Iteration was ended by %r", e)
-
-            if next_offset is None:
-                break
-            else:
-                offset = next_offset + 1
+            except StopIteration:
+                # Stop iterating through this partition
+                log.debug("Done iterating over partition %s" % partition)

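
Note: a minimal usage sketch of the reworked SimpleConsumer API above. The broker address, group, and topic are hypothetical, and the KafkaClient import path and (host, port) constructor are assumed from kafka-python of this era; the point is that timeout now bounds the whole call instead of being split across partitions:

    from kafka.client import KafkaClient          # assumed import path
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)       # hypothetical broker
    consumer = SimpleConsumer(client, "my-group", "my-topic")

    # Block for at most 5 seconds in total while gathering up to 10 messages;
    # each call drains the internal queue, refilling it via _fetch() as needed.
    for message in consumer.get_messages(count=10, block=True, timeout=5):
        print(message)

    # get_message() returns a single message, or None if nothing was available
    single = consumer.get_message(block=False)

    consumer.stop()
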
@@ -446,8 +400,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
         # indicates a specific number of messages, follow that advice
         count = 0

-        for partition, message in consumer:
-            queue.put((partition, message))
+        message = consumer.get_message()
+        if message:
+            queue.put(message)
             count += 1

             # We have reached the required size. The controller might have
@@ -457,11 +412,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
             # can reset the 'start' event
             if count == size.value:
                 pause.wait()
-                break

-        # In case we did not receive any message, give up the CPU for
-        # a while before we try again
-        if count == 0:
+        else:
+            # In case we did not receive any message, give up the CPU for
+            # a while before we try again
             time.sleep(0.1)

     consumer.stop()
@@ -507,7 +461,7 @@ def __init__(self, client, group, topic, auto_commit=True,

         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = Queue(1024)  # Child consumers dump messages into this
+        self.queue = MPQueue(1024)  # Child consumers dump messages into this
         self.start = Event()  # Indicates the consumers to start fetch
         self.exit = Event()   # Requests the consumers to shutdown
         self.pause = Event()  # Requests the consumers to pause fetch
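
Note: a small, self-contained sketch (independent of this codebase) of the coordination pattern these variables implement: a multiprocessing queue plus Event flags shared between the parent and a child process:

    from multiprocessing import Process, Queue as MPQueue, Event

    def worker(q, start, stop):
        start.wait()              # block until the parent signals start
        if not stop.is_set():
            q.put("payload")      # hand one result back to the parent

    if __name__ == "__main__":
        q, start, stop = MPQueue(1024), Event(), Event()
        child = Process(target=worker, args=(q, start, stop))
        child.start()
        start.set()               # let the worker run
        print(q.get())            # receive the item from the child
        stop.set()
        child.join()
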
@@ -589,8 +543,8 @@ def get_messages(self, count=1, block=True, timeout=10):

         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified time (in seconds)
+                 until count messages are fetched. If None, it will block forever.
         """
         messages = []

@@ -601,7 +555,10 @@ def get_messages(self, count=1, block=True, timeout=10):
         self.size.value = count
         self.pause.clear()

-        while count > 0:
+        if timeout:
+            max_time = time.time() + timeout
+
+        while count > 0 and (timeout is None or timeout > 0):
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
             # go into overdrive and keep consuming thousands of
@@ -621,6 +578,7 @@ def get_messages(self, count=1, block=True, timeout=10):
                 self.count_since_commit += 1
                 self._auto_commit()
                 count -= 1
+                timeout = max_time - time.time()

         self.size.value = 0
         self.start.clear()
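
Note: with the changes above, MultiProcessConsumer.get_messages() treats timeout as a total budget for the call: max_time is computed once up front and the remaining time is recomputed after each message, mirroring SimpleConsumer.get_messages(). A standalone sketch of that accounting pattern (names are illustrative, not from the diff):

    import time

    def drain(get_one, count, timeout):
        # get_one(timeout) returns the next item, or None if the wait timed out
        items = []
        max_time = time.time() + timeout if timeout else None
        while count > 0 and (timeout is None or timeout > 0):
            item = get_one(timeout)
            if item is None:
                break
            items.append(item)
            count -= 1
            if timeout:
                timeout = max_time - time.time()
        return items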