17
17
-compile ([export_all , nowarn_export_all ]).
18
18
19
19
suite () ->
20
- [{timetrap , {seconds , 120 }}].
20
+ [{timetrap , {minutes , 4 }}].
21
21
22
22
all () ->
23
23
[
@@ -64,7 +64,8 @@ shared() ->
64
64
split_transfer ,
65
65
transfer_unsettled ,
66
66
subscribe ,
67
- subscribe_with_auto_flow ,
67
+ subscribe_with_auto_flow_settled ,
68
+ subscribe_with_auto_flow_unsettled ,
68
69
outgoing_heartbeat ,
69
70
roundtrip_large_messages ,
70
71
transfer_id_vs_delivery_id
@@ -290,12 +291,15 @@ roundtrip_large_messages(Config) ->
290
291
Hostname = ? config (rmq_hostname , Config ),
291
292
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
292
293
OpenConf = #{address => Hostname , port => Port , sasl => anon },
293
- DataKb = crypto :strong_rand_bytes (1024 ),
294
- roundtrip (OpenConf , DataKb ),
295
- Data1Mb = binary :copy (DataKb , 1024 ),
296
- roundtrip (OpenConf , Data1Mb ),
297
- roundtrip (OpenConf , binary :copy (Data1Mb , 8 )),
298
- ok = roundtrip (OpenConf , binary :copy (Data1Mb , 64 )).
294
+
295
+ DataKb = rand :bytes (1024 ),
296
+ DataMb = rand :bytes (1024 * 1024 ),
297
+ Data8Mb = rand :bytes (8 * 1024 * 1024 ),
298
+ Data64Mb = rand :bytes (64 * 1024 * 1024 ),
299
+ ok = roundtrip (OpenConf , DataKb ),
300
+ ok = roundtrip (OpenConf , DataMb ),
301
+ ok = roundtrip (OpenConf , Data8Mb ),
302
+ ok = roundtrip (OpenConf , Data64Mb ).
299
303
300
304
roundtrip (OpenConf ) ->
301
305
roundtrip (OpenConf , <<" banana" >>).
@@ -321,9 +325,10 @@ roundtrip(OpenConf, Body) ->
321
325
{error , link_not_found } = amqp10_client :detach_link (Sender ),
322
326
{ok , Receiver } = amqp10_client :attach_receiver_link (
323
327
Session , <<" banana-receiver" >>, <<" test1" >>, settled , unsettled_state ),
324
- {ok , OutMsg } = amqp10_client :get_msg (Receiver , 60_000 * 4 ),
328
+ {ok , OutMsg } = amqp10_client :get_msg (Receiver , 4 * 60_000 ),
325
329
ok = amqp10_client :end_session (Session ),
326
330
ok = amqp10_client :close_connection (Connection ),
331
+
327
332
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
328
333
#{creation_time := Now } = amqp10_msg :properties (OutMsg ),
329
334
#{<<" a_key" >> := <<" a_value" >>} = amqp10_msg :application_properties (OutMsg ),
@@ -502,7 +507,7 @@ transfer_unsettled(Config) ->
502
507
subscribe (Config ) ->
503
508
Hostname = ? config (rmq_hostname , Config ),
504
509
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
505
- QueueName = << " test-sub " >> ,
510
+ QueueName = atom_to_binary ( ? FUNCTION_NAME ) ,
506
511
{ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
507
512
{ok , Session } = amqp10_client :begin_session (Connection ),
508
513
{ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
@@ -530,104 +535,121 @@ subscribe(Config) ->
530
535
ok = amqp10_client :end_session (Session ),
531
536
ok = amqp10_client :close_connection (Connection ).
532
537
533
- subscribe_with_auto_flow (Config ) ->
538
+ subscribe_with_auto_flow_settled (Config ) ->
539
+ SenderSettleMode = settled ,
534
540
Hostname = ? config (rmq_hostname , Config ),
535
541
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
536
- QueueName = << " test-sub " >> ,
542
+ QueueName = atom_to_binary ( ? FUNCTION_NAME ) ,
537
543
{ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
538
544
{ok , Session } = amqp10_client :begin_session (Connection ),
539
545
{ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
540
546
<<" sub-sender" >>,
541
547
QueueName ),
542
548
await_link (Sender , credited , link_credit_timeout ),
543
549
544
- _ = publish_messages (Sender , <<" banana" >>, 20 ),
545
- % % Use sender settle mode 'settled'.
546
- {ok , R1 } = amqp10_client :attach_receiver_link (
547
- Session , <<" sub-receiver-1" >>, QueueName , settled ),
548
- await_link (R1 , attached , attached_timeout ),
549
- ok = amqp10_client :flow_link_credit (R1 , 5 , 2 ),
550
- ? assertEqual (20 , count_received_messages (R1 )),
551
- ok = amqp10_client :detach_link (R1 ),
550
+ publish_messages (Sender , <<" banana" >>, 20 ),
551
+ {ok , Receiver } = amqp10_client :attach_receiver_link (
552
+ Session , <<" sub-receiver" >>, QueueName , SenderSettleMode ),
553
+ await_link (Receiver , attached , attached_timeout ),
554
+
555
+ ok = amqp10_client :flow_link_credit (Receiver , 5 , 2 ),
556
+ ? assertEqual (20 , count_received_messages (Receiver )),
557
+
558
+ ok = amqp10_client :detach_link (Receiver ),
559
+ ok = amqp10_client :detach_link (Sender ),
560
+ ok = amqp10_client :end_session (Session ),
561
+ ok = amqp10_client :close_connection (Connection ).
562
+
563
+ subscribe_with_auto_flow_unsettled (Config ) ->
564
+ SenderSettleMode = unsettled ,
565
+ Hostname = ? config (rmq_hostname , Config ),
566
+ Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
567
+ QueueName = atom_to_binary (? FUNCTION_NAME ),
568
+ {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
569
+ {ok , Session } = amqp10_client :begin_session (Connection ),
570
+ {ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
571
+ <<" sub-sender" >>,
572
+ QueueName ),
573
+ await_link (Sender , credited , link_credit_timeout ),
552
574
553
- _ = publish_messages (Sender , <<" banana " >>, 30 ),
575
+ _ = publish_messages (Sender , <<" 1- " >>, 30 ),
554
576
% % Use sender settle mode 'unsettled'.
555
577
% % This should require us to manually settle message in order to receive more messages.
556
- {ok , R2 } = amqp10_client :attach_receiver_link (Session , <<" sub-receiver-2" >>, QueueName , unsettled ),
557
- await_link (R2 , attached , attached_timeout ),
558
- ok = amqp10_client :flow_link_credit (R2 , 5 , 2 ),
578
+ {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" sub-receiver-2" >>, QueueName , SenderSettleMode ),
579
+ await_link (Receiver , attached , attached_timeout ),
580
+ ok = amqp10_client :flow_link_credit (Receiver , 5 , 2 ),
559
581
% % We should receive exactly 5 messages.
560
- [M1 , _M2 , M3 , M4 , M5 ] = receive_messages (R2 , 5 ),
561
- ok = assert_no_message (R2 ),
582
+ [M1 , _M2 , M3 , M4 , M5 ] = receive_messages (Receiver , 5 ),
583
+ ok = assert_no_message (Receiver ),
562
584
563
585
% % Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2.
564
586
% % Therefore, the client should not yet grant more credits to the sender.
565
587
ok = amqp10_client_session :disposition (
566
- R2 , amqp10_msg :delivery_id (M1 ), amqp10_msg :delivery_id (M3 ), true , accepted ),
567
- ok = assert_no_message (R2 ),
588
+ Receiver , amqp10_msg :delivery_id (M1 ), amqp10_msg :delivery_id (M3 ), true , accepted ),
589
+ ok = assert_no_message (Receiver ),
568
590
569
591
% % When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4),
570
592
% % the number of unsettled messages now falls below 2 (since only M4 is left unsettled).
571
593
% % Therefore, the client should grant 5 credits to the sender.
572
594
% % Therefore, we should receive 5 more messages.
573
- ok = amqp10_client :accept_msg (R2 , M5 ),
574
- [_M6 , _M7 , _M8 , _M9 , M10 ] = receive_messages (R2 , 5 ),
575
- ok = assert_no_message (R2 ),
595
+ ok = amqp10_client :accept_msg (Receiver , M5 ),
596
+ [_M6 , _M7 , _M8 , _M9 , M10 ] = receive_messages (Receiver , 5 ),
597
+ ok = assert_no_message (Receiver ),
576
598
577
599
% % It shouldn't matter how we settle messages, therefore we use 'rejected' this time.
578
600
% % Settling all in flight messages should cause us to receive exactly 5 more messages.
579
601
ok = amqp10_client_session :disposition (
580
- R2 , amqp10_msg :delivery_id (M4 ), amqp10_msg :delivery_id (M10 ), true , rejected ),
581
- [M11 , _M12 , _M13 , _M14 , M15 ] = receive_messages (R2 , 5 ),
582
- ok = assert_no_message (R2 ),
602
+ Receiver , amqp10_msg :delivery_id (M4 ), amqp10_msg :delivery_id (M10 ), true , rejected ),
603
+ [M11 , _M12 , _M13 , _M14 , M15 ] = receive_messages (Receiver , 5 ),
604
+ ok = assert_no_message (Receiver ),
583
605
584
606
% % Dynamically decrease link credit.
585
607
% % Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages.
586
- ok = amqp10_client :flow_link_credit (R2 , 3 , 3 ),
587
- [M16 , _M17 , M18 ] = receive_messages (R2 , 3 ),
588
- ok = assert_no_message (R2 ),
608
+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 3 ),
609
+ [M16 , _M17 , M18 ] = receive_messages (Receiver , 3 ),
610
+ ok = assert_no_message (Receiver ),
589
611
590
612
ok = amqp10_client_session :disposition (
591
- R2 , amqp10_msg :delivery_id (M11 ), amqp10_msg :delivery_id (M15 ), true , accepted ),
613
+ Receiver , amqp10_msg :delivery_id (M11 ), amqp10_msg :delivery_id (M15 ), true , accepted ),
592
614
% % However, the RenewWhenBelow=3 still refers to all unsettled messages.
593
615
% % Right now we have 3 messages (M16, M17, M18) unsettled.
594
- ok = assert_no_message (R2 ),
616
+ ok = assert_no_message (Receiver ),
595
617
596
618
% % Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting
597
619
% % in 3 new messages to be received.
598
- ok = amqp10_client :accept_msg (R2 , M18 ),
599
- [_M19 , _M20 , _M21 ] = receive_messages (R2 , 3 ),
600
- ok = assert_no_message (R2 ),
620
+ ok = amqp10_client :accept_msg (Receiver , M18 ),
621
+ [_M19 , _M20 , _M21 ] = receive_messages (Receiver , 3 ),
622
+ ok = assert_no_message (Receiver ),
601
623
602
- ok = amqp10_client :flow_link_credit (R2 , 3 , never , true ),
603
- [_M22 , _M23 , M24 ] = receive_messages (R2 , 3 ),
604
- ok = assert_no_message (R2 ),
624
+ ok = amqp10_client :flow_link_credit (Receiver , 3 , never , true ),
625
+ [_M22 , _M23 , M24 ] = receive_messages (Receiver , 3 ),
626
+ ok = assert_no_message (Receiver ),
605
627
606
628
% % Since RenewWhenBelow = never, we expect to receive no new messages despite settling.
607
629
ok = amqp10_client_session :disposition (
608
- R2 , amqp10_msg :delivery_id (M16 ), amqp10_msg :delivery_id (M24 ), true , rejected ),
609
- ok = assert_no_message (R2 ),
630
+ Receiver , amqp10_msg :delivery_id (M16 ), amqp10_msg :delivery_id (M24 ), true , rejected ),
631
+ ok = assert_no_message (Receiver ),
610
632
611
- ok = amqp10_client :flow_link_credit (R2 , 2 , never , false ),
612
- [M25 , _M26 ] = receive_messages (R2 , 2 ),
613
- ok = assert_no_message (R2 ),
633
+ ok = amqp10_client :flow_link_credit (Receiver , 2 , never , false ),
634
+ [M25 , _M26 ] = receive_messages (Receiver , 2 ),
635
+ ok = assert_no_message (Receiver ),
614
636
615
- ok = amqp10_client :flow_link_credit (R2 , 3 , 3 ),
616
- [_M27 , _M28 , M29 ] = receive_messages (R2 , 3 ),
617
- ok = assert_no_message (R2 ),
637
+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 3 ),
638
+ [_M27 , _M28 , M29 ] = receive_messages (Receiver , 3 ),
639
+ ok = assert_no_message (Receiver ),
618
640
619
641
ok = amqp10_client_session :disposition (
620
- R2 , amqp10_msg :delivery_id (M25 ), amqp10_msg :delivery_id (M29 ), true , accepted ),
621
- [M30 ] = receive_messages (R2 , 1 ),
622
- ok = assert_no_message (R2 ),
623
- ok = amqp10_client :accept_msg (R2 , M30 ),
642
+ Receiver , amqp10_msg :delivery_id (M25 ), amqp10_msg :delivery_id (M29 ), true , accepted ),
643
+ [M30 ] = receive_messages (Receiver , 1 ),
644
+ ok = assert_no_message (Receiver ),
645
+ ok = amqp10_client :accept_msg (Receiver , M30 ),
624
646
% % The sender queue is empty now.
625
- ok = assert_no_message (R2 ),
647
+ ok = assert_no_message (Receiver ),
626
648
627
- ok = amqp10_client :flow_link_credit (R2 , 3 , 1 ),
628
- _ = publish_messages (Sender , <<" banana " >>, 1 ),
629
- [M31 ] = receive_messages (R2 , 1 ),
630
- ok = amqp10_client :accept_msg (R2 , M31 ),
649
+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 1 ),
650
+ _ = publish_messages (Sender , <<" 2- " >>, 1 ),
651
+ [M31 ] = receive_messages (Receiver , 1 ),
652
+ ok = amqp10_client :accept_msg (Receiver , M31 ),
631
653
632
654
% % Since function flow_link_credit/3 documents
633
655
% % "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
@@ -637,24 +659,25 @@ subscribe_with_auto_flow(Config) ->
637
659
% % remaining link credit (2) and unsettled messages (0) is 2.
638
660
% %
639
661
% % Therefore, when we publish another 3 messages, we expect to only receive only 2 messages!
640
- _ = publish_messages (Sender , <<" banana " >>, 5 ),
641
- [M32 , M33 ] = receive_messages (R2 , 2 ),
642
- ok = assert_no_message (R2 ),
662
+ _ = publish_messages (Sender , <<" 3- " >>, 5 ),
663
+ [M32 , M33 ] = receive_messages (Receiver , 2 ),
664
+ ok = assert_no_message (Receiver ),
643
665
644
666
% % When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0)
645
667
% % falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits.
646
- ok = amqp10_client :accept_msg (R2 , M32 ),
647
- ok = assert_no_message (R2 ),
648
- ok = amqp10_client :accept_msg (R2 , M33 ),
649
-
650
- [M35 , M36 , M37 ] = receive_messages (R2 , 3 ),
651
- ok = amqp10_client :accept_msg (R2 , M35 ),
652
- ok = amqp10_client :accept_msg (R2 , M36 ),
653
- ok = amqp10_client :accept_msg (R2 , M37 ),
668
+ ok = amqp10_client :accept_msg (Receiver , M32 ),
669
+ ok = assert_no_message (Receiver ),
670
+ ok = amqp10_client :accept_msg (Receiver , M33 ),
671
+
672
+ [M35 , M36 , M37 ] = receive_messages (Receiver , 3 ),
673
+ ok = amqp10_client :accept_msg (Receiver , M35 ),
674
+ ok = amqp10_client :accept_msg (Receiver , M36 ),
675
+ ok = amqp10_client :accept_msg (Receiver , M37 ),
654
676
% % The sender queue is empty now.
655
- ok = assert_no_message (R2 ),
677
+ ok = assert_no_message (Receiver ),
656
678
657
- ok = amqp10_client :detach_link (R2 ),
679
+ ok = amqp10_client :detach_link (Receiver ),
680
+ ok = amqp10_client :detach_link (Sender ),
658
681
ok = amqp10_client :end_session (Session ),
659
682
ok = amqp10_client :close_connection (Connection ).
660
683
@@ -817,18 +840,18 @@ await_link(Who, What, Err) ->
817
840
ok ;
818
841
{amqp10_event , {link , Who0 , {detached , Why }}}
819
842
when Who0 =:= Who ->
820
- exit (Why )
843
+ ct : fail (Why )
821
844
after 5000 ->
822
845
flush (),
823
- exit (Err )
846
+ ct : fail (Err )
824
847
end .
825
848
826
- publish_messages (Sender , Data , Num ) ->
849
+ publish_messages (Sender , BodyPrefix , Num ) ->
827
850
[begin
828
- Tag = integer_to_binary (T ),
829
- Msg = amqp10_msg :new (Tag , Data , false ),
830
- ok = amqp10_client :send_msg (Sender , Msg ),
831
- ok = await_disposition (Tag )
851
+ Tag = integer_to_binary (T ),
852
+ Msg = amqp10_msg :new (Tag , << BodyPrefix / binary , Tag / binary >> , false ),
853
+ ok = amqp10_client :send_msg (Sender , Msg ),
854
+ ok = await_disposition (Tag )
832
855
end || T <- lists :seq (1 , Num )].
833
856
834
857
await_disposition (DeliveryTag ) ->
@@ -847,7 +870,7 @@ count_received_messages0(Receiver, Count) ->
847
870
receive
848
871
{amqp10_msg , Receiver , _Msg } ->
849
872
count_received_messages0 (Receiver , Count + 1 )
850
- after 200 ->
873
+ after 500 ->
851
874
Count
852
875
end .
853
876
@@ -861,7 +884,15 @@ receive_messages0(Receiver, N, Acc) ->
861
884
{amqp10_msg , Receiver , Msg } ->
862
885
receive_messages0 (Receiver , N - 1 , [Msg | Acc ])
863
886
after 5000 ->
864
- ct :fail ({timeout , {num_received , length (Acc )}, {num_missing , N }})
887
+ LastReceivedMsg = case Acc of
888
+ [] -> none ;
889
+ [M | _ ] -> M
890
+ end ,
891
+ ct :fail ({timeout ,
892
+ {num_received , length (Acc )},
893
+ {num_missing , N },
894
+ {last_received_msg , LastReceivedMsg }
895
+ })
865
896
end .
866
897
867
898
assert_no_message (Receiver ) ->
0 commit comments