Skip to content

Commit 343c577

Browse files
committed
Add mixed version tests
1 parent fab74cb commit 343c577

File tree

3 files changed

+120
-75
lines changed

3 files changed

+120
-75
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -298,12 +298,11 @@ apply(Meta, #credit{credit = LinkCreditRcv, delivery_count = DeliveryCountRcv,
298298
#?MODULE{consumers = Cons1 = #{ConsumerId := Con2}} = State2,
299299
#consumer{credit = PostCred,
300300
delivery_count = PostDeliveryCount} = Con2,
301-
DrainedInsufficientMsgs = Drain andalso PostCred > 0,
302301
Available = messages_ready(State2),
303302
case credit_api_v2(Cfg) of
304303
true ->
305304
{Credit, DeliveryCount, State} =
306-
case DrainedInsufficientMsgs of
305+
case Drain andalso PostCred > 0 of
307306
true ->
308307
AdvancedDeliveryCount = add(PostDeliveryCount, PostCred),
309308
ZeroCredit = 0,
@@ -324,11 +323,11 @@ apply(Meta, #credit{credit = LinkCreditRcv, delivery_count = DeliveryCountRcv,
324323
%% We must always send a send_credit_reply because basic.credit is synchronous.
325324
%% Additionally, we keep the bug of credit API v1 that we send to queue client the
326325
%% send_drained reply before the delivery effects (resulting in the wrong behaviour
327-
%% that the seesion process sends to AMQP 1.0 client the FLOW before the TRANSFERs).
326+
%% that the session process sends to AMQP 1.0 client the FLOW before the TRANSFERs).
328327
%% We have to keep this bug because old rabbit_fifo_client implementations expect
329328
%% a send_drained Ra reply (they can't handle such a Ra effect).
330329
CreditReply = {send_credit_reply, Available},
331-
case DrainedInsufficientMsgs of
330+
case Drain of
332331
true ->
333332
AdvancedDeliveryCount = PostDeliveryCount + PostCred,
334333
Con = Con2#consumer{delivery_count = AdvancedDeliveryCount,
@@ -2115,10 +2114,9 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
21152114
%% recurse without consumer on queue
21162115
checkout_one(Meta, ExpiredMsg,
21172116
InitState#?MODULE{service_queue = SQ1}, Effects1);
2118-
#consumer{status = cancelled} ->
2119-
checkout_one(Meta, ExpiredMsg,
2120-
InitState#?MODULE{service_queue = SQ1}, Effects1);
2121-
#consumer{status = suspected_down} ->
2117+
#consumer{status = S}
2118+
when S =:= cancelled orelse
2119+
S =:= suspected_down ->
21222120
checkout_one(Meta, ExpiredMsg,
21232121
InitState#?MODULE{service_queue = SQ1}, Effects1);
21242122
#consumer{checked_out = Checked0,

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
username => binary(),
8181
prefetch => non_neg_integer(),
8282
args => list(),
83-
%% machine version 4
8483
%% set if and only if credit API v2 is in use
8584
initial_delivery_count => rabbit_queue_type:delivery_count()
8685
}.

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 114 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,17 @@ groups() ->
8888
[
8989
last_queue_confirms,
9090
target_queue_deleted,
91-
credit_reply_quorum_queue,
9291
async_notify_settled_classic_queue,
9392
async_notify_settled_quorum_queue,
9493
async_notify_settled_stream,
9594
async_notify_unsettled_classic_queue,
9695
async_notify_unsettled_quorum_queue,
9796
async_notify_unsettled_stream,
98-
link_flow_control
97+
link_flow_control,
98+
classic_queue_on_old_node,
99+
classic_queue_on_new_node,
100+
quorum_queue_on_old_node,
101+
quorum_queue_on_new_node
99102
]},
100103

101104
{metrics, [shuffle],
@@ -135,15 +138,17 @@ end_per_group(_, Config) ->
135138
rabbit_ct_client_helpers:teardown_steps() ++
136139
rabbit_ct_broker_helpers:teardown_steps()).
137140

138-
init_per_testcase(T = message_headers_conversion, Config) ->
141+
init_per_testcase(T, Config)
142+
when T =:= message_headers_conversion orelse
143+
T =:= roundtrip_with_drain_quorum_queue orelse
144+
T =:= timed_get_quorum_queue ->
139145
case rabbit_ct_broker_helpers:rpc(
140146
Config, rabbit_feature_flags, is_enabled, [credit_api_v2]) of
141147
true ->
142148
rabbit_ct_helpers:testcase_started(Config, T);
143149
false ->
144-
{skip, "Quorum queues are known to behave incorrectly with feature flag "
145-
"credit_api_v2 disabled because they send a send_drained queue event "
146-
"before sending all available messages."}
150+
{skip, "Receiving with drain from quorum queues in credit API v1 have a known "
151+
"bug that they reply with send_drained before delivering the message."}
147152
end;
148153
init_per_testcase(Testcase, Config) ->
149154
rabbit_ct_helpers:testcase_started(Config, Testcase).
@@ -563,9 +568,6 @@ roundtrip_with_drain(Config, QueueType, QName)
563568
Session, <<"test-sender">>, Address),
564569
wait_for_credit(Sender),
565570

566-
% Create a new message using a delivery-tag, body and indicate
567-
% its settlement status (true meaning no disposition confirmation
568-
% will be sent by the receiver).
569571
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, false),
570572
ok = amqp10_client:send_msg(Sender, OutMsg),
571573
ok = wait_for_accepts(1),
@@ -2151,61 +2153,9 @@ target_queue_deleted(Config) ->
21512153
amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})),
21522154
ok = rabbit_ct_client_helpers:close_channel(Ch).
21532155

2154-
%% This test is mostly interesting in mixed version mode with feature flag
2155-
%% consumer_tag_in_credit_reply disabled.
2156-
credit_reply_quorum_queue(Config) ->
2157-
%% Place quorum queue leader on the old version node.
2158-
OldVersionNode = 1,
2159-
Ch = rabbit_ct_client_helpers:open_channel(Config, OldVersionNode),
2160-
QName = atom_to_binary(?FUNCTION_NAME),
2161-
#'queue.declare_ok'{} = amqp_channel:call(
2162-
Ch, #'queue.declare'{
2163-
queue = QName,
2164-
durable = true,
2165-
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
2166-
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}]}),
2167-
%% Connect to the new node.
2168-
OpnConf = connection_config(Config),
2169-
{ok, Connection} = amqp10_client:open_connection(OpnConf),
2170-
{ok, Session} = amqp10_client:begin_session_sync(Connection),
2171-
Address = <<"/amq/queue/", QName/binary>>,
2172-
{ok, Sender} = amqp10_client:attach_sender_link(
2173-
Session, <<"test-sender">>, Address),
2174-
ok = wait_for_credit(Sender),
2175-
{ok, Receiver} = amqp10_client:attach_receiver_link(
2176-
Session,
2177-
<<"test-receiver">>,
2178-
Address,
2179-
unsettled),
2180-
receive {amqp10_event, {link, Receiver, attached}} -> ok
2181-
after 5000 -> ct:fail("missing attached")
2182-
end,
2183-
flush(receiver_attached),
2184-
2185-
NumMsgs = 10,
2186-
[begin
2187-
Bin = integer_to_binary(N),
2188-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Bin, Bin, true))
2189-
end || N <- lists:seq(1, NumMsgs)],
2190-
2191-
%% Grant credits to the sending queue.
2192-
ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never),
2193-
2194-
%% We should receive all messages.
2195-
Msgs = receive_messages(Receiver, NumMsgs),
2196-
FirstMsg = hd(Msgs),
2197-
LastMsg = lists:last(Msgs),
2198-
?assertEqual([<<"1">>], amqp10_msg:body(FirstMsg)),
2199-
?assertEqual([integer_to_binary(NumMsgs)], amqp10_msg:body(LastMsg)),
2200-
2201-
ExpectedReadyMsgs = 0,
2202-
?assertEqual(#'queue.delete_ok'{message_count = ExpectedReadyMsgs},
2203-
amqp_channel:call(Ch, #'queue.delete'{queue = QName})),
2204-
ok = rabbit_ct_client_helpers:close_channel(Ch),
2205-
ok = amqp10_client:close_connection(Connection).
2206-
22072156
async_notify_settled_classic_queue(Config) ->
2208-
%% TODO require ff message_containers
2157+
%% TODO Bump old version in mixed version tests to 3.13.x,
2158+
%% require ff message_containers and always run this test case.
22092159
case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers) of
22102160
ok -> async_notify(settled, <<"classic">>, Config);
22112161
{skip, _} = Skip -> Skip
@@ -2218,7 +2168,8 @@ async_notify_settled_stream(Config) ->
22182168
async_notify(settled, <<"stream">>, Config).
22192169

22202170
async_notify_unsettled_classic_queue(Config) ->
2221-
%% TODO require ff message_containers
2171+
%% TODO Bump old version in mixed version tests to 3.13.x,
2172+
%% require ff message_containers and always run this test case.
22222173
case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers) of
22232174
ok -> async_notify(unsettled, <<"classic">>, Config);
22242175
{skip, _} = Skip -> Skip
@@ -2232,14 +2183,15 @@ async_notify_unsettled_stream(Config) ->
22322183

22332184
%% Test asynchronous notification, figure 2.45.
22342185
async_notify(SenderSettleMode, QType, Config) ->
2186+
%% Place queue leader on the old node.
22352187
Ch = rabbit_ct_client_helpers:open_channel(Config, 1),
22362188
QName = atom_to_binary(?FUNCTION_NAME),
22372189
#'queue.declare_ok'{} = amqp_channel:call(
22382190
Ch, #'queue.declare'{
22392191
queue = QName,
22402192
durable = true,
22412193
arguments = [{<<"x-queue-type">>, longstr, QType}]}),
2242-
2194+
%% Connect AMQP client to the new node causing queue client to run the new code.
22432195
OpnConf = connection_config(Config),
22442196
{ok, Connection} = amqp10_client:open_connection(OpnConf),
22452197
{ok, Session} = amqp10_client:begin_session_sync(Connection),
@@ -2385,12 +2337,108 @@ link_flow_control(Config) ->
23852337
delete_queue(Config, QQ),
23862338
delete_queue(Config, CQ).
23872339

2340+
classic_queue_on_old_node(Config) ->
2341+
%% TODO Bump old version in mixed version tests to 3.13.x,
2342+
%% require ff message_containers and always run this test case.
2343+
case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers) of
2344+
ok -> queue_and_client_different_nodes(1, 0, <<"classic">>, Config);
2345+
{skip, _} = Skip -> Skip
2346+
end.
2347+
2348+
classic_queue_on_new_node(Config) ->
2349+
queue_and_client_different_nodes(0, 1, <<"classic">>, Config).
2350+
2351+
quorum_queue_on_old_node(Config) ->
2352+
queue_and_client_different_nodes(1, 0, <<"quorum">>, Config).
2353+
2354+
quorum_queue_on_new_node(Config) ->
2355+
queue_and_client_different_nodes(0, 1, <<"quorum">>, Config).
2356+
2357+
%% In mixed version tests, run the queue leader with old code
2358+
%% and queue client with new code, or vice versa.
2359+
queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) ->
2360+
Ch = rabbit_ct_client_helpers:open_channel(Config, QueueLeaderNode),
2361+
QName = atom_to_binary(?FUNCTION_NAME),
2362+
#'queue.declare_ok'{} = amqp_channel:call(
2363+
Ch, #'queue.declare'{queue = QName,
2364+
durable = true,
2365+
arguments = [{<<"x-queue-type">>, longstr, QueueType}]}),
2366+
%% Connect AMQP client to the new node causing queue client to run the new code.
2367+
OpnConf = connection_config(ClientNode, Config),
2368+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
2369+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
2370+
Address = <<"/amq/queue/", QName/binary>>,
2371+
{ok, Sender} = amqp10_client:attach_sender_link(
2372+
Session, <<"test-sender">>, Address),
2373+
ok = wait_for_credit(Sender),
2374+
{ok, Receiver} = amqp10_client:attach_receiver_link(
2375+
Session,
2376+
<<"test-receiver">>,
2377+
Address,
2378+
unsettled),
2379+
receive {amqp10_event, {link, Receiver, attached}} -> ok
2380+
after 5000 -> ct:fail("missing attached")
2381+
end,
2382+
flush(receiver_attached),
2383+
2384+
NumMsgs = 10,
2385+
[begin
2386+
Bin = integer_to_binary(N),
2387+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Bin, Bin, true))
2388+
end || N <- lists:seq(1, NumMsgs)],
2389+
2390+
%% Grant credits to the sending queue.
2391+
ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never),
2392+
2393+
%% We should receive all messages.
2394+
Msgs = receive_messages(Receiver, NumMsgs),
2395+
FirstMsg = hd(Msgs),
2396+
LastMsg = lists:last(Msgs),
2397+
?assertEqual([<<"1">>], amqp10_msg:body(FirstMsg)),
2398+
?assertEqual([integer_to_binary(NumMsgs)], amqp10_msg:body(LastMsg)),
2399+
ok = amqp10_client_session:disposition(
2400+
Session,
2401+
receiver,
2402+
amqp10_msg:delivery_id(FirstMsg),
2403+
amqp10_msg:delivery_id(LastMsg),
2404+
true,
2405+
accepted),
2406+
2407+
CreditApiV2 = rpc(Config, rabbit_feature_flags, is_enabled, [credit_api_v2]),
2408+
case QueueType =:= <<"quorum">> andalso not CreditApiV2 of
2409+
true ->
2410+
ct:pal("Quorum queues in credit API v1 have a known bug that they "
2411+
"reply with send_drained before delivering the message.");
2412+
false ->
2413+
%% Send another message and drain.
2414+
Tag = <<"tag">>,
2415+
Body = <<"body">>,
2416+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Tag, Body, false)),
2417+
ok = wait_for_settlement(Tag),
2418+
ok = amqp10_client:flow_link_credit(Receiver, 999, never, true),
2419+
[Msg] = receive_messages(Receiver, 1),
2420+
?assertEqual([Body], amqp10_msg:body(Msg)),
2421+
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
2422+
after 5000 -> ct:fail("expected credit_exhausted")
2423+
end,
2424+
ok = amqp10_client:accept_msg(Receiver, Msg)
2425+
end,
2426+
2427+
ExpectedReadyMsgs = 0,
2428+
?assertEqual(#'queue.delete_ok'{message_count = ExpectedReadyMsgs},
2429+
amqp_channel:call(Ch, #'queue.delete'{queue = QName})),
2430+
ok = rabbit_ct_client_helpers:close_channel(Ch),
2431+
ok = amqp10_client:close_connection(Connection).
2432+
23882433
%% internal
23892434
%%
23902435

23912436
connection_config(Config) ->
2437+
connection_config(0, Config).
2438+
2439+
connection_config(Node, Config) ->
23922440
Host = ?config(rmq_hostname, Config),
2393-
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
2441+
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp),
23942442
#{address => Host,
23952443
port => Port,
23962444
container_id => <<"my container">>,

0 commit comments

Comments
 (0)