Skip to content

Commit d01d02c

Browse files
Merge pull request #4254 from rabbitmq/mqtt_quorum
Add quorum queues support for MQTT
2 parents a882000 + 0ae3f19 commit d01d02c

File tree

6 files changed

+135
-6
lines changed

6 files changed

+135
-6
lines changed

deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,3 +257,9 @@ fun(Conf) ->
257257
LingerTimeout = cuttlefish:conf_get("mqtt.tcp_listen_options.linger.timeout", Conf, 0),
258258
{LingerOn, LingerTimeout}
259259
end}.
260+
261+
%% Durable queue type to be used for QoS 1 subscriptions
262+
%%
263+
264+
{mapping, "mqtt.durable_queue_type", "rabbitmq_mqtt.durable_queue_type",
265+
[{datatype, {enum, [classic, quorum]}}]}.

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
%% for testing purposes
1616
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2,
17-
add_client_id_to_adapter_info/2]).
17+
add_client_id_to_adapter_info/2, maybe_quorum/3]).
1818

1919
-include_lib("amqp_client/include/amqp_client.hrl").
2020
-include("rabbit_mqtt_frame.hrl").
@@ -399,7 +399,7 @@ amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
399399
message_id = MsgId,
400400
send_fun = SendFun,
401401
amqp2mqtt_fun = Amqp2MqttFun } = PState) ->
402-
amqp_channel:notify_received(DeliveryCtx),
402+
notify_received(DeliveryCtx),
403403
case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of
404404
{true, {?QOS_0, ?QOS_1}} ->
405405
amqp_channel:cast(
@@ -784,6 +784,22 @@ delivery_mode(?QOS_0) -> 1;
784784
delivery_mode(?QOS_1) -> 2;
785785
delivery_mode(?QOS_2) -> 2.
786786

787+
maybe_quorum(Qos1Args, CleanSession, Queue) ->
788+
case {rabbit_mqtt_util:env(durable_queue_type), CleanSession} of
789+
%% it is possible to Quorum queues only if Clean Session == False
790+
%% else always use Classic queues
791+
%% Clean Session == True sets auto-delete to True and quorum queues
792+
%% does not support auto-delete flag
793+
{quorum, false} -> lists:append(Qos1Args,
794+
[{<<"x-queue-type">>, longstr, <<"quorum">>}]);
795+
796+
{quorum, true} ->
797+
rabbit_log:debug("Can't use quorum queue for ~s. " ++
798+
"The clean session is true. Classic queue will be used", [Queue]),
799+
Qos1Args;
800+
_ -> Qos1Args
801+
end.
802+
787803
%% different qos subscriptions are received in different queues
788804
%% with appropriate durability and timeout arguments
789805
%% this will lead to duplicate messages for overlapping subscriptions
@@ -819,7 +835,7 @@ ensure_queue(Qos, #proc_state{ channels = {Channel, _},
819835
%%
820836
%% see rabbitmq/rabbitmq-mqtt#37
821837
auto_delete = CleanSess,
822-
arguments = Qos1Args },
838+
arguments = maybe_quorum(Qos1Args, CleanSess, QueueQ1)},
823839
#'basic.consume'{ queue = QueueQ1,
824840
no_ack = false }};
825841
{_, _, ?QOS_0} ->
@@ -1091,3 +1107,10 @@ additional_info(Key,
10911107
#proc_state{adapter_info =
10921108
#amqp_adapter_info{additional_info = AddInfo}}) ->
10931109
proplists:get_value(Key, AddInfo).
1110+
1111+
notify_received(undefined) ->
1112+
%% no notification for quorum queues and streams
1113+
ok;
1114+
notify_received(DeliveryCtx) ->
1115+
%% notification for flow control
1116+
amqp_channel:notify_received(DeliveryCtx).

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ handle_cast({close_connection, Reason},
123123
handle_cast(Msg, State) ->
124124
{stop, {mqtt_unexpected_cast, Msg}, State}.
125125

126+
handle_info({#'basic.deliver'{}, #amqp_msg{}} = Delivery,
127+
State) ->
128+
%% receiving a message from a quorum queue
129+
%% no delivery context
130+
handle_info(erlang:insert_element(3, Delivery, undefined), State);
126131
handle_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery,
127132
State = #state{ proc_state = ProcState }) ->
128133
callback_reply(State, rabbit_mqtt_processor:amqp_callback(Delivery,

deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
mqtt.listeners.tcp.default = 1883
1515
mqtt.tcp_listen_options.backlog = 128
1616
mqtt.tcp_listen_options.nodelay = true
17+
mqtt.durable_queue_type = classic
1718
mqtt.proxy_protocol = false",
1819
[{rabbit,[{tcp_listeners,[5672]}]},
1920
{rabbitmq_mqtt,
@@ -28,6 +29,7 @@
2829
{ssl_listeners,[]},
2930
{tcp_listeners,[1883]},
3031
{tcp_listen_options,[{backlog,128},{nodelay,true}]},
32+
{durable_queue_type,classic},
3133
{proxy_protocol,false}]}],
3234
[rabbitmq_mqtt]},
3335

deps/rabbitmq_mqtt/test/processor_SUITE.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ groups() ->
2424
interprets_colons_in_username_if_option_not_set,
2525
get_vhosts_from_global_runtime_parameter,
2626
get_vhost,
27-
add_client_id_to_adapter_info
27+
add_client_id_to_adapter_info,
28+
quorum_configuration
2829
]}
2930
].
3031

@@ -209,3 +210,22 @@ clear_vhost_global_parameters() ->
209210
ok = mnesia:delete(rabbit_runtime_parameters, mqtt_port_to_vhost_mapping, write)
210211
end,
211212
{atomic, ok} = mnesia:transaction(DeleteParameterFun).
213+
214+
quorum_configuration(_Config) ->
215+
MyArgs = [],
216+
%% default setting with CleanSession = true of false
217+
QMustBeClassic = rabbit_mqtt_processor:maybe_quorum(MyArgs, true, <<"">>),
218+
?assertEqual(QMustBeClassic, []),
219+
%% default setting with CleanSession = true of false
220+
QMustBeClassicEvenFalse = rabbit_mqtt_processor:maybe_quorum(MyArgs, false, <<"">>),
221+
?assertEqual(QMustBeClassicEvenFalse, []),
222+
application:set_env(rabbitmq_mqtt, durable_queue_type, quorum),
223+
%% quorum setting with CleanSession == false must me quorum
224+
QMustBeQuorum = rabbit_mqtt_processor:maybe_quorum(MyArgs, false, <<"">>),
225+
?assertEqual(QMustBeQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
226+
227+
%% quorum setting with CleanSession == true must me classic since
228+
%% quorum does not support auto-delete
229+
QEvenQuorumMustBeClassic = rabbit_mqtt_processor:maybe_quorum(MyArgs, true, <<"">>),
230+
?assertEqual(QEvenQuorumMustBeClassic, []),
231+
ok.

deps/rabbitmq_mqtt/test/reader_SUITE.erl

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212

1313
all() ->
1414
[
15-
{group, non_parallel_tests}
15+
{group, non_parallel_tests},
16+
{group, non_parallel_tests_quorum}
1617
].
1718

1819
groups() ->
@@ -21,7 +22,13 @@ groups() ->
2122
block,
2223
handle_invalid_frames,
2324
stats
24-
]}
25+
]},
26+
{non_parallel_tests_quorum, [], [
27+
quorum_session_false,
28+
quorum_session_true,
29+
classic_session_true,
30+
classic_session_false
31+
]}
2532
].
2633

2734
suite() ->
@@ -55,6 +62,14 @@ end_per_suite(Config) ->
5562
rabbit_ct_client_helpers:teardown_steps() ++
5663
rabbit_ct_broker_helpers:teardown_steps()).
5764

65+
init_per_group(non_parallel_tests_quorum, Config) ->
66+
%% Added for quorum queue test else the mixing test would fail
67+
%% with "feature flag is disabled"
68+
case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
69+
ok -> Config;
70+
Skip -> Skip
71+
end,
72+
Config;
5873
init_per_group(_, Config) ->
5974
Config.
6075

@@ -154,6 +169,64 @@ stats(Config) ->
154169
[connection_coarse_metrics, Pid]),
155170
emqttc:disconnect(C).
156171

172+
get_durable_queue_type(Server, Q0) ->
173+
QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
174+
{ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
175+
amqqueue:get_type(Q1).
176+
177+
set_env(QueueType) ->
178+
application:set_env(rabbitmq_mqtt, durable_queue_type, QueueType).
179+
180+
get_env() ->
181+
rabbit_mqtt_util:env(durable_queue_type).
182+
183+
184+
validate_durable_queue_type(Config, ClientName, CleanSession, Expected) ->
185+
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
186+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
187+
{ok, C} = emqttc:start_link([{host, "localhost"},
188+
{port, P},
189+
{clean_sess, CleanSession},
190+
{client_id, ClientName},
191+
{proto_ver, 3},
192+
{logger, info},
193+
{puback_timeout, 1}]),
194+
emqttc:subscribe(C, <<"TopicB">>, qos1),
195+
emqttc:publish(C, <<"TopicB">>, <<"Payload">>),
196+
expect_publishes(<<"TopicB">>, [<<"Payload">>]),
197+
emqttc:unsubscribe(C, [<<"TopicB">>]),
198+
Prefix = <<"mqtt-subscription-">>,
199+
Suffix = <<"qos1">>,
200+
Q= <<Prefix/binary, ClientName/binary, Suffix/binary>>,
201+
?assertEqual(Expected,get_durable_queue_type(Server,Q)),
202+
timer:sleep(500),
203+
emqttc:disconnect(C).
204+
205+
%% quorum queue test when enable
206+
quorum_session_false(Config) ->
207+
%% test if the quorum queue is enable after the setting
208+
Default = rpc(Config, reader_SUITE, get_env, []),
209+
rpc(Config, reader_SUITE, set_env, [quorum]),
210+
validate_durable_queue_type(Config, <<"qCleanSessionFalse">>, false, rabbit_quorum_queue),
211+
rpc(Config, reader_SUITE, set_env, [Default]).
212+
213+
quorum_session_true(Config) ->
214+
%% in case clean session == true must be classic since quorum
215+
%% doesn't support auto-delete
216+
Default = rpc(Config, reader_SUITE, get_env, []),
217+
rpc(Config, reader_SUITE, set_env, [quorum]),
218+
validate_durable_queue_type(Config, <<"qCleanSessionTrue">>, true, rabbit_classic_queue),
219+
rpc(Config, reader_SUITE, set_env, [Default]).
220+
221+
classic_session_true(Config) ->
222+
%% with default configuration the queue is classic
223+
validate_durable_queue_type(Config, <<"cCleanSessionTrue">>, true, rabbit_classic_queue).
224+
225+
classic_session_false(Config) ->
226+
%% with default configuration the queue is classic
227+
validate_durable_queue_type(Config, <<"cCleanSessionFalse">>, false, rabbit_classic_queue).
228+
229+
157230
expect_publishes(_Topic, []) -> ok;
158231
expect_publishes(Topic, [Payload|Rest]) ->
159232
receive

0 commit comments

Comments
 (0)