@@ -242,6 +242,7 @@ def test_produce_consume_two_partitions(self):
242
242
# Offset Tests #
243
243
####################
244
244
245
+ @unittest .skip ('commmit offset not supported in this version' )
245
246
def test_commit_fetch_offsets (self ):
246
247
req = OffsetCommitRequest ("test_commit_fetch_offsets" , 0 , 42 , "metadata" )
247
248
(resp ,) = self .client .send_offset_commit_request ("group" , [req ])
@@ -401,8 +402,9 @@ def test_acks_local_write(self):
401
402
producer .stop ()
402
403
403
404
def test_acks_cluster_commit (self ):
404
- producer = SimpleProducer (self .client , "test_acks_cluster_commit" ,
405
- req_acks = SimpleProducer .ACK_AFTER_CLUSTER_COMMIT )
405
+ producer = SimpleProducer (
406
+ self .client , "test_acks_cluster_commit" ,
407
+ req_acks = SimpleProducer .ACK_AFTER_CLUSTER_COMMIT )
406
408
resp = producer .send_messages ("one" )
407
409
self .assertEquals (len (resp ), 1 )
408
410
@@ -548,11 +550,11 @@ def test_batched_simple_producer(self):
548
550
549
551
class TestConsumer (unittest .TestCase ):
550
552
@classmethod
551
- def setUpClass (cls ): # noqa
553
+ def setUpClass (cls ):
552
554
cls .zk = ZookeeperFixture .instance ()
553
555
cls .server1 = KafkaFixture .instance (0 , cls .zk .host , cls .zk .port )
554
556
cls .server2 = KafkaFixture .instance (1 , cls .zk .host , cls .zk .port )
555
- cls .client = KafkaClient (cls .server2 .host , cls .server2 .port )
557
+ cls .client = KafkaClient (cls .server2 .host , cls .server2 .port , bufsize = 8192 )
556
558
557
559
@classmethod
558
560
def tearDownClass (cls ): # noqa
@@ -581,7 +583,7 @@ def test_simple_consumer(self):
581
583
self .assertEquals (resp .offset , 0 )
582
584
583
585
# Start a consumer
584
- consumer = SimpleConsumer (self .client , "group1" , "test_simple_consumer" )
586
+ consumer = SimpleConsumer (self .client , "group1" , "test_simple_consumer" , auto_commit = False )
585
587
all_messages = []
586
588
for message in consumer :
587
589
all_messages .append (message )
@@ -604,6 +606,11 @@ def test_simple_consumer(self):
604
606
605
607
self .assertEquals (len (all_messages ), 13 )
606
608
609
+ consumer .stop ()
610
+
611
+ def test_simple_consumer_blocking (self ):
612
+ consumer = SimpleConsumer (self .client , "group1" , "test_simple_consumer_blocking" , auto_commit = False )
613
+
607
614
# Blocking API
608
615
start = datetime .now ()
609
616
messages = consumer .get_messages (block = True , timeout = 5 )
@@ -612,13 +619,13 @@ def test_simple_consumer(self):
612
619
self .assertEqual (len (messages ), 0 )
613
620
614
621
# Send 10 messages
615
- produce = ProduceRequest ("test_simple_consumer " , 0 , messages = [
622
+ produce = ProduceRequest ("test_simple_consumer_blocking " , 0 , messages = [
616
623
create_message ("Test message 0 %d" % i ) for i in range (10 )
617
624
])
618
625
619
626
for resp in self .client .send_produce_request ([produce ]):
620
627
self .assertEquals (resp .error , 0 )
621
- self .assertEquals (resp .offset , 100 )
628
+ self .assertEquals (resp .offset , 0 )
622
629
623
630
# Fetch 5 messages
624
631
messages = consumer .get_messages (count = 5 , block = True , timeout = 5 )
@@ -650,7 +657,7 @@ def test_simple_consumer_pending(self):
650
657
self .assertEquals (resp .error , 0 )
651
658
self .assertEquals (resp .offset , 0 )
652
659
653
- consumer = SimpleConsumer (self .client , "group1" , "test_simple_pending" )
660
+ consumer = SimpleConsumer (self .client , "group1" , "test_simple_pending" , auto_commit = False )
654
661
self .assertEquals (consumer .pending (), 20 )
655
662
self .assertEquals (consumer .pending (partitions = [0 ]), 10 )
656
663
self .assertEquals (consumer .pending (partitions = [1 ]), 10 )
@@ -676,7 +683,7 @@ def test_multi_process_consumer(self):
676
683
self .assertEquals (resp .offset , 0 )
677
684
678
685
# Start a consumer
679
- consumer = MultiProcessConsumer (self .client , "grp1" , "test_mpconsumer" )
686
+ consumer = MultiProcessConsumer (self .client , "grp1" , "test_mpconsumer" , auto_commit = False )
680
687
all_messages = []
681
688
for message in consumer :
682
689
all_messages .append (message )
@@ -732,7 +739,7 @@ def test_multi_proc_pending(self):
732
739
self .assertEquals (resp .error , 0 )
733
740
self .assertEquals (resp .offset , 0 )
734
741
735
- consumer = MultiProcessConsumer (self .client , "group1" , "test_mppending" )
742
+ consumer = MultiProcessConsumer (self .client , "group1" , "test_mppending" , auto_commit = False )
736
743
self .assertEquals (consumer .pending (), 20 )
737
744
self .assertEquals (consumer .pending (partitions = [0 ]), 10 )
738
745
self .assertEquals (consumer .pending (partitions = [1 ]), 10 )
@@ -749,20 +756,21 @@ def test_large_messages(self):
749
756
self .assertEquals (resp .offset , 0 )
750
757
751
758
# Produce 10 messages that are too large (bigger than default fetch size)
752
- messages2 = [create_message (random_string (5000 )) for i in range (10 )]
759
+ messages2 = [create_message (random_string (5000 )) for i in range (10 )]
753
760
produce2 = ProduceRequest ("test_large_messages" , 0 , messages2 )
754
761
755
762
for resp in self .client .send_produce_request ([produce2 ]):
756
763
self .assertEquals (resp .error , 0 )
757
764
self .assertEquals (resp .offset , 10 )
758
765
759
766
# Consumer should still get all of them
760
- consumer = SimpleConsumer (self .client , "group1" , "test_large_messages" )
767
+ consumer = SimpleConsumer (self .client , "group1" , "test_large_messages" , auto_commit = False )
761
768
all_messages = messages1 + messages2
762
769
for i , message in enumerate (consumer ):
763
770
self .assertEquals (all_messages [i ], message .message )
764
771
self .assertEquals (i , 19 )
765
772
773
+
766
774
def random_string (l ):
767
775
s = "" .join (random .choice (string .letters ) for i in xrange (l ))
768
776
return s
0 commit comments