Skip to content

Commit a56d82c

Browse files
committed
rabbitmq_peer_discovery_consul: Handle locking inside list_nodes/0
[Why] The new implementation of `rabbit_peer_discovery` acquires the lock only when a node needs to join another one. This is meant to disappear in the medium/long term anyway. Here, we need to lock the query to Consul to make sure that queries happen sequentially, not concurrently. This is a work in progress and we may not keep it either.
1 parent 27ed4d2 commit a56d82c

File tree

3 files changed

+96
-22
lines changed

3 files changed

+96
-22
lines changed

deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,21 +68,30 @@ list_nodes() ->
6868
{ok, {[], disc}}
6969
end,
7070
Fun2 = fun(Proplist) ->
71-
M = maps:from_list(Proplist),
72-
Path = rabbit_peer_discovery_httpc:build_path([v1, health, service, service_name()]),
73-
HttpOpts = http_options(M),
74-
case rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M),
75-
get_config_key(consul_host, M),
76-
get_integer_config_key(consul_port, M),
77-
Path,
78-
list_nodes_query_args(),
79-
maybe_add_acl([]),
80-
HttpOpts) of
81-
{ok, Nodes} ->
82-
IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M),
83-
Result = extract_nodes(
84-
filter_nodes(Nodes, IncludeWithWarnings)),
85-
{ok, {Result, disc}};
71+
case internal_lock() of
72+
{ok, Priv} ->
73+
try
74+
M = maps:from_list(Proplist),
75+
Path = rabbit_peer_discovery_httpc:build_path([v1, health, service, service_name()]),
76+
HttpOpts = http_options(M),
77+
case rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M),
78+
get_config_key(consul_host, M),
79+
get_integer_config_key(consul_port, M),
80+
Path,
81+
list_nodes_query_args(),
82+
maybe_add_acl([]),
83+
HttpOpts) of
84+
{ok, Nodes} ->
85+
IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M),
86+
Result = extract_nodes(
87+
filter_nodes(Nodes, IncludeWithWarnings)),
88+
{ok, {Result, disc}};
89+
{error, _} = Error ->
90+
Error
91+
end
92+
after
93+
internal_unlock(Priv)
94+
end;
8695
{error, _} = Error ->
8796
Error
8897
end
@@ -164,9 +173,20 @@ post_registration() ->
164173
ok.
165174

166175
-spec lock(Nodes :: [node()]) ->
167-
{ok, Data :: term()} | {error, Reason :: string()}.
176+
not_supported.
168177

169178
lock(_Nodes) ->
179+
not_supported.
180+
181+
-spec unlock(Data :: term()) -> ok.
182+
183+
unlock(_Data) ->
184+
ok.
185+
186+
-spec internal_lock() ->
187+
{ok, Data :: term()} | {error, Reason :: string()}.
188+
189+
internal_lock() ->
170190
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
171191
?LOG_DEBUG(
172192
"Effective Consul peer discovery configuration: ~tp", [M],
@@ -179,13 +199,13 @@ lock(_Nodes) ->
179199
EndTime = Now + get_config_key(lock_wait_time, M),
180200
lock(TRef, SessionId, Now, EndTime);
181201
{error, Reason} ->
182-
{error, lists:flatten(io_lib:format("Error while creating a session, reason: ~ts",
202+
{error, lists:flatten(io_lib:format("Error while creating a session, reason: ~0p",
183203
[Reason]))}
184204
end.
185205

186-
-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok.
206+
-spec internal_unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok.
187207

188-
unlock({SessionId, TRef}) ->
208+
internal_unlock({SessionId, TRef}) ->
189209
_ = timer:cancel(TRef),
190210
?LOG_DEBUG(
191211
"Stopped session renewal",
@@ -620,7 +640,7 @@ wait_for_list_nodes(N) ->
620640
%% Create a session to be acquired for a common key
621641
%% @end
622642
%%--------------------------------------------------------------------
623-
-spec create_session(atom(), pos_integer()) -> {ok, string()} | {error, Reason::string()}.
643+
-spec create_session(atom(), pos_integer()) -> {ok, string()} | {error, Reason::any()}.
624644
create_session(Name, TTL) ->
625645
case consul_session_create([], maybe_add_acl([]),
626646
[{'Name', Name},
@@ -705,7 +725,7 @@ start_session_ttl_updater(SessionId) ->
705725
%% Tries to acquire lock. If the lock is held by someone else, waits until it
706726
%% is released, or too much time has passed
707727
%% @end
708-
-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, string()} | {error, string()}.
728+
-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, {SessionId :: string(), TRef :: timer:tref()}} | {error, string()}.
709729
lock(TRef, _, Now, EndTime) when EndTime < Now ->
710730
_ = timer:cancel(TRef),
711731
{error, "Acquiring lock taking too long, bailing out"};

deps/rabbitmq_peer_discovery_consul/src/rabbitmq_peer_discovery_consul.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ unregister() ->
4242
post_registration() ->
4343
?DELEGATE:post_registration().
4444

45-
-spec lock(Nodes :: [node()]) -> {ok, Data :: term()} | {error, Reason :: string()}.
45+
-spec lock(Nodes :: [node()]) -> not_supported.
4646
lock(Node) ->
4747
?DELEGATE:lock(Node).
4848

deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,15 @@ list_nodes_return_value_basic_test(_Config) ->
345345
{consul_port, 8500}
346346
]}
347347
]),
348+
meck:expect(rabbit_peer_discovery_httpc, put,
349+
fun
350+
(_, _, _, "v1/session/create", _, _, _, _) ->
351+
Body = "{\"ID\":\"some-session-id\"}",
352+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body));
353+
(_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) ->
354+
Body = "true",
355+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
356+
end),
348357
meck:expect(rabbit_peer_discovery_httpc, get,
349358
fun(_, _, _, _, _, _, _) ->
350359
Body = "[{\"Node\": {\"Node\": \"rabbit2.internal.domain\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1.internal.domain\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]",
@@ -364,6 +373,15 @@ list_nodes_return_value_basic_long_node_name_test(_Config) ->
364373
{consul_port, 8500}
365374
]}
366375
]),
376+
meck:expect(rabbit_peer_discovery_httpc, put,
377+
fun
378+
(_, _, _, "v1/session/create", _, _, _, _) ->
379+
Body = "{\"ID\":\"some-session-id\"}",
380+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body));
381+
(_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) ->
382+
Body = "true",
383+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
384+
end),
367385
meck:expect(rabbit_peer_discovery_httpc, get,
368386
fun(_, _, _, _, _, _, _) ->
369387
Body = "[{\"Node\": {\"Node\": \"rabbit2\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit2\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]",
@@ -384,6 +402,15 @@ list_nodes_return_value_long_node_name_and_custom_domain_test(_Config) ->
384402
{consul_domain, "internal"}
385403
]}
386404
]),
405+
meck:expect(rabbit_peer_discovery_httpc, put,
406+
fun
407+
(_, _, _, "v1/session/create", _, _, _, _) ->
408+
Body = "{\"ID\":\"some-session-id\"}",
409+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body));
410+
(_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) ->
411+
Body = "true",
412+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
413+
end),
387414
meck:expect(rabbit_peer_discovery_httpc, get,
388415
fun(_, _, _, _, _, _, _) ->
389416
Body = "[{\"Node\": {\"Node\": \"rabbit2\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit2\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]",
@@ -405,6 +432,15 @@ list_nodes_return_value_srv_address_test(_Config) ->
405432
{consul_port, 8500}
406433
]}
407434
]),
435+
meck:expect(rabbit_peer_discovery_httpc, put,
436+
fun
437+
(_, _, _, "v1/session/create", _, _, _, _) ->
438+
Body = "{\"ID\":\"some-session-id\"}",
439+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body));
440+
(_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) ->
441+
Body = "true",
442+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
443+
end),
408444
meck:expect(rabbit_peer_discovery_httpc, get,
409445
fun(_, _, _, _, _, _, _) ->
410446
Body = "[{\"Node\": {\"Node\": \"rabbit2.internal.domain\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq:172.172.16.4.50\", \"Output\": \"\"}, {\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.16.4.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.16.4.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1.internal.domain\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.172.16.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.172.16.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]",
@@ -423,6 +459,15 @@ list_nodes_return_value_nodes_in_warning_state_included_test(_Config) ->
423459
{consul_port, 8500}
424460
]}
425461
]),
462+
meck:expect(rabbit_peer_discovery_httpc, put,
463+
fun
464+
(_, _, _, "v1/session/create", _, _, _, _) ->
465+
Body = "{\"ID\":\"some-session-id\"}",
466+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body));
467+
(_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) ->
468+
Body = "true",
469+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
470+
end),
426471
meck:expect(rabbit_peer_discovery_httpc, get,
427472
fun(_, _, _, _, [], _, _) ->
428473
rabbit_json:try_decode(list_of_nodes_with_warnings());
@@ -443,6 +488,15 @@ list_nodes_return_value_nodes_in_warning_state_filtered_out_test(_Config) ->
443488
{consul_port, 8500}
444489
]}
445490
]),
491+
meck:expect(rabbit_peer_discovery_httpc, put,
492+
fun
493+
(_, _, _, "v1/session/create", _, _, _, _) ->
494+
Body = "{\"ID\":\"some-session-id\"}",
495+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body));
496+
(_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) ->
497+
Body = "true",
498+
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
499+
end),
446500
meck:expect(rabbit_peer_discovery_httpc, get,
447501
fun(_, _, _, _, [], _, _) ->
448502
rabbit_json:try_decode(list_of_nodes_with_warnings());

0 commit comments

Comments
 (0)