Skip to content

Commit 3390fc9

Browse files
committed
etcd peer discovery fixes
Instead of relying on the complex and non-deterministic default node selection mechanism inside peer discovery, this change makes the etcd backend implementation perform the leader selection itself, based on the etcd create_revision of each entry. Although not spelled out anywhere explicitly, it is likely that a property called "Create Revision" is going to remain consistent throughout the lifetime of the etcd key. Either way, this is likely to be an improvement on the current approach.
1 parent fde9950 commit 3390fc9

File tree

5 files changed

+38
-24
lines changed

5 files changed

+38
-24
lines changed

deps/rabbit/test/unit_quorum_queue_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
-module(unit_quorum_queue_SUITE).
22

3+
-compile(nowarn_export_all).
34
-compile(export_all).
45

56
all() ->

deps/rabbitmq_peer_discovery_etcd/src/rabbit_peer_discovery_etcd.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,13 @@ list_nodes() ->
5959
{ok, {[], disc}}
6060
end,
6161
Fun2 = fun(_Proplist) ->
62-
%% error logging will be done by the client
63-
Nodes = rabbitmq_peer_discovery_etcd_v3_client:list_nodes(),
64-
{ok, {Nodes, disc}}
62+
%% nodes are returned sorted with the create_revision as
63+
%% the first element in the tuple.
64+
%% The node with the lowest create_revision is thus selected
65+
%% based on the assumption that the create_revision remains
66+
%% consistent throughout the lifetime of the etcd key.
67+
[{_, Node} | _] = rabbitmq_peer_discovery_etcd_v3_client:list_nodes(),
68+
{ok, {Node, disc}}
6569
end,
6670
rabbit_peer_discovery_util:maybe_backend_configured(?BACKEND_CONFIG_KEY, Fun0, Fun1, Fun2).
6771

deps/rabbitmq_peer_discovery_etcd/src/rabbitmq_peer_discovery_etcd_v3_client.erl

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -230,16 +230,13 @@ connected({call, From}, list_keys, Data = #statem_data{connection_name = Conn})
230230
rabbit_log:debug("etcd peer discovery: will use prefix ~ts to query for node keys", [Prefix]),
231231
{ok, #{kvs := Result}} = eetcd_kv:get(C2),
232232
rabbit_log:debug("etcd peer discovery returned keys: ~tp", [Result]),
233-
Values = [maps:get(value, M) || M <- Result],
234-
rabbit_log:debug("etcd peer discovery: listing node keys returned ~b results", [length(Values)]),
235-
ParsedNodes = lists:map(fun extract_node/1, Values),
236-
{Successes, Failures} = lists:partition(fun filter_node/1, ParsedNodes),
237-
JoinedString = lists:join(",", [rabbit_data_coercion:to_list(Node) || Node <- lists:usort(Successes)]),
238-
rabbit_log:error("etcd peer discovery: successfully extracted nodes: ~ts", [JoinedString]),
239-
lists:foreach(fun(Val) ->
240-
rabbit_log:error("etcd peer discovery: failed to extract node name from etcd value ~tp", [Val])
241-
end, Failures),
242-
gen_statem:reply(From, lists:usort(Successes)),
233+
Values = [{maps:get(create_revision, M), maps:get(value, M)} || M <- Result],
234+
rabbit_log:debug("etcd peer discovery: listing node keys returned ~b results",
235+
[length(Values)]),
236+
ParsedNodes = lists:filtermap(fun extract_node/1, Values),
237+
rabbit_log:info("etcd peer discovery: successfully extracted nodes: ~0tp",
238+
[ParsedNodes]),
239+
gen_statem:reply(From, lists:usort(ParsedNodes)),
243240
keep_state_and_data.
244241

245242

@@ -298,15 +295,18 @@ registration_value(#statem_data{node_key_lease_id = LeaseID, node_key_ttl_in_sec
298295
<<"ttl">> => TTL
299296
})).
300297

301-
-spec extract_node(binary()) -> atom() | {error, any()}.
302-
303-
extract_node(Payload) ->
298+
extract_node({CreatedRev, Payload}) ->
304299
case rabbit_json:try_decode(Payload) of
305-
{error, Error} -> {error, Error};
300+
{error, _Error} ->
301+
rabbit_log:error("etcd peer discovery: failed to extract node name from etcd value ~tp",
302+
[Payload]),
303+
false;
306304
{ok, Map} ->
307305
case maps:get(<<"node">>, Map, undefined) of
308-
undefined -> undefined;
309-
Node -> rabbit_data_coercion:to_atom(Node)
306+
undefined ->
307+
false;
308+
Node ->
309+
{true, {CreatedRev, rabbit_data_coercion:to_atom(Node)}}
310310
end
311311
end.
312312

deps/rabbitmq_peer_discovery_etcd/test/system_SUITE.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,12 @@ registration_with_locking_test(Config) ->
265265
?assertEqual(ok, rabbitmq_peer_discovery_etcd_v3_client:unlock(Pid, LockOwnerKey)),
266266

267267
Condition2 = fun() ->
268-
[node()] =:= rabbitmq_peer_discovery_etcd_v3_client:list_nodes(Pid)
268+
case rabbitmq_peer_discovery_etcd_v3_client:list_nodes(Pid) of
269+
[{_, N}] when N =:= node() ->
270+
true;
271+
_ ->
272+
false
273+
end
269274
end,
270275
try
271276
rabbit_ct_helpers:await_condition(Condition2, 45000)

deps/rabbitmq_peer_discovery_etcd/test/unit_SUITE.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,14 @@ registration_value_test(_Config) ->
5555
extract_nodes_case1_test(_Config) ->
5656
Input = registration_value_of(8488283859587364900, 61),
5757
Expected = node(),
58-
59-
?assertEqual(Expected, rabbitmq_peer_discovery_etcd_v3_client:extract_node(Input)),
60-
61-
?assertEqual(undefined, rabbitmq_peer_discovery_etcd_v3_client:extract_node(<<"{}">>)).
58+
CreatedRev = ?LINE,
59+
?assertEqual({true, {CreatedRev, Expected}},
60+
rabbitmq_peer_discovery_etcd_v3_client:extract_node(
61+
{CreatedRev, Input})),
62+
63+
?assertEqual(false,
64+
rabbitmq_peer_discovery_etcd_v3_client:extract_node(
65+
{CreatedRev, <<"{}">>})).
6266

6367
filter_nodes_test(_Config) ->
6468
Input = [node(), undefined, undefined, {error, reason1}, {error, {another, reason}}],

0 commit comments

Comments
 (0)