Skip to content

Commit d782fe5

Browse files
committed
rabbitmq_peer_discovery_consul: Select the node to join
[Why] The default node selection of the peer discovery subsystem doesn't work well with Consul. The reason is that this selection is based on the nodes' uptime. However, the node with the highest uptime may not be the first to register in Consul. When this happens, the node that registered first will only discover itself and boot as a standalone node. Then, the node with the highest uptime will discover both of them, but will select itself as the node to join because of its uptime. As a result, we end up with two clusters instead of one. [How] We use the `CreateIndex` property in the Consul response to sort services. We then derive the name of the node to join from the service that has the lowest `CreateIndex`, meaning it was the first to register. (cherry picked from commit 0f054e1)
1 parent cc8ec5d commit d782fe5

File tree

3 files changed

+29
-17
lines changed

3 files changed

+29
-17
lines changed

deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ list_nodes() ->
8383
HttpOpts) of
8484
{ok, Nodes} ->
8585
IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M),
86-
Result = extract_nodes(
87-
filter_nodes(Nodes, IncludeWithWarnings)),
86+
Result = extract_node(
87+
sort_nodes(
88+
filter_nodes(Nodes, IncludeWithWarnings))),
8889
{ok, {Result, disc}};
8990
{error, _} = Error ->
9091
Error
@@ -276,13 +277,24 @@ filter_nodes(Nodes, Warn) ->
276277
false -> Nodes
277278
end.
278279

279-
-spec extract_nodes(ConsulResult :: [#{binary() => term()}]) -> list().
280-
extract_nodes(Data) -> extract_nodes(Data, []).
281-
282-
-spec extract_nodes(ConsulResult :: [#{binary() => term()}], Nodes :: list())
283-
-> list().
284-
extract_nodes([], Nodes) -> Nodes;
285-
extract_nodes([H | T], Nodes) ->
280+
-spec sort_nodes(ConsulResult :: [#{binary() => term()}]) -> [#{binary() => term()}].
281+
sort_nodes(Nodes) ->
282+
lists:sort(
283+
fun(NodeA, NodeB) ->
284+
IndexA = maps:get(
285+
<<"CreateIndex">>,
286+
maps:get(<<"Service">>, NodeA, #{}), undefined),
287+
IndexB = maps:get(
288+
<<"CreateIndex">>,
289+
maps:get(<<"Service">>, NodeB, #{}), undefined),
290+
%% `undefined' is always greater than an integer, so we are fine here.
291+
IndexA =< IndexB
292+
end, Nodes).
293+
294+
-spec extract_node(ConsulResult :: [#{binary() => term()}]) -> list().
295+
extract_node([]) ->
296+
[];
297+
extract_node([H | _]) ->
286298
Service = maps:get(<<"Service">>, H),
287299
Meta = maps:get(<<"Meta">>, Service, #{}),
288300
NodeName = case Meta of
@@ -299,7 +311,7 @@ extract_nodes([H | T], Nodes) ->
299311
?UTIL_MODULE:node_name(Address)
300312
end
301313
end,
302-
extract_nodes(T, lists:merge(Nodes, [NodeName])).
314+
NodeName.
303315

304316
-spec maybe_add_acl(QArgs :: list()) -> list().
305317
maybe_add_acl(List) ->

deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ list_nodes_return_value_basic_test(_Config) ->
360360
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
361361
end),
362362
meck:expect(rabbit_nodes, name_type, fun() -> shortnames end),
363-
?assertEqual({ok, {['rabbit@rabbit1', 'rabbit@rabbit2'], disc}},
363+
?assertEqual({ok, {'rabbit@rabbit2', disc}},
364364
rabbit_peer_discovery_consul:list_nodes()),
365365
?assert(meck:validate(rabbit_peer_discovery_httpc)).
366366

@@ -388,7 +388,7 @@ list_nodes_return_value_basic_long_node_name_test(_Config) ->
388388
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
389389
end),
390390
meck:expect(rabbit_nodes, name_type, fun() -> longnames end),
391-
?assertEqual({ok, {['rabbit@rabbit1.node.consul', 'rabbit@rabbit2.node.consul'], disc}},
391+
?assertEqual({ok, {'rabbit@rabbit2.node.consul', disc}},
392392
rabbit_peer_discovery_consul:list_nodes()),
393393
?assert(meck:validate(rabbit_peer_discovery_httpc)).
394394

@@ -419,7 +419,7 @@ list_nodes_return_value_long_node_name_and_custom_domain_test(_Config) ->
419419

420420

421421
meck:expect(rabbit_nodes, name_type, fun() -> longnames end),
422-
?assertEqual({ok, {['rabbit@rabbit1.node.internal', 'rabbit@rabbit2.node.internal'], disc}},
422+
?assertEqual({ok, {'rabbit@rabbit2.node.internal', disc}},
423423
rabbit_peer_discovery_consul:list_nodes()),
424424
?assert(meck:validate(rabbit_peer_discovery_httpc)).
425425

@@ -446,7 +446,7 @@ list_nodes_return_value_srv_address_test(_Config) ->
446446
Body = "[{\"Node\": {\"Node\": \"rabbit2.internal.domain\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq:172.172.16.4.50\", \"Output\": \"\"}, {\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.16.4.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.16.4.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1.internal.domain\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.172.16.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.172.16.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]",
447447
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
448448
end),
449-
?assertEqual({ok, {['rabbit@172.16.4.51', 'rabbit@172.172.16.51'], disc}},
449+
?assertEqual({ok, {'rabbit@172.16.4.51', disc}},
450450
rabbit_peer_discovery_consul:list_nodes()),
451451
?assert(meck:validate(rabbit_peer_discovery_httpc)).
452452

@@ -475,7 +475,7 @@ list_nodes_return_value_nodes_in_warning_state_included_test(_Config) ->
475475
rabbit_json:try_decode(list_of_nodes_without_warnings())
476476
end),
477477
os:putenv("CONSUL_INCLUDE_NODES_WITH_WARNINGS", "true"),
478-
?assertEqual({ok, {['[email protected]'], disc}},
478+
?assertEqual({ok, {'[email protected]', disc}},
479479
rabbit_peer_discovery_consul:list_nodes()),
480480
?assert(meck:validate(rabbit_peer_discovery_httpc)).
481481

@@ -504,7 +504,7 @@ list_nodes_return_value_nodes_in_warning_state_filtered_out_test(_Config) ->
504504
rabbit_json:try_decode(list_of_nodes_without_warnings())
505505
end),
506506
os:putenv("CONSUL_INCLUDE_NODES_WITH_WARNINGS", "false"),
507-
?assertEqual({ok, {['[email protected]', '[email protected]'], disc}},
507+
?assertEqual({ok, {'[email protected]', disc}},
508508
rabbit_peer_discovery_consul:list_nodes()),
509509
?assert(meck:validate(rabbit_peer_discovery_httpc)).
510510

deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/consul.hcl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ connect {
2323

2424
# Addresses and ports
2525
client_addr = "0.0.0.0"
26-
bind_addr = "0.0.0.0"
26+
bind_addr = "{{ GetInterfaceIP \"eth0\" }}"
2727

2828
addresses {
2929
grpc = "0.0.0.0"

0 commit comments

Comments
 (0)