Skip to content

Commit ebfeab8

Browse files
Merge pull request #12910 from rabbitmq/rabbitmq-server-12907-backport-v4.0.x
Backport #12907 to v4.0.x
2 parents 8001363 + 57e635c commit ebfeab8

File tree

9 files changed

+182
-5
lines changed

9 files changed

+182
-5
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ _APP_ENV = """[
8585
{exit_on_close, false}
8686
]},
8787
{ssl_apps, [asn1, crypto, public_key, ssl]},
88+
%% see rabbitmq-server#114
89+
{classic_queue_flow_control, true},
8890
%% see rabbitmq-server#227 and related tickets.
8991
%% msg_store_credit_disc_bound only takes effect when
9092
%% messages are persisted to the message store. If messages

deps/rabbit/Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ define PROJECT_ENV
6868
{exit_on_close, false}
6969
]},
7070
{ssl_apps, [asn1, crypto, public_key, ssl]},
71+
%% see rabbitmq-server#114
72+
{classic_queue_flow_control, true},
7173
%% see rabbitmq-server#227 and related tickets.
7274
%% msg_store_credit_disc_bound only takes effect when
7375
%% messages are persisted to the message store. If messages

deps/rabbit/src/rabbit_channel.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@
159159
rejected,
160160
%% used by "one shot RPC" (amq.
161161
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
162-
delivery_flow, %% Deprecated since removal of CMQ in 4.0
162+
%% see rabbitmq-server#114
163+
delivery_flow :: flow | noflow,
163164
interceptor_state,
164165
queue_states,
165166
tick_timer,
@@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
489490
?LG_PROCESS_TYPE(channel),
490491
?store_proc_name({ConnName, Channel}),
491492
ok = pg_local:join(rabbit_channels, self()),
493+
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
494+
true -> flow;
495+
false -> noflow
496+
end,
492497
{ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
493498
Limiter0 = rabbit_limiter:new(LimiterPid),
494499
Global = Global0 andalso is_global_qos_permitted(),
@@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
537542
rejected = [],
538543
confirmed = [],
539544
reply_consumer = none,
545+
delivery_flow = Flow,
540546
interceptor_state = undefined,
541547
queue_states = rabbit_queue_type:init()
542548
},

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ deliver(Qs0, Msg0, Options) ->
449449
Confirm = MsgSeqNo /= undefined,
450450

451451
{MPids, Qs} = qpids(Qs0, Confirm, MsgSeqNo),
452-
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo),
452+
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo, Flow),
453453

454454
case Flow of
455455
%% Here we are tracking messages sent by the rabbit_channel

deps/rabbit/test/classic_queue_SUITE.erl

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
-include_lib("eunit/include/eunit.hrl").
1010
-include_lib("amqp_client/include/amqp_client.hrl").
11+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1112

1213
-compile([nowarn_export_all, export_all]).
1314

@@ -18,11 +19,17 @@
1819

1920
all() ->
2021
[
22+
{group, cluster_size_1},
2123
{group, cluster_size_3}
2224
].
2325

2426
groups() ->
2527
[
28+
{cluster_size_1, [], [
29+
classic_queue_flow_control_enabled,
30+
classic_queue_flow_control_disabled
31+
]
32+
},
2633
{cluster_size_3, [], [
2734
leader_locator_client_local,
2835
leader_locator_balanced,
@@ -42,10 +49,14 @@ end_per_suite(Config) ->
4249
rabbit_ct_helpers:run_teardown_steps(Config).
4350

4451
init_per_group(Group, Config) ->
52+
Nodes = case Group of
53+
cluster_size_1 -> 1;
54+
cluster_size_3 -> 3
55+
end,
4556
Config1 = rabbit_ct_helpers:set_config(Config,
4657
[
4758
{rmq_nodename_suffix, Group},
48-
{rmq_nodes_count, 3},
59+
{rmq_nodes_count, Nodes},
4960
{rmq_nodes_clustered, true},
5061
{tcp_ports_base, {skip_n_nodes, 3}}
5162
]),
@@ -72,6 +83,67 @@ init_per_testcase(T, Config) ->
7283
%% Testcases.
7384
%% -------------------------------------------------------------------
7485

86+
classic_queue_flow_control_enabled(Config) ->
87+
FlowEnabled = true,
88+
VerifyFun =
89+
fun(QPid, ConnPid) ->
90+
%% Only 2+2 messages reach the message queue of the classic queue.
91+
%% (before the credits of the connection and channel processes run out)
92+
?awaitMatch(4, proc_info(QPid, message_queue_len), 1000),
93+
?assertMatch({0, _}, gen_server2_queue(QPid)),
94+
95+
%% The connection gets into flow state
96+
?assertEqual([{state, flow}], rabbit_reader:info(ConnPid, [state])),
97+
98+
Dict = proc_info(ConnPid, dictionary),
99+
?assertMatch([_|_], proplists:get_value(credit_blocked, Dict)),
100+
ok
101+
end,
102+
flow_control(Config, FlowEnabled, VerifyFun).
103+
104+
classic_queue_flow_control_disabled(Config) ->
105+
FlowEnabled = false,
106+
VerifyFun =
107+
fun(QPid, ConnPid) ->
108+
%% All published messages will end up in the message
109+
%% queue of the suspended classic queue process
110+
?awaitMatch(100, proc_info(QPid, message_queue_len), 1000),
111+
?assertMatch({0, _}, gen_server2_queue(QPid)),
112+
113+
%% The connection dos not get into flow state
114+
?assertEqual([{state, running}], rabbit_reader:info(ConnPid, [state])),
115+
116+
Dict = proc_info(ConnPid, dictionary),
117+
?assertMatch([], proplists:get_value(credit_blocked, Dict, []))
118+
end,
119+
flow_control(Config, FlowEnabled, VerifyFun).
120+
121+
flow_control(Config, FlowEnabled, VerifyFun) ->
122+
OrigCredit = set_default_credit(Config, {2, 1}),
123+
OrigFlow = set_flow_control(Config, FlowEnabled),
124+
125+
Ch = rabbit_ct_client_helpers:open_channel(Config),
126+
QueueName = atom_to_binary(?FUNCTION_NAME),
127+
declare(Ch, QueueName, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
128+
QPid = get_queue_pid(Config, QueueName),
129+
try
130+
sys:suspend(QPid),
131+
132+
%% Publish 100 messages without publisher confirms
133+
publish_many(Ch, QueueName, 100),
134+
135+
[ConnPid] = rabbit_ct_broker_helpers:rpc(Config, rabbit_networking, local_connections, []),
136+
137+
VerifyFun(QPid, ConnPid),
138+
ok
139+
after
140+
sys:resume(QPid),
141+
delete_queues(Ch, [QueueName]),
142+
set_default_credit(Config, OrigCredit),
143+
set_flow_control(Config, OrigFlow),
144+
rabbit_ct_client_helpers:close_channel(Ch)
145+
end.
146+
75147
leader_locator_client_local(Config) ->
76148
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
77149
Q = <<"q1">>,
@@ -129,7 +201,55 @@ declare(Ch, Q, Args) ->
129201
auto_delete = false,
130202
arguments = Args}).
131203

204+
delete_queues(Ch, Qs) ->
205+
[?assertMatch(#'queue.delete_ok'{},
206+
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
207+
|| Q <- Qs].
208+
132209
delete_queues() ->
133210
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
134211
|| Q <- rabbit_amqqueue:list()].
135212

213+
214+
publish(Ch, QName, Payload) ->
215+
amqp_channel:cast(Ch,
216+
#'basic.publish'{exchange = <<>>,
217+
routing_key = QName},
218+
#amqp_msg{payload = Payload}).
219+
220+
publish_many(Ch, QName, Count) ->
221+
[publish(Ch, QName, integer_to_binary(I))
222+
|| I <- lists:seq(1, Count)].
223+
224+
proc_info(Pid, Info) ->
225+
case rabbit_misc:process_info(Pid, Info) of
226+
{Info, Value} ->
227+
Value;
228+
Error ->
229+
{error, Error}
230+
end.
231+
232+
gen_server2_queue(Pid) ->
233+
Status = sys:get_status(Pid),
234+
{status, Pid,_Mod,
235+
[_Dict, _SysStatus, _Parent, _Dbg,
236+
[{header, _},
237+
{data, Data}|_]]} = Status,
238+
proplists:get_value("Queued messages", Data).
239+
240+
set_default_credit(Config, Value) ->
241+
Key = credit_flow_default_credit,
242+
OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
243+
ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [Key, Value]),
244+
OrigValue.
245+
246+
set_flow_control(Config, Value) when is_boolean(Value) ->
247+
Key = classic_queue_flow_control,
248+
{ok, OrigValue} = rabbit_ct_broker_helpers:rpc(Config, application, get_env, [rabbit, Key]),
249+
rabbit_ct_broker_helpers:rpc(Config, application, set_env, [rabbit, Key, Value]),
250+
OrigValue.
251+
252+
get_queue_pid(Config, QueueName) ->
253+
{ok, QRec} = rabbit_ct_broker_helpers:rpc(
254+
Config, 0, rabbit_amqqueue, lookup, [QueueName, <<"/">>]),
255+
amqqueue:get_pid(QRec).

deps/rabbit_common/src/rabbit_misc.erl

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
build_acyclic_graph/3]).
5151
-export([const/1]).
5252
-export([ntoa/1, ntoab/1]).
53-
-export([is_process_alive/1]).
53+
-export([is_process_alive/1,
54+
process_info/2]).
5455
-export([pget/2, pget/3, pupdate/3, pget_or_die/2, pmerge/3, pset/3, plmerge/2]).
5556
-export([deep_pget/2, deep_pget/3]).
5657
-export([format_message_queue/2]).
@@ -812,6 +813,23 @@ is_process_alive(Pid) ->
812813
lists:member(Node, [node() | nodes(connected)]) andalso
813814
rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true.
814815

816+
%% Get process info of a prossibly remote process.
817+
%% We try to avoid reconnecting to down nodes.
818+
-spec process_info(pid(), ItemSpec) -> Result| undefined | {badrpc, term()}
819+
when
820+
ItemSpec :: atom() | list() | tuple(),
821+
Result :: {atom() | tuple(), term()} | [{atom() | tuple(), term()}].
822+
process_info(Pid, Items) when node(Pid) =:= node() ->
823+
erlang:process_info(Pid, Items);
824+
process_info(Pid, Items) ->
825+
Node = node(Pid),
826+
case lists:member(Node, [node() | nodes(connected)]) of
827+
true ->
828+
rpc:call(Node, erlang, process_info, [Pid, Items]);
829+
_ ->
830+
{badrpc, nodedown}
831+
end.
832+
815833
-spec pget(term(), list() | map()) -> term().
816834
pget(K, M) when is_map(M) ->
817835
maps:get(K, M, undefined);

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
published = false :: boolean(),
7373
ssl_login_name :: none | binary(),
7474
retainer_pid :: pid(),
75+
delivery_flow :: flow | noflow,
7576
trace_state :: rabbit_trace:state(),
7677
prefetch :: non_neg_integer(),
7778
vhost :: rabbit_types:vhost(),
@@ -148,6 +149,10 @@ process_connect(
148149
"protocol version: ~p, keepalive: ~p, property names: ~p",
149150
[ClientId0, Username0, CleanStart, ProtoVer, KeepaliveSecs, maps:keys(ConnectProps)]),
150151
SslLoginName = ssl_login_name(Socket),
152+
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
153+
true -> flow;
154+
false -> noflow
155+
end,
151156
MaxPacketSize = maps:get('Maximum-Packet-Size', ConnectProps, ?MAX_PACKET_SIZE),
152157
TopicAliasMax = persistent_term:get(?PERSISTENT_TERM_TOPIC_ALIAS_MAXIMUM),
153158
TopicAliasMaxOutbound = min(maps:get('Topic-Alias-Maximum', ConnectProps, 0), TopicAliasMax),
@@ -208,6 +213,7 @@ process_connect(
208213
clean_start = CleanStart,
209214
session_expiry_interval_secs = SessionExpiry,
210215
ssl_login_name = SslLoginName,
216+
delivery_flow = Flow,
211217
trace_state = TraceState,
212218
prefetch = prefetch(ConnectProps),
213219
conn_name = ConnName,
@@ -1551,6 +1557,7 @@ publish_to_queues(
15511557
#mqtt_msg{topic = Topic,
15521558
packet_id = PacketId} = MqttMsg,
15531559
#state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin},
1560+
delivery_flow = Flow,
15541561
conn_name = ConnName,
15551562
trace_state = TraceState},
15561563
auth_state = #auth_state{user = #user{username = Username}}} = State) ->
@@ -1563,7 +1570,7 @@ publish_to_queues(
15631570
QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}),
15641571
QNames = drop_local(QNames0, State),
15651572
rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState),
1566-
Opts = maps_put_truthy(correlation, PacketId, #{}),
1573+
Opts = maps_put_truthy(flow, Flow, maps_put_truthy(correlation, PacketId, #{})),
15671574
deliver_to_queues(Msg, Opts, QNames, State);
15681575
{error, not_found} ->
15691576
?LOG_ERROR("~s not found", [rabbit_misc:rs(ExchangeName)]),
@@ -2477,6 +2484,7 @@ format_status(
24772484
published = Published,
24782485
ssl_login_name = SSLLoginName,
24792486
retainer_pid = RetainerPid,
2487+
delivery_flow = DeliveryFlow,
24802488
trace_state = TraceState,
24812489
prefetch = Prefetch,
24822490
client_id = ClientID,
@@ -2498,6 +2506,7 @@ format_status(
24982506
ssl_login_name => SSLLoginName,
24992507
retainer_pid => RetainerPid,
25002508

2509+
delivery_flow => DeliveryFlow,
25012510
trace_state => TraceState,
25022511
prefetch => Prefetch,
25032512
client_id => ClientID,

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ cluster_size_3_tests() ->
130130
pubsub,
131131
queue_down_qos1,
132132
consuming_classic_queue_down,
133+
flow_classic_queue,
133134
flow_quorum_queue,
134135
flow_stream,
135136
rabbit_mqtt_qos0_queue,
@@ -485,6 +486,24 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) ->
485486
?awaitMatch([],
486487
all_connection_pids(Config), 10_000, 1000).
487488

489+
%% This test case does not require multiple nodes
490+
%% but it is grouped together with flow test cases for other queue types
491+
%% (and historically used to use a mirrored classic queue on multiple nodes)
492+
flow_classic_queue(Config) ->
493+
%% New nodes lookup via persistent_term:get/1 (since 4.0.0)
494+
%% Old nodes lookup via application:get_env/2. (that is taken care of by flow/3)
495+
%% Therefore, we set both persistent_term and application.
496+
Key = credit_flow_default_credit,
497+
Val = {2, 1},
498+
DefaultVal = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
499+
Result = rpc_all(Config, persistent_term, put, [Key, Val]),
500+
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
501+
502+
flow(Config, {rabbit, Key, Val}, <<"classic">>),
503+
504+
?assertEqual(Result, rpc_all(Config, persistent_term, put, [Key, DefaultVal])),
505+
ok.
506+
488507
flow_quorum_queue(Config) ->
489508
flow(Config, {rabbit, quorum_commands_soft_limit, 1}, <<"quorum">>).
490509

deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ bind_exchange_to_exchange_single_message(Config) -> mqtt_shared_SUITE:?FUNCTION_
8787
pubsub(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
8888
queue_down_qos1(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
8989
consuming_classic_queue_down(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
90+
flow_classic_queue(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
9091
flow_quorum_queue(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
9192
flow_stream(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
9293
rabbit_mqtt_qos0_queue(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).

0 commit comments

Comments
 (0)