Skip to content

Commit 787dca0

Browse files
committed
Use integer as will message correlation
Instead of using atom `undefined`, use an integer as correlation term when sending the will message to destination queues. Classic queue clients for example expect a non negative integer. Quorum queues expect any term. (cherry picked from commit dfc2ee6)
1 parent ca72c04 commit 787dca0

File tree

2 files changed

+44
-39
lines changed

2 files changed

+44
-39
lines changed

deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@
5959
%% Packet identifier is a non zero two byte integer.
6060
-type packet_id() :: 1..16#ffff.
6161

62+
%% Defining a single correlation term (sequence number) for the will message is
63+
%% sufficient because there can be only a single will message per MQTT session.
64+
%% To prevent clashes with a Packet ID and given Packet IDs must be non-zero, we choose 0.
65+
-define(WILL_MSG_QOS_1_CORRELATION, 0).
66+
6267
-record(mqtt_packet_fixed, {type = 0 :: packet_type(),
6368
dup = false :: boolean(),
6469
qos = 0 :: qos(),
@@ -100,12 +105,13 @@
100105
-record(mqtt_packet_suback, {packet_id :: packet_id(),
101106
qos_table = []}).
102107

103-
-record(mqtt_msg, {retain :: boolean(),
104-
qos :: qos(),
105-
topic :: binary(),
106-
dup :: boolean(),
107-
packet_id :: option(packet_id()),
108-
payload :: binary()}).
108+
%% MQTT application message.
109+
-record(mqtt_msg, {retain :: boolean(),
110+
qos :: qos(),
111+
topic :: binary(),
112+
dup :: boolean(),
113+
packet_id :: option(packet_id()) | ?WILL_MSG_QOS_1_CORRELATION,
114+
payload :: binary()}).
109115

110116
-type mqtt_msg() :: #mqtt_msg{}.
111117

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -246,15 +246,6 @@ process_request(?PUBACK,
246246
{ok, State}
247247
end;
248248

249-
process_request(?PUBLISH,
250-
Packet = #mqtt_packet{
251-
fixed = Fixed = #mqtt_packet_fixed{qos = ?QOS_2}},
252-
State) ->
253-
% Downgrade QOS_2 to QOS_1
254-
process_request(?PUBLISH,
255-
Packet#mqtt_packet{
256-
fixed = Fixed#mqtt_packet_fixed{qos = ?QOS_1}},
257-
State);
258249
process_request(?PUBLISH,
259250
#mqtt_packet{
260251
fixed = #mqtt_packet_fixed{qos = Qos,
@@ -266,11 +257,12 @@ process_request(?PUBLISH,
266257
State0 = #state{unacked_client_pubs = U,
267258
cfg = #cfg{retainer_pid = RPid,
268259
proto_ver = ProtoVer}}) ->
260+
EffectiveQos = maybe_downgrade_qos(Qos),
269261
rabbit_global_counters:messages_received(ProtoVer, 1),
270262
State = maybe_increment_publisher(State0),
271263
Publish = fun() ->
272264
Msg = #mqtt_msg{retain = Retain,
273-
qos = Qos,
265+
qos = EffectiveQos,
274266
topic = Topic,
275267
dup = Dup,
276268
packet_id = PacketId,
@@ -288,8 +280,10 @@ process_request(?PUBLISH,
288280
Error
289281
end
290282
end,
291-
case Qos of
292-
N when N > ?QOS_0 ->
283+
case EffectiveQos of
284+
?QOS_0 ->
285+
publish_to_queues_with_checks(Topic, Publish, State);
286+
?QOS_1 ->
293287
rabbit_global_counters:messages_received_confirm(ProtoVer, 1),
294288
case rabbit_mqtt_confirms:contains(PacketId, U) of
295289
false ->
@@ -299,9 +293,7 @@ process_request(?PUBLISH,
299293
%% We already sent this message to target queues awaiting confirmations.
300294
%% Hence, we ignore this re-send.
301295
{ok, State}
302-
end;
303-
_ ->
304-
publish_to_queues_with_checks(Topic, Publish, State)
296+
end
305297
end;
306298

307299
process_request(?SUBSCRIBE,
@@ -322,7 +314,7 @@ process_request(?SUBSCRIBE,
322314
(#mqtt_topic{name = TopicName,
323315
qos = TopicQos},
324316
{L, S0}) ->
325-
QoS = supported_sub_qos(TopicQos),
317+
QoS = maybe_downgrade_qos(TopicQos),
326318
maybe
327319
ok ?= maybe_replace_old_sub(TopicName, QoS, S0),
328320
{ok, Q} ?= ensure_queue(QoS, S0),
@@ -663,16 +655,23 @@ maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}
663655
State
664656
end.
665657

666-
make_will_msg(#mqtt_packet_connect{will_flag = false}) ->
658+
make_will_msg(#mqtt_packet_connect{will_flag = false}) ->
667659
undefined;
668-
make_will_msg(#mqtt_packet_connect{will_retain = Retain,
669-
will_qos = Qos,
670-
will_topic = Topic,
671-
will_msg = Msg}) ->
672-
#mqtt_msg{retain = Retain,
673-
qos = Qos,
674-
topic = Topic,
675-
dup = false,
660+
make_will_msg(#mqtt_packet_connect{will_flag = true,
661+
will_retain = Retain,
662+
will_qos = Qos,
663+
will_topic = Topic,
664+
will_msg = Msg}) ->
665+
EffectiveQos = maybe_downgrade_qos(Qos),
666+
Correlation = case EffectiveQos of
667+
?QOS_0 -> undefined;
668+
?QOS_1 -> ?WILL_MSG_QOS_1_CORRELATION
669+
end,
670+
#mqtt_msg{retain = Retain,
671+
qos = EffectiveQos,
672+
packet_id = Correlation,
673+
topic = Topic,
674+
dup = false,
676675
payload = Msg}.
677676

678677
check_vhost_exists(VHost, Username, PeerIp) ->
@@ -885,14 +884,14 @@ creds(User, Pass, SSLLoginName) ->
885884
auth_attempt_failed(PeerIp, Username) ->
886885
rabbit_core_metrics:auth_attempt_failed(PeerIp, Username, mqtt).
887886

888-
supported_sub_qos(?QOS_0) -> ?QOS_0;
889-
supported_sub_qos(?QOS_1) -> ?QOS_1;
890-
supported_sub_qos(?QOS_2) -> ?QOS_1.
891-
892887
delivery_mode(?QOS_0) -> 1;
893888
delivery_mode(?QOS_1) -> 2;
894889
delivery_mode(?QOS_2) -> 2.
895890

891+
maybe_downgrade_qos(?QOS_0) -> ?QOS_0;
892+
maybe_downgrade_qos(?QOS_1) -> ?QOS_1;
893+
maybe_downgrade_qos(?QOS_2) -> ?QOS_1.
894+
896895
ensure_queue(QoS, State = #state{auth_state = #auth_state{user = #user{username = Username}}}) ->
897896
case get_queue(QoS, State) of
898897
{ok, Q} ->
@@ -1155,9 +1154,9 @@ process_routing_confirm(#delivery{confirm = false},
11551154
rabbit_global_counters:messages_unroutable_dropped(ProtoVer, 1),
11561155
State;
11571156
process_routing_confirm(#delivery{confirm = true,
1158-
msg_seq_no = undefined},
1157+
msg_seq_no = ?WILL_MSG_QOS_1_CORRELATION},
11591158
[], State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
1160-
%% unroutable will message with QoS > 0
1159+
%% unroutable will message with QoS 1
11611160
rabbit_global_counters:messages_unroutable_dropped(ProtoVer, 1),
11621161
State;
11631162
process_routing_confirm(#delivery{confirm = true,
@@ -1172,8 +1171,8 @@ process_routing_confirm(#delivery{confirm = true,
11721171
process_routing_confirm(#delivery{confirm = false}, _, State) ->
11731172
State;
11741173
process_routing_confirm(#delivery{confirm = true,
1175-
msg_seq_no = undefined}, [_|_], State) ->
1176-
%% routable will message with QoS > 0
1174+
msg_seq_no = ?WILL_MSG_QOS_1_CORRELATION}, [_|_], State) ->
1175+
%% routable will message with QoS 1
11771176
State;
11781177
process_routing_confirm(#delivery{confirm = true,
11791178
msg_seq_no = PktId},

0 commit comments

Comments
 (0)