@@ -64,7 +64,8 @@ shared() ->
64
64
split_transfer ,
65
65
transfer_unsettled ,
66
66
subscribe ,
67
- subscribe_with_auto_flow ,
67
+ subscribe_with_auto_flow_settled ,
68
+ subscribe_with_auto_flow_unsettled ,
68
69
outgoing_heartbeat ,
69
70
roundtrip_large_messages ,
70
71
transfer_id_vs_delivery_id
@@ -502,7 +503,7 @@ transfer_unsettled(Config) ->
502
503
subscribe (Config ) ->
503
504
Hostname = ? config (rmq_hostname , Config ),
504
505
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
505
- QueueName = << " test-sub " >> ,
506
+ QueueName = atom_to_binary ( ? FUNCTION_NAME ) ,
506
507
{ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
507
508
{ok , Session } = amqp10_client :begin_session (Connection ),
508
509
{ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
@@ -530,104 +531,121 @@ subscribe(Config) ->
530
531
ok = amqp10_client :end_session (Session ),
531
532
ok = amqp10_client :close_connection (Connection ).
532
533
533
- subscribe_with_auto_flow (Config ) ->
534
+ subscribe_with_auto_flow_settled (Config ) ->
535
+ SenderSettleMode = settled ,
534
536
Hostname = ? config (rmq_hostname , Config ),
535
537
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
536
- QueueName = << " test-sub " >> ,
538
+ QueueName = atom_to_binary ( ? FUNCTION_NAME ) ,
537
539
{ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
538
540
{ok , Session } = amqp10_client :begin_session (Connection ),
539
541
{ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
540
542
<<" sub-sender" >>,
541
543
QueueName ),
542
544
await_link (Sender , credited , link_credit_timeout ),
543
545
544
- _ = publish_messages (Sender , <<" banana" >>, 20 ),
545
- % % Use sender settle mode 'settled'.
546
- {ok , R1 } = amqp10_client :attach_receiver_link (
547
- Session , <<" sub-receiver-1" >>, QueueName , settled ),
548
- await_link (R1 , attached , attached_timeout ),
549
- ok = amqp10_client :flow_link_credit (R1 , 5 , 2 ),
550
- ? assertEqual (20 , count_received_messages (R1 )),
551
- ok = amqp10_client :detach_link (R1 ),
546
+ publish_messages (Sender , <<" banana" >>, 20 ),
547
+ {ok , Receiver } = amqp10_client :attach_receiver_link (
548
+ Session , <<" sub-receiver" >>, QueueName , SenderSettleMode ),
549
+ await_link (Receiver , attached , attached_timeout ),
550
+
551
+ ok = amqp10_client :flow_link_credit (Receiver , 5 , 2 ),
552
+ ? assertEqual (20 , count_received_messages (Receiver )),
552
553
553
- _ = publish_messages (Sender , <<" banana" >>, 30 ),
554
+ ok = amqp10_client :detach_link (Receiver ),
555
+ ok = amqp10_client :detach_link (Sender ),
556
+ ok = amqp10_client :end_session (Session ),
557
+ ok = amqp10_client :close_connection (Connection ).
558
+
559
+ subscribe_with_auto_flow_unsettled (Config ) ->
560
+ SenderSettleMode = unsettled ,
561
+ Hostname = ? config (rmq_hostname , Config ),
562
+ Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
563
+ QueueName = atom_to_binary (? FUNCTION_NAME ),
564
+ {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
565
+ {ok , Session } = amqp10_client :begin_session (Connection ),
566
+ {ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
567
+ <<" sub-sender" >>,
568
+ QueueName ),
569
+ await_link (Sender , credited , link_credit_timeout ),
570
+
571
+ _ = publish_messages (Sender , <<" 1-" >>, 30 ),
554
572
% % Use sender settle mode 'unsettled'.
555
573
% % This should require us to manually settle message in order to receive more messages.
556
- {ok , R2 } = amqp10_client :attach_receiver_link (Session , <<" sub-receiver-2" >>, QueueName , unsettled ),
557
- await_link (R2 , attached , attached_timeout ),
558
- ok = amqp10_client :flow_link_credit (R2 , 5 , 2 ),
574
+ {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" sub-receiver-2" >>, QueueName , SenderSettleMode ),
575
+ await_link (Receiver , attached , attached_timeout ),
576
+ ok = amqp10_client :flow_link_credit (Receiver , 5 , 2 ),
559
577
% % We should receive exactly 5 messages.
560
- [M1 , _M2 , M3 , M4 , M5 ] = receive_messages (R2 , 5 ),
561
- ok = assert_no_message (R2 ),
578
+ [M1 , _M2 , M3 , M4 , M5 ] = receive_messages (Receiver , 5 ),
579
+ ok = assert_no_message (Receiver ),
562
580
563
581
% % Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2.
564
582
% % Therefore, the client should not yet grant more credits to the sender.
565
583
ok = amqp10_client_session :disposition (
566
- R2 , amqp10_msg :delivery_id (M1 ), amqp10_msg :delivery_id (M3 ), true , accepted ),
567
- ok = assert_no_message (R2 ),
584
+ Receiver , amqp10_msg :delivery_id (M1 ), amqp10_msg :delivery_id (M3 ), true , accepted ),
585
+ ok = assert_no_message (Receiver ),
568
586
569
587
% % When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4),
570
588
% % the number of unsettled messages now falls below 2 (since only M4 is left unsettled).
571
589
% % Therefore, the client should grant 5 credits to the sender.
572
590
% % Therefore, we should receive 5 more messages.
573
- ok = amqp10_client :accept_msg (R2 , M5 ),
574
- [_M6 , _M7 , _M8 , _M9 , M10 ] = receive_messages (R2 , 5 ),
575
- ok = assert_no_message (R2 ),
591
+ ok = amqp10_client :accept_msg (Receiver , M5 ),
592
+ [_M6 , _M7 , _M8 , _M9 , M10 ] = receive_messages (Receiver , 5 ),
593
+ ok = assert_no_message (Receiver ),
576
594
577
595
% % It shouldn't matter how we settle messages, therefore we use 'rejected' this time.
578
596
% % Settling all in flight messages should cause us to receive exactly 5 more messages.
579
597
ok = amqp10_client_session :disposition (
580
- R2 , amqp10_msg :delivery_id (M4 ), amqp10_msg :delivery_id (M10 ), true , rejected ),
581
- [M11 , _M12 , _M13 , _M14 , M15 ] = receive_messages (R2 , 5 ),
582
- ok = assert_no_message (R2 ),
598
+ Receiver , amqp10_msg :delivery_id (M4 ), amqp10_msg :delivery_id (M10 ), true , rejected ),
599
+ [M11 , _M12 , _M13 , _M14 , M15 ] = receive_messages (Receiver , 5 ),
600
+ ok = assert_no_message (Receiver ),
583
601
584
602
% % Dynamically decrease link credit.
585
603
% % Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages.
586
- ok = amqp10_client :flow_link_credit (R2 , 3 , 3 ),
587
- [M16 , _M17 , M18 ] = receive_messages (R2 , 3 ),
588
- ok = assert_no_message (R2 ),
604
+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 3 ),
605
+ [M16 , _M17 , M18 ] = receive_messages (Receiver , 3 ),
606
+ ok = assert_no_message (Receiver ),
589
607
590
608
ok = amqp10_client_session :disposition (
591
- R2 , amqp10_msg :delivery_id (M11 ), amqp10_msg :delivery_id (M15 ), true , accepted ),
609
+ Receiver , amqp10_msg :delivery_id (M11 ), amqp10_msg :delivery_id (M15 ), true , accepted ),
592
610
% % However, the RenewWhenBelow=3 still refers to all unsettled messages.
593
611
% % Right now we have 3 messages (M16, M17, M18) unsettled.
594
- ok = assert_no_message (R2 ),
612
+ ok = assert_no_message (Receiver ),
595
613
596
614
% % Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting
597
615
% % in 3 new messages to be received.
598
- ok = amqp10_client :accept_msg (R2 , M18 ),
599
- [_M19 , _M20 , _M21 ] = receive_messages (R2 , 3 ),
600
- ok = assert_no_message (R2 ),
616
+ ok = amqp10_client :accept_msg (Receiver , M18 ),
617
+ [_M19 , _M20 , _M21 ] = receive_messages (Receiver , 3 ),
618
+ ok = assert_no_message (Receiver ),
601
619
602
- ok = amqp10_client :flow_link_credit (R2 , 3 , never , true ),
603
- [_M22 , _M23 , M24 ] = receive_messages (R2 , 3 ),
604
- ok = assert_no_message (R2 ),
620
+ ok = amqp10_client :flow_link_credit (Receiver , 3 , never , true ),
621
+ [_M22 , _M23 , M24 ] = receive_messages (Receiver , 3 ),
622
+ ok = assert_no_message (Receiver ),
605
623
606
624
% % Since RenewWhenBelow = never, we expect to receive no new messages despite settling.
607
625
ok = amqp10_client_session :disposition (
608
- R2 , amqp10_msg :delivery_id (M16 ), amqp10_msg :delivery_id (M24 ), true , rejected ),
609
- ok = assert_no_message (R2 ),
626
+ Receiver , amqp10_msg :delivery_id (M16 ), amqp10_msg :delivery_id (M24 ), true , rejected ),
627
+ ok = assert_no_message (Receiver ),
610
628
611
- ok = amqp10_client :flow_link_credit (R2 , 2 , never , false ),
612
- [M25 , _M26 ] = receive_messages (R2 , 2 ),
613
- ok = assert_no_message (R2 ),
629
+ ok = amqp10_client :flow_link_credit (Receiver , 2 , never , false ),
630
+ [M25 , _M26 ] = receive_messages (Receiver , 2 ),
631
+ ok = assert_no_message (Receiver ),
614
632
615
- ok = amqp10_client :flow_link_credit (R2 , 3 , 3 ),
616
- [_M27 , _M28 , M29 ] = receive_messages (R2 , 3 ),
617
- ok = assert_no_message (R2 ),
633
+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 3 ),
634
+ [_M27 , _M28 , M29 ] = receive_messages (Receiver , 3 ),
635
+ ok = assert_no_message (Receiver ),
618
636
619
637
ok = amqp10_client_session :disposition (
620
- R2 , amqp10_msg :delivery_id (M25 ), amqp10_msg :delivery_id (M29 ), true , accepted ),
621
- [M30 ] = receive_messages (R2 , 1 ),
622
- ok = assert_no_message (R2 ),
623
- ok = amqp10_client :accept_msg (R2 , M30 ),
638
+ Receiver , amqp10_msg :delivery_id (M25 ), amqp10_msg :delivery_id (M29 ), true , accepted ),
639
+ [M30 ] = receive_messages (Receiver , 1 ),
640
+ ok = assert_no_message (Receiver ),
641
+ ok = amqp10_client :accept_msg (Receiver , M30 ),
624
642
% % The sender queue is empty now.
625
- ok = assert_no_message (R2 ),
643
+ ok = assert_no_message (Receiver ),
626
644
627
- ok = amqp10_client :flow_link_credit (R2 , 3 , 1 ),
628
- _ = publish_messages (Sender , <<" banana " >>, 1 ),
629
- [M31 ] = receive_messages (R2 , 1 ),
630
- ok = amqp10_client :accept_msg (R2 , M31 ),
645
+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 1 ),
646
+ _ = publish_messages (Sender , <<" 2- " >>, 1 ),
647
+ [M31 ] = receive_messages (Receiver , 1 ),
648
+ ok = amqp10_client :accept_msg (Receiver , M31 ),
631
649
632
650
% % Since function flow_link_credit/3 documents
633
651
% % "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
@@ -637,24 +655,25 @@ subscribe_with_auto_flow(Config) ->
637
655
% % remaining link credit (2) and unsettled messages (0) is 2.
638
656
% %
639
657
% % Therefore, when we publish another 3 messages, we expect to only receive only 2 messages!
640
- _ = publish_messages (Sender , <<" banana " >>, 5 ),
641
- [M32 , M33 ] = receive_messages (R2 , 2 ),
642
- ok = assert_no_message (R2 ),
658
+ _ = publish_messages (Sender , <<" 3- " >>, 5 ),
659
+ [M32 , M33 ] = receive_messages (Receiver , 2 ),
660
+ ok = assert_no_message (Receiver ),
643
661
644
662
% % When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0)
645
663
% % falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits.
646
- ok = amqp10_client :accept_msg (R2 , M32 ),
647
- ok = assert_no_message (R2 ),
648
- ok = amqp10_client :accept_msg (R2 , M33 ),
649
-
650
- [M35 , M36 , M37 ] = receive_messages (R2 , 3 ),
651
- ok = amqp10_client :accept_msg (R2 , M35 ),
652
- ok = amqp10_client :accept_msg (R2 , M36 ),
653
- ok = amqp10_client :accept_msg (R2 , M37 ),
664
+ ok = amqp10_client :accept_msg (Receiver , M32 ),
665
+ ok = assert_no_message (Receiver ),
666
+ ok = amqp10_client :accept_msg (Receiver , M33 ),
667
+
668
+ [M35 , M36 , M37 ] = receive_messages (Receiver , 3 ),
669
+ ok = amqp10_client :accept_msg (Receiver , M35 ),
670
+ ok = amqp10_client :accept_msg (Receiver , M36 ),
671
+ ok = amqp10_client :accept_msg (Receiver , M37 ),
654
672
% % The sender queue is empty now.
655
- ok = assert_no_message (R2 ),
673
+ ok = assert_no_message (Receiver ),
656
674
657
- ok = amqp10_client :detach_link (R2 ),
675
+ ok = amqp10_client :detach_link (Receiver ),
676
+ ok = amqp10_client :detach_link (Sender ),
658
677
ok = amqp10_client :end_session (Session ),
659
678
ok = amqp10_client :close_connection (Connection ).
660
679
@@ -817,18 +836,18 @@ await_link(Who, What, Err) ->
817
836
ok ;
818
837
{amqp10_event , {link , Who0 , {detached , Why }}}
819
838
when Who0 =:= Who ->
820
- exit (Why )
839
+ ct : fail (Why )
821
840
after 5000 ->
822
841
flush (),
823
- exit (Err )
842
+ ct : fail (Err )
824
843
end .
825
844
826
- publish_messages (Sender , Data , Num ) ->
845
+ publish_messages (Sender , BodyPrefix , Num ) ->
827
846
[begin
828
- Tag = integer_to_binary (T ),
829
- Msg = amqp10_msg :new (Tag , Data , false ),
830
- ok = amqp10_client :send_msg (Sender , Msg ),
831
- ok = await_disposition (Tag )
847
+ Tag = integer_to_binary (T ),
848
+ Msg = amqp10_msg :new (Tag , << BodyPrefix / binary , Tag / binary >> , false ),
849
+ ok = amqp10_client :send_msg (Sender , Msg ),
850
+ ok = await_disposition (Tag )
832
851
end || T <- lists :seq (1 , Num )].
833
852
834
853
await_disposition (DeliveryTag ) ->
@@ -847,7 +866,7 @@ count_received_messages0(Receiver, Count) ->
847
866
receive
848
867
{amqp10_msg , Receiver , _Msg } ->
849
868
count_received_messages0 (Receiver , Count + 1 )
850
- after 200 ->
869
+ after 500 ->
851
870
Count
852
871
end .
853
872
@@ -861,7 +880,15 @@ receive_messages0(Receiver, N, Acc) ->
861
880
{amqp10_msg , Receiver , Msg } ->
862
881
receive_messages0 (Receiver , N - 1 , [Msg | Acc ])
863
882
after 5000 ->
864
- ct :fail ({timeout , {num_received , length (Acc )}, {num_missing , N }})
883
+ LastReceivedMsg = case Acc of
884
+ [] -> none ;
885
+ [M | _ ] -> M
886
+ end ,
887
+ ct :fail ({timeout ,
888
+ {num_received , length (Acc )},
889
+ {num_missing , N },
890
+ {last_received_msg , LastReceivedMsg }
891
+ })
865
892
end .
866
893
867
894
assert_no_message (Receiver ) ->
0 commit comments