Skip to content

Commit 83eede7

Browse files
committed
Keep storing MQTT client IDs as lists in Ra
Up to 3.11.x, an MQTT client ID is tracked in Ra as a list of bytes, as returned by binary_to_list/1 in https://github.com/rabbitmq/rabbitmq-server/blob/48467d6e1283b8d81e52cfd49c06ea4eaa31617d/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl#L137 This has two downsides: 1. Lists consume more memory than binaries (when tracking many clients). 2. It violates the MQTT spec, which states "The ClientId MUST be a UTF-8 encoded string as defined in Section 1.5.3 [MQTT-3.1.3-4]." [v4 3.1.3.1] Therefore, the original idea was to always store MQTT client IDs as binaries starting with Native MQTT in 3.12. However, this leads to client ID tracking misbehaving in mixed-version clusters, since new nodes would register client IDs as binaries and old nodes would register client IDs as lists. This means that a client registering on a new node with the same client ID as an existing connection to an old node would not terminate the connection on the old node. Therefore, for backwards compatibility, we leave the client ID as a list of bytes in the Ra machine state, because the feature flag delete_ra_cluster_mqtt_node introduced in v3.12 will delete the Ra cluster anyway, and the new client ID tracking via pg local will store client IDs as binaries. An interesting side note learned here is that the compiled file rabbit_mqtt_collector must not be changed. This commit only modifies function specs. However, as soon as the compiled code is changed, this module becomes a new version. The new version causes the anonymous Ra query function to fail in mixed-version clusters: when an old node does a ra:leader_query where the leader is on a new node, the query function fails on the new node with `badfun` because the new node does not have the same module version. For more context, read: https://web.archive.org/web/20181017104411/http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
1 parent 77aaeb1 commit 83eede7

File tree

8 files changed

+57
-35
lines changed

8 files changed

+57
-35
lines changed

deps/rabbitmq_mqtt/include/mqtt_machine.hrl

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,21 @@
55
%% Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
66
%%
77

8+
%% A client ID that is tracked in Ra is a list of bytes
9+
%% as returned by binary_to_list/1 in
10+
%% https://github.com/rabbitmq/rabbitmq-server/blob/48467d6e1283b8d81e52cfd49c06ea4eaa31617d/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl#L137
11+
%% prior to 3.12.0.
12+
%% This has two downsides:
13+
%% 1. Lists consume more memory than binaries (when tracking many clients).
14+
%% 2. This violates the MQTT spec which states
15+
%% "The ClientId MUST be a UTF-8 encoded string as defined in Section 1.5.3 [MQTT-3.1.3-4]." [v4 3.1.3.1]
16+
%% However, for backwards compatibility, we leave the client ID as a list of bytes in the Ra machine state because
17+
%% feature flag delete_ra_cluster_mqtt_node introduced in 3.12.0 will delete the Ra cluster anyway.
18+
-type client_id() :: [byte()].
19+
820
-record(machine_state, {
9-
%% client ID to connection PID
10-
client_ids = #{},
11-
%% connection PID to list of client IDs
12-
pids = #{},
21+
client_ids = #{} :: #{client_id() => Connection :: pid()},
22+
pids = #{} :: #{Connection :: pid() => [client_id(), ...]},
1323
%% add a couple of fields for future extensibility
1424
reserved_1,
1525
reserved_2}).
16-

deps/rabbitmq_mqtt/src/mqtt_machine.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
-type config() :: map().
2323

2424
-type reply() :: {ok, term()} | {error, term()}.
25-
-type client_id() :: term().
2625

2726
-type command() :: {register, client_id(), pid()} |
2827
{unregister, client_id(), pid()} |

deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
list/0, list_pids/0, leave/1]).
1414

1515
%%----------------------------------------------------------------------------
16-
-spec register(term(), pid()) -> {ok, reference()} | {error, term()}.
16+
-spec register(client_id(), pid()) -> {ok, reference()} | {error, term()}.
1717
register(ClientId, Pid) ->
1818
{ClusterName, _} = NodeId = mqtt_node:server_id(),
1919
case ra_leaderboard:lookup_leader(ClusterName) of
@@ -28,15 +28,15 @@ register(ClientId, Pid) ->
2828
register(Leader, ClientId, Pid)
2929
end.
3030

31-
-spec register(ra:server_id(), term(), pid()) ->
31+
-spec register(ra:server_id(), client_id(), pid()) ->
3232
{ok, reference()} | {error, term()}.
3333
register(ServerId, ClientId, Pid) ->
3434
Corr = make_ref(),
3535
send_ra_command(ServerId, {register, ClientId, Pid}, Corr),
3636
erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}),
3737
{ok, Corr}.
3838

39-
-spec unregister(binary(), pid()) -> ok.
39+
-spec unregister(client_id(), pid()) -> ok.
4040
unregister(ClientId, Pid) ->
4141
{ClusterName, _} = mqtt_node:server_id(),
4242
case ra_leaderboard:lookup_leader(ClusterName) of

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ register_client_id(VHost, ClientId)
426426

427427
case rabbit_mqtt_ff:track_client_id_in_ra() of
428428
true ->
429-
case rabbit_mqtt_collector:register(ClientId, self()) of
429+
case collector_register(ClientId) of
430430
{ok, Corr} ->
431431
%% Ra node takes care of removing duplicate client ID connections.
432432
{ok, {pending, Corr}};
@@ -1228,7 +1228,7 @@ maybe_send_will(_, _, _) ->
12281228
unregister_client(#state{cfg = #cfg{client_id = ClientId}}) ->
12291229
case rabbit_mqtt_ff:track_client_id_in_ra() of
12301230
true ->
1231-
rabbit_mqtt_collector:unregister(ClientId, self());
1231+
rabbit_mqtt_collector:unregister(binary_to_list(ClientId), self());
12321232
false ->
12331233
ok
12341234
end.
@@ -1283,9 +1283,10 @@ handle_ra_event({applied, [{Corr, ok}]},
12831283
State#state{ra_register_state = registered};
12841284
handle_ra_event({not_leader, Leader, Corr},
12851285
State = #state{ra_register_state = {pending, Corr},
1286-
cfg = #cfg{client_id = ClientId}}) ->
1286+
cfg = #cfg{client_id = ClientIdBin}}) ->
12871287
case rabbit_mqtt_ff:track_client_id_in_ra() of
12881288
true ->
1289+
ClientId = binary_to_list(ClientIdBin),
12891290
%% retry command against actual leader
12901291
{ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()),
12911292
State#state{ra_register_state = {pending, NewCorr}};
@@ -1297,7 +1298,7 @@ handle_ra_event(register_timeout,
12971298
cfg = #cfg{client_id = ClientId}}) ->
12981299
case rabbit_mqtt_ff:track_client_id_in_ra() of
12991300
true ->
1300-
{ok, NewCorr} = rabbit_mqtt_collector:register(ClientId, self()),
1301+
{ok, NewCorr} = collector_register(ClientId),
13011302
State#state{ra_register_state = {pending, NewCorr}};
13021303
false ->
13031304
State
@@ -1784,6 +1785,10 @@ message_redelivered(true, ProtoVer, QType) ->
17841785
message_redelivered(_, _, _) ->
17851786
ok.
17861787

1788+
collector_register(ClientIdBin) ->
1789+
ClientId = binary_to_list(ClientIdBin),
1790+
rabbit_mqtt_collector:register(ClientId, self()).
1791+
17871792
-spec format_status(state()) -> map().
17881793
format_status(
17891794
#state{queue_states = QState,

deps/rabbitmq_mqtt/test/cluster_SUITE.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
teardown_steps/0,
2020
get_node_config/3,
2121
rabbitmqctl/3,
22-
rpc/4,
22+
rpc/4, rpc/5,
2323
stop_node/2
2424
]).
2525

26+
-import(rabbit_ct_helpers,
27+
[eventually/3]).
28+
2629
-define(OPTS, [{connect_timeout, 1},
2730
{ack_timeout, 1}]).
2831

@@ -127,17 +130,15 @@ connection_id_tracking(Config) ->
127130
ok = emqtt:disconnect(C3).
128131

129132
connection_id_tracking_on_nodedown(Config) ->
130-
Server = get_node_config(Config, 0, nodename),
131133
C = connect(<<"simpleClient">>, Config, ?OPTS),
132134
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
133135
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
134136
ok = expect_publishes(C, <<"TopicA">>, [<<"Payload">>]),
135137
assert_connection_count(Config, 4, 1),
136138
process_flag(trap_exit, true),
137-
ok = stop_node(Config, Server),
139+
ok = stop_node(Config, 0),
138140
await_exit(C),
139-
assert_connection_count(Config, 4, 0),
140-
ok.
141+
ok = eventually(?_assertEqual([], util:all_connection_pids(1, Config)), 500, 4).
141142

142143
connection_id_tracking_with_decommissioned_node(Config) ->
143144
case rpc(Config, rabbit_mqtt_ff, track_client_id_in_ra, []) of

deps/rabbitmq_mqtt/test/ff_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,13 @@ delete_ra_cluster_mqtt_node(Config) ->
8383
rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)),
8484

8585
%% Ra processes should be gone
86-
rabbit_ct_helpers:eventually(
86+
eventually(
8787
?_assert(lists:all(fun(Pid) -> Pid =:= undefined end,
8888
rabbit_ct_broker_helpers:rpc_all(Config, erlang, whereis, [mqtt_node])))),
8989
%% new client ID tracking works
9090
?assertEqual(1, length(util:all_connection_pids(Config))),
91-
?assert(erlang:is_process_alive(C)),
92-
ok = emqtt:disconnect(C).
91+
ok = emqtt:disconnect(C),
92+
eventually(?_assertEqual(0, length(util:all_connection_pids(Config)))).
9393

9494
rabbit_mqtt_qos0_queue(Config) ->
9595
FeatureFlag = ?FUNCTION_NAME,

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ subgroups() ->
8787
,large_message_amqp_to_mqtt
8888
,keepalive
8989
,keepalive_turned_off
90-
,duplicate_client_id
9190
,block
9291
,amqp_to_mqtt_qos0
9392
,clean_session_disconnect_client
@@ -112,7 +111,8 @@ subgroups() ->
112111
maintenance,
113112
delete_create_queue,
114113
publish_to_all_queue_types_qos0,
115-
publish_to_all_queue_types_qos1
114+
publish_to_all_queue_types_qos1,
115+
duplicate_client_id
116116
]}
117117
].
118118

@@ -1143,17 +1143,19 @@ keepalive_turned_off(Config) ->
11431143
ok = emqtt:disconnect(C).
11441144

11451145
duplicate_client_id(Config) ->
1146+
[Server1, Server2, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
11461147
DuplicateClientId = ?FUNCTION_NAME,
1147-
C1 = connect(DuplicateClientId, Config),
1148+
%% Connect to old node in mixed version cluster.
1149+
C1 = connect(DuplicateClientId, Config, Server2, []),
11481150
eventually(?_assertEqual(1, length(all_connection_pids(Config)))),
1149-
11501151
process_flag(trap_exit, true),
1151-
C2 = connect(DuplicateClientId, Config),
1152+
%% Connect to new node in mixed version cluster.
1153+
C2 = connect(DuplicateClientId, Config, Server1, []),
11521154
await_exit(C1),
11531155
timer:sleep(200),
11541156
?assertEqual(1, length(all_connection_pids(Config))),
1155-
1156-
ok = emqtt:disconnect(C2).
1157+
ok = emqtt:disconnect(C2),
1158+
eventually(?_assertEqual(0, length(all_connection_pids(Config)))).
11571159

11581160
block(Config) ->
11591161
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),

deps/rabbitmq_mqtt/test/util.erl

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
-include_lib("eunit/include/eunit.hrl").
66

77
-export([all_connection_pids/1,
8+
all_connection_pids/2,
89
publish_qos1_timeout/4,
910
sync_publish_result/3,
1011
get_global_counters/2,
@@ -23,13 +24,18 @@
2324
]).
2425

2526
all_connection_pids(Config) ->
26-
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
27-
Result = erpc:multicall(Nodes, rabbit_mqtt, local_connection_pids, [], 5000),
28-
lists:foldl(fun({ok, Pids}, Acc) ->
29-
Pids ++ Acc;
30-
(_, Acc) ->
31-
Acc
32-
end, [], Result).
27+
all_connection_pids(0, Config).
28+
29+
all_connection_pids(Node, Config) ->
30+
case rabbit_ct_broker_helpers:rpc(
31+
Config, Node, rabbit_feature_flags, is_enabled, [delete_ra_cluster_mqtt_node]) of
32+
true ->
33+
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
34+
Result = erpc:multicall(Nodes, rabbit_mqtt, local_connection_pids, [], 5000),
35+
lists:append([Pids || {ok, Pids} <- Result]);
36+
false ->
37+
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_mqtt_collector, list_pids, [])
38+
end.
3339

3440
publish_qos1_timeout(Client, Topic, Payload, Timeout) ->
3541
Mref = erlang:monitor(process, Client),

0 commit comments

Comments
 (0)