Skip to content

Commit a826b57

Browse files
committed
rabbit_process: Move process liveness functions here from rabbit_mnesia
These functions extend the functionality of `erlang:is_process_alive/1` to take into account the node a process is running on and its cluster membership. These functions are moved away from `rabbit_mnesia` because we don't want `rabbit_mnesia` to be a central piece of RabbitMQ. Classic-mirrored-queue-related modules continue to use `rabbit_mnesia` functions, therefore relying on Mnesia, because they depend entirely on Mnesia anyway. They will go away at the same time as our use of Mnesia. So by keeping this code untouched, we avoid possible regressions.
1 parent 656d309 commit a826b57

File tree

6 files changed

+94
-10
lines changed

6 files changed

+94
-10
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ retry_wait(Q, F, E, RetriesLeft) ->
574574
{stopped, false} ->
575575
E({absent, Q, stopped});
576576
_ ->
577-
case rabbit_mnesia:is_process_alive(QPid) of
577+
case rabbit_process:is_process_alive(QPid) of
578578
true ->
579579
% rabbitmq-server#1682
580580
% The old check would have crashed here,
@@ -1814,7 +1814,7 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
18141814
false;
18151815
is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
18161816
Pid = amqqueue:get_pid(Q),
1817-
not rabbit_mnesia:is_process_alive(Pid).
1817+
not rabbit_process:is_process_alive(Pid).
18181818

18191819
-spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean().
18201820
has_synchronised_mirrors_online(Q) ->
@@ -1881,7 +1881,7 @@ on_node_down(Node) ->
18811881
filter_transient_queues_to_delete(Node) ->
18821882
fun(Q) ->
18831883
amqqueue:qnode(Q) == Node andalso
1884-
not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))
1884+
not rabbit_process:is_process_alive(amqqueue:get_pid(Q))
18851885
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
18861886
andalso (not rabbit_amqqueue:is_replicated(Q)
18871887
orelse rabbit_amqqueue:is_dead_exclusive(Q))

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ is_recoverable(Q) when ?is_amqqueue(Q) ->
129129
%% the policy). Thus, we must check if the local pid is alive
130130
%% - if the record is present - in order to restart.
131131
(not rabbit_db_queue:consistent_exists(amqqueue:get_name(Q))
132-
orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))).
132+
orelse not rabbit_process:is_process_alive(amqqueue:get_pid(Q))).
133133

134134
recover(VHost, Queues) ->
135135
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
@@ -438,11 +438,11 @@ wait_for_promoted_or_stopped(Q0) ->
438438
{ok, Q} ->
439439
QPid = amqqueue:get_pid(Q),
440440
SPids = amqqueue:get_slave_pids(Q),
441-
case rabbit_mnesia:is_process_alive(QPid) of
441+
case rabbit_process:is_process_alive(QPid) of
442442
true -> {promoted, Q};
443443
false ->
444444
case lists:any(fun(Pid) ->
445-
rabbit_mnesia:is_process_alive(Pid)
445+
rabbit_process:is_process_alive(Pid)
446446
end, SPids) of
447447
%% There is a live slave. May be promoted
448448
true ->

deps/rabbit/src/rabbit_mnesia.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@
5656
%% Used internally in rpc calls
5757
-export([node_info/0, remove_node_if_mnesia_running/1]).
5858

59+
-deprecated({on_running_node, 1,
60+
"Use rabbit_process:on_running_node/1 instead"}).
61+
-deprecated({is_process_alive, 1,
62+
"Use rabbit_process:is_process_alive/1 instead"}).
63+
-deprecated({is_registered_process_alive, 1,
64+
"Use rabbit_process:is_registered_process_alive/1 instead"}).
65+
5966
-ifdef(TEST).
6067
-compile(export_all).
6168
-export([init_with_lock/3]).

deps/rabbit/src/rabbit_prequeue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ init(Q0, restart) when ?is_amqqueue(Q0) ->
5555
QPid = amqqueue:get_pid(Q1),
5656
SPids = amqqueue:get_slave_pids(Q1),
5757
LocalOrMasterDown = node(QPid) =:= node()
58-
orelse not rabbit_mnesia:on_running_node(QPid),
59-
Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)],
60-
case rabbit_mnesia:is_process_alive(QPid) of
58+
orelse not rabbit_process:on_running_node(QPid),
59+
Slaves = [SPid || SPid <- SPids, rabbit_process:is_process_alive(SPid)],
60+
case rabbit_process:is_process_alive(QPid) of
6161
true -> false = LocalOrMasterDown, %% assertion
6262
rabbit_mirror_queue_slave:go(self(), async),
6363
rabbit_mirror_queue_slave:init(Q1); %% [1]

deps/rabbit/src/rabbit_process.erl

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_process).
9+
10+
-export([on_running_node/1,
11+
is_process_alive/1,
12+
is_registered_process_alive/1]).
13+
14+
-spec on_running_node(Pid) -> OnRunningNode when
15+
Pid :: pid(),
16+
OnRunningNode :: boolean().
17+
%% @doc Indicates if the specified process runs on a member of the cluster.
18+
%%
19+
%% @param Pid the PID of the process to check
20+
%% @returns true if the process runs on one of the cluster members, false
21+
%% otherwise.
22+
23+
on_running_node(Pid) ->
24+
Node = node(Pid),
25+
rabbit_nodes:is_running(Node).
26+
27+
%% This requires the process be in the same running cluster as us
28+
%% (i.e. not partitioned or some random node).
29+
%%
30+
%% See also rabbit_misc:is_process_alive/1 which does not.
31+
32+
-spec is_process_alive(Pid) -> IsAlive when
33+
Pid :: pid() | {RegisteredName, Node},
34+
RegisteredName :: atom(),
35+
Node :: node(),
36+
IsAlive :: boolean().
37+
%% @doc Indicates if the specified process is alive.
38+
%%
39+
%% Unlike {@link erlang:is_process_alive/1}, this function works with remote
40+
%% processes and registered processes. However, the process must run on one of
41+
%% the cluster members.
42+
%%
43+
%% @param Pid the PID or name of the process to check
44+
%% @returns true if the process is alive runs on one of the cluster members,
45+
%% false otherwise.
46+
47+
is_process_alive(Pid) when is_pid(Pid) ->
48+
on_running_node(Pid)
49+
andalso
50+
rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true;
51+
is_process_alive({Name, Node}) when is_atom(Name) andalso is_atom(Node) ->
52+
case rabbit_nodes:is_running(Node) of
53+
true ->
54+
try
55+
erpc:call(Node, ?MODULE, is_registered_process_alive, [Name])
56+
catch
57+
error:{exception, undef, [{?MODULE, _, _, _} | _]} ->
58+
rpc:call(
59+
Node,
60+
rabbit_mnesia, is_registered_process_alive, [Name])
61+
end;
62+
false ->
63+
false
64+
end.
65+
66+
-spec is_registered_process_alive(RegisteredName) -> IsAlive when
67+
RegisteredName :: atom(),
68+
IsAlive :: boolean().
69+
%% @doc Indicates if the specified registered process is alive.
70+
%%
71+
%% The process must be local to this node.
72+
%%
73+
%% @param RegisteredName the name of the registered process
74+
%% @returns true if the process is alive, false otherwise.
75+
76+
is_registered_process_alive(Name) ->
77+
is_pid(whereis(Name)).

deps/rabbit_common/src/rabbit_misc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ ntoab(IP) ->
881881
%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
882882
%% would be bad news.
883883
%%
884-
%% See also rabbit_mnesia:is_process_alive/1 which also requires the
884+
%% See also rabbit_process:is_process_alive/1 which also requires the
885885
%% process be in the same running cluster as us (i.e. not partitioned
886886
%% or some random node).
887887
is_process_alive(Pid) when node(Pid) =:= node() ->

0 commit comments

Comments
 (0)