Skip to content

Commit 3013a58

Browse files
committed
Add credit_api_v2 feature flag test
1 parent 343c577 commit 3013a58

File tree

7 files changed

+275
-17
lines changed

7 files changed

+275
-17
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@
8888
rcv_settle_mode => rcv_settle_mode(),
8989
filter => filter(),
9090
properties => properties(),
91-
max_message_size => max_message_size()
91+
max_message_size => max_message_size(),
92+
handle => link_handle()
9293
}.
9394

9495
-type transfer_error() :: {error,
@@ -751,7 +752,7 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) ->
751752
Link#link{state = detach_sent}.
752753

753754
send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
754-
#state{next_link_handle = OutHandle, links = Links,
755+
#state{next_link_handle = OutHandle0, links = Links,
755756
link_index = LinkIndex} = State) ->
756757

757758
Source = make_source(Args),
@@ -766,6 +767,16 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
766767
{TargetAddr, false, uint(?INITIAL_DELIVERY_COUNT), undefined}
767768
end,
768769

770+
{OutHandle, NextLinkHandle} =
771+
case Args of
772+
#{handle := Handle} ->
773+
%% Client app provided link handle.
774+
%% Really only meant for integration tests.
775+
{Handle, OutHandle0};
776+
_ ->
777+
{OutHandle0, OutHandle0 + 1}
778+
end,
779+
769780
% create attach performative
770781
Attach = #'v1_0.attach'{name = {utf8, Name},
771782
role = RoleAsBool,
@@ -791,7 +802,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
791802
max_message_size = unpack(MaxMessageSize)},
792803

793804
{State#state{links = Links#{OutHandle => Link},
794-
next_link_handle = OutHandle + 1,
805+
next_link_handle = NextLinkHandle,
795806
link_index = LinkIndex#{Name => OutHandle}}, Link#link.ref}.
796807

797808
-spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}.

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,12 +1276,12 @@ query_consumers(#?MODULE{consumers = Consumers,
12761276
FromConsumers =
12771277
maps:fold(fun (_, #consumer{status = cancelled}, Acc) ->
12781278
Acc;
1279-
({Tag, Pid},
1279+
(Key = {Tag, Pid},
12801280
#consumer{cfg = #consumer_cfg{meta = Meta}} = Consumer,
12811281
Acc) ->
12821282
{Active, ActivityStatus} =
1283-
ActiveActivityStatusFun({Tag, Pid}, Consumer),
1284-
maps:put({Tag, Pid},
1283+
ActiveActivityStatusFun(Key, Consumer),
1284+
maps:put(Key,
12851285
{Pid, Tag,
12861286
maps:get(ack, Meta, undefined),
12871287
maps:get(prefetch, Meta, undefined),
@@ -1294,12 +1294,12 @@ query_consumers(#?MODULE{consumers = Consumers,
12941294
FromWaitingConsumers =
12951295
lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) ->
12961296
Acc;
1297-
({{Tag, Pid},
1297+
(Key = {{Tag, Pid},
12981298
#consumer{cfg = #consumer_cfg{meta = Meta}} = Consumer},
12991299
Acc) ->
13001300
{Active, ActivityStatus} =
1301-
ActiveActivityStatusFun({Tag, Pid}, Consumer),
1302-
maps:put({Tag, Pid},
1301+
ActiveActivityStatusFun(Key, Consumer),
1302+
maps:put(Key,
13031303
{Pid, Tag,
13041304
maps:get(ack, Meta, undefined),
13051305
maps:get(prefetch, Meta, undefined),
@@ -2305,8 +2305,6 @@ merge_consumer(Meta, #consumer{cfg = CCfg, checked_out = Checked} = Consumer,
23052305
NumChecked = map_size(Checked),
23062306
NewCredit = max(0, Credit - NumChecked),
23072307
Mode = credit_mode(Meta, Credit, Mode0),
2308-
%% TODO Forbid changing credit API version when detaching and attaching
2309-
%% with same link handle in the same AMQP 1.0 session.
23102308
Consumer#consumer{cfg = CCfg#consumer_cfg{priority = Priority,
23112309
meta = ConsumerMeta,
23122310
credit_mode = Mode,

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,6 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
366366
end,
367367
SDels = maps:update_with(
368368
ConsumerTag,
369-
%% TODO Forbid changing credit API version when detaching and attaching
370-
%% with same link handle in the same session.
371369
fun (C) -> C#consumer{ack = Ack} end,
372370
#consumer{last_msg_id = LastMsgId,
373371
ack = Ack,
@@ -827,10 +825,10 @@ update_consumer(Tag, LastId, DelCntIncr, Consumer, Consumers) ->
827825
credit_api_v2 -> credit_api_v2;
828826
{credit_api_v1, Count} -> {credit_api_v1, Count + DelCntIncr}
829827
end,
830-
maps:put(Tag,
831-
Consumer#consumer{last_msg_id = LastId,
832-
delivery_count = D},
833-
Consumers).
828+
maps:update(Tag,
829+
Consumer#consumer{last_msg_id = LastId,
830+
delivery_count = D},
831+
Consumers).
834832

835833
add_delivery_count(DelCntIncr, Tag, #state{consumer_deliveries = CDels0} = State) ->
836834
Con = #consumer{last_msg_id = LastMsgId} = maps:get(Tag, CDels0),

deps/rabbitmq_amqp1_0/BUILD.bazel

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,13 @@ rabbitmq_integration_suite(
147147
],
148148
)
149149

150+
rabbitmq_integration_suite(
151+
name = "ff_SUITE",
152+
runtime_deps = [
153+
"//deps/amqp10_client:erlang_app",
154+
],
155+
)
156+
150157
assert_suites()
151158

152159
alias(

deps/rabbitmq_amqp1_0/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
162162
erlc_opts = "//:test_erlc_opts",
163163
deps = ["//deps/rabbit_common:erlang_app"],
164164
)
165+
erlang_bytecode(
166+
name = "ff_SUITE_beam_files",
167+
testonly = True,
168+
srcs = ["test/ff_SUITE.erl"],
169+
outs = ["test/ff_SUITE.beam"],
170+
app_name = "rabbitmq_amqp1_0",
171+
erlc_opts = "//:test_erlc_opts",
172+
deps = ["//deps/amqp_client:erlang_app"],
173+
)

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,19 @@ handle_control(#'v1_0.detach'{handle = Handle = ?UINT(HandleInt),
918918
QName = rabbit_misc:r(Vhost, queue, QNameBin),
919919
case rabbit_amqqueue:lookup(QName) of
920920
{ok, Q} ->
921+
%%TODO Consider adding a new rabbit_queue_type:remove_consumer API that - from the point of view of
922+
%% the queue process - behaves as if our session process terminated: All messages checked out
923+
%% to this consumer should be re-queued automatically instead of us requeueing them here after cancelling
924+
%% consumption.
925+
%% For AMQP legacy (and STOMP / MQTT) consumer cancellation not requeueing messages is a good approach as
926+
%% clients may want to ack any in-flight messages.
927+
%% For AMQP however, the consuming client can stop cancellations via link-credit=0 and drain=true being
928+
%% sure that no messages are in flight before detaching the link. Hence, AMQP doesn't need the
929+
%% rabbit_queue_type:cancel API semantics.
930+
%% A rabbit_queue_type:remove_consumer API has also the advantage to simplify reasoning about clients
931+
%% first detaching and then re-attaching to the same session with the same link handle (the handle
932+
%% becomes available for re-use once a link is closed): This will result in the same consumer tag,
933+
%% and we ideally disallow "updating" an AMQP consumer.
921934
case rabbit_queue_type:cancel(Q, Ctag, undefined, Username, QStates0) of
922935
{ok, QStates1} ->
923936
{Unsettled1, MsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0),
@@ -2001,6 +2014,7 @@ outcomes(_) ->
20012014

20022015
-spec handle_to_ctag(link_handle()) -> rabbit_types:ctag().
20032016
handle_to_ctag(Handle) ->
2017+
%%TODO Can we get rid of the ctag- prefix?
20042018
<<"ctag-", Handle:32/integer>>.
20052019

20062020
-spec ctag_to_handle(rabbit_types:ctag()) -> link_handle().
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2016-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
7+
-module(ff_SUITE).
8+
9+
-compile([export_all, nowarn_export_all]).
10+
11+
-include_lib("common_test/include/ct.hrl").
12+
-include_lib("eunit/include/eunit.hrl").
13+
-include_lib("amqp_client/include/amqp_client.hrl").
14+
15+
all() ->
16+
[
17+
{group, cluster_size_1}
18+
].
19+
20+
groups() ->
21+
[
22+
{cluster_size_1, [],
23+
[credit_api_v2]}
24+
].
25+
26+
suite() ->
27+
[
28+
{timetrap, {minutes, 10}}
29+
].
30+
31+
init_per_suite(Config) ->
32+
{ok, _} = application:ensure_all_started(amqp10_client),
33+
rabbit_ct_helpers:log_environment(),
34+
rabbit_ct_helpers:run_setup_steps(Config, []).
35+
36+
end_per_suite(Config) ->
37+
rabbit_ct_helpers:run_teardown_steps(Config).
38+
39+
init_per_group(_Group, Config0) ->
40+
Config = rabbit_ct_helpers:merge_app_env(
41+
Config0, {rabbit, [{forced_feature_flags_on_init, []}]}),
42+
rabbit_ct_helpers:run_steps(Config,
43+
rabbit_ct_broker_helpers:setup_steps() ++
44+
rabbit_ct_client_helpers:setup_steps()).
45+
46+
end_per_group(_Group, Config) ->
47+
rabbit_ct_helpers:run_steps(Config,
48+
rabbit_ct_client_helpers:teardown_steps() ++
49+
rabbit_ct_broker_helpers:teardown_steps()).
50+
51+
init_per_testcase(TestCase, Config) ->
52+
case rabbit_ct_broker_helpers:is_feature_flag_supported(Config, TestCase) of
53+
true ->
54+
?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, TestCase)),
55+
Config;
56+
false ->
57+
{skip, io_lib:format("feature flag ~s is unsupported", [TestCase])}
58+
end.
59+
60+
end_per_testcase(_TestCase, Config) ->
61+
Config.
62+
63+
credit_api_v2(Config) ->
64+
CQ = <<"classic queue">>,
65+
QQ = <<"quorum queue">>,
66+
CQAddr = <<"/amq/queue/", CQ/binary>>,
67+
QQAddr = <<"/amq/queue/", QQ/binary>>,
68+
69+
Ch = rabbit_ct_client_helpers:open_channel(Config),
70+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = CQ}),
71+
#'queue.declare_ok'{} = amqp_channel:call(
72+
Ch, #'queue.declare'{
73+
queue = QQ,
74+
durable = true,
75+
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
76+
ok = rabbit_ct_client_helpers:close_channel(Ch),
77+
78+
Host = ?config(rmq_hostname, Config),
79+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
80+
OpnConf = #{address => Host,
81+
port => Port,
82+
container_id => <<"my container">>,
83+
sasl => {plain, <<"guest">>, <<"guest">>}},
84+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
85+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
86+
87+
{ok, CQSender} = amqp10_client:attach_sender_link(Session, <<"cq sender">>, CQAddr),
88+
{ok, QQSender} = amqp10_client:attach_sender_link(Session, <<"qq sender">>, QQAddr),
89+
receive {amqp10_event, {link, CQSender, credited}} -> ok
90+
after 5000 -> ct:fail(credited_timeout)
91+
end,
92+
receive {amqp10_event, {link, QQSender, credited}} -> ok
93+
after 5000 -> ct:fail(credited_timeout)
94+
end,
95+
96+
%% Send 40 messages to each queue.
97+
NumMsgs = 40,
98+
[begin
99+
Bin = integer_to_binary(N),
100+
ok = amqp10_client:send_msg(CQSender, amqp10_msg:new(Bin, Bin, true)),
101+
ok = amqp10_client:send_msg(QQSender, amqp10_msg:new(Bin, Bin, true))
102+
end || N <- lists:seq(1, NumMsgs)],
103+
ok = amqp10_client:detach_link(CQSender),
104+
ok = amqp10_client:detach_link(QQSender),
105+
106+
%% Consume with credit API v1
107+
CQAttachArgs = #{handle => 300,
108+
name => <<"cq receiver 1">>,
109+
role => {receiver, #{address => CQAddr,
110+
durable => configuration}, self()},
111+
snd_settle_mode => unsettled,
112+
rcv_settle_mode => first,
113+
filter => #{}},
114+
{ok, CQReceiver1} = amqp10_client:attach_link(Session, CQAttachArgs),
115+
QQAttachArgs = #{handle => 400,
116+
name => <<"qq receiver 1">>,
117+
role => {receiver, #{address => QQAddr,
118+
durable => configuration}, self()},
119+
snd_settle_mode => unsettled,
120+
rcv_settle_mode => first,
121+
filter => #{}},
122+
{ok, QQReceiver1} = amqp10_client:attach_link(Session, QQAttachArgs),
123+
124+
ok = consume_and_accept(10, CQReceiver1, Session),
125+
ok = consume_and_accept(10, QQReceiver1, Session),
126+
127+
?assertEqual(ok,
128+
rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME)),
129+
flush(enabled_feature_flag),
130+
131+
%% Consume with credit API v2
132+
{ok, CQReceiver2} = amqp10_client:attach_receiver_link(
133+
Session, <<"cq receiver 2">>, CQAddr, unsettled),
134+
{ok, QQReceiver2} = amqp10_client:attach_receiver_link(
135+
Session, <<"qq receiver 2">>, QQAddr, unsettled),
136+
ok = consume_and_accept(10, CQReceiver2, Session),
137+
ok = consume_and_accept(10, QQReceiver2, Session),
138+
139+
%% Consume via with credit API v1
140+
ok = consume_and_accept(10, CQReceiver1, Session),
141+
ok = consume_and_accept(10, QQReceiver1, Session),
142+
143+
%% Detach the credit API v1 links and attach with the same output handle.
144+
ok = detach_sync(CQReceiver1),
145+
ok = detach_sync(QQReceiver1),
146+
{ok, CQReceiver3} = amqp10_client:attach_link(Session, CQAttachArgs),
147+
{ok, QQReceiver3} = amqp10_client:attach_link(Session, QQAttachArgs),
148+
149+
%% The new links should use credit API v2
150+
ok = consume_and_accept(10, CQReceiver3, Session),
151+
ok = consume_and_accept(10, QQReceiver3, Session),
152+
153+
flush(pre_drain),
154+
%% Draining should also work.
155+
ok = amqp10_client:flow_link_credit(CQReceiver3, 10, never, true),
156+
receive {amqp10_event, {link, CQReceiver3, credit_exhausted}} -> ok
157+
after 5000 -> ct:fail({missing_credit_exhausted, ?LINE})
158+
end,
159+
receive Unexpected1 -> ct:fail({unexpected, ?LINE, Unexpected1})
160+
after 20 -> ok
161+
end,
162+
163+
ok = amqp10_client:flow_link_credit(QQReceiver3, 10, never, true),
164+
receive {amqp10_event, {link, QQReceiver3, credit_exhausted}} -> ok
165+
after 5000 -> ct:fail({missing_credit_exhausted, ?LINE})
166+
end,
167+
receive Unexpected2 -> ct:fail({unexpected, ?LINE, Unexpected2})
168+
after 20 -> ok
169+
end,
170+
171+
ok = detach_sync(CQReceiver2),
172+
ok = detach_sync(QQReceiver2),
173+
ok = detach_sync(CQReceiver3),
174+
ok = detach_sync(QQReceiver3),
175+
ok = amqp10_client:end_session(Session),
176+
receive {amqp10_event, {session, Session, {ended, _}}} -> ok
177+
after 5000 -> ct:fail(missing_ended)
178+
end,
179+
ok = amqp10_client:close_connection(Connection),
180+
receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok
181+
after 5000 -> ct:fail(missing_closed)
182+
end.
183+
184+
consume_and_accept(NumMsgs, Receiver, Session) ->
185+
ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never),
186+
Msgs = receive_messages(Receiver, NumMsgs),
187+
ok = amqp10_client_session:disposition(
188+
Session,
189+
receiver,
190+
amqp10_msg:delivery_id(hd(Msgs)),
191+
amqp10_msg:delivery_id(lists:last(Msgs)),
192+
true,
193+
accepted).
194+
195+
receive_messages(Receiver, N) ->
196+
receive_messages0(Receiver, N, []).
197+
198+
receive_messages0(_Receiver, 0, Acc) ->
199+
lists:reverse(Acc);
200+
receive_messages0(Receiver, N, Acc) ->
201+
receive
202+
{amqp10_msg, Receiver, Msg} ->
203+
receive_messages0(Receiver, N - 1, [Msg | Acc])
204+
after 5000 ->
205+
exit({timeout, {num_received, length(Acc)}, {num_missing, N}})
206+
end.
207+
208+
detach_sync(Receiver) ->
209+
ok = amqp10_client:detach_link(Receiver),
210+
receive {amqp10_event, {link, Receiver, {detached, normal}}} -> ok
211+
after 5000 -> ct:fail({missing_detached, Receiver})
212+
end.
213+
214+
flush(Prefix) ->
215+
receive
216+
Msg ->
217+
ct:pal("~ts flushed: ~p~n", [Prefix, Msg]),
218+
flush(Prefix)
219+
after 1 ->
220+
ok
221+
end.

0 commit comments

Comments
 (0)