Skip to content

Commit 8f71523

Browse files
Merge pull request #1700 from rabbitmq/rabbitmq-server-1699
Make pg_local:member_died/2 more resilient
2 parents 3d08081 + 7a0c8b6 commit 8f71523

File tree

2 files changed

+102
-31
lines changed

2 files changed

+102
-31
lines changed

src/pg_local.erl

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
-module(pg_local).
3636

3737
-export([join/2, leave/2, get_members/1, in_group/2]).
38-
-export([sync/0]). %% intended for testing only; not part of official API
38+
%% intended for testing only; not part of official API
39+
-export([sync/0, clear/0]).
3940
-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
4041
handle_info/2, terminate/2]).
4142

@@ -54,7 +55,7 @@
5455

5556
%%----------------------------------------------------------------------------
5657

57-
%%% As of R13B03 monitors are used instead of links.
58+
-define(TABLE, pg_local_table).
5859

5960
%%%
6061
%%% Exported functions
@@ -92,19 +93,27 @@ sync() ->
9293
_ = ensure_started(),
9394
gen_server:call(?MODULE, sync, infinity).
9495

96+
clear() ->
97+
_ = ensure_started(),
98+
gen_server:call(?MODULE, clear, infinity).
99+
95100
%%%
96101
%%% Callback functions from gen_server
97102
%%%
98103

99104
-record(state, {}).
100105

101106
init([]) ->
102-
pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
107+
?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]),
103108
{ok, #state{}}.
104109

105110
handle_call(sync, _From, S) ->
106111
{reply, ok, S};
107112

113+
handle_call(clear, _From, S) ->
114+
ets:delete_all_objects(?TABLE),
115+
{reply, ok, S};
116+
108117
handle_call(Request, From, S) ->
109118
error_logger:warning_msg("The pg_local server received an unexpected message:\n"
110119
"handle_call(~p, ~p, _)\n",
@@ -120,14 +129,14 @@ handle_cast({leave, Name, Pid}, S) ->
120129
handle_cast(_, S) ->
121130
{noreply, S}.
122131

123-
handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
124-
member_died(MonitorRef),
132+
handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) ->
133+
member_died(MonitorRef, Pid),
125134
{noreply, S};
126135
handle_info(_, S) ->
127136
{noreply, S}.
128137

129138
terminate(_Reason, _S) ->
130-
true = ets:delete(pg_local_table),
139+
true = ets:delete(?TABLE),
131140
ok.
132141

133142
%%%
@@ -148,46 +157,55 @@ terminate(_Reason, _S) ->
148157
%%% {{pid, Pid, Name}}
149158
%%% Pid is a member of group Name.
150159

151-
member_died(Ref) ->
152-
[{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
160+
member_died(Ref, Pid) ->
161+
case ets:lookup(?TABLE, {ref, Ref}) of
162+
[{{ref, Ref}, Pid}] ->
163+
leave_all_groups(Pid);
164+
%% in case the key has already been removed
165+
%% we can clean up using the value from the DOWN message
166+
_ ->
167+
leave_all_groups(Pid)
168+
end,
169+
ok.
170+
171+
leave_all_groups(Pid) ->
153172
Names = member_groups(Pid),
154173
_ = [leave_group(Name, P) ||
155174
Name <- Names,
156-
P <- member_in_group(Pid, Name)],
157-
ok.
175+
P <- member_in_group(Pid, Name)].
158176

159177
join_group(Name, Pid) ->
160178
Ref_Pid = {ref, Pid},
161-
try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
179+
try _ = ets:update_counter(?TABLE, Ref_Pid, {3, +1})
162180
catch _:_ ->
163181
Ref = erlang:monitor(process, Pid),
164-
true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
165-
true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
182+
true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}),
183+
true = ets:insert(?TABLE, {{ref, Ref}, Pid})
166184
end,
167185
Member_Name_Pid = {member, Name, Pid},
168-
try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
186+
try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1})
169187
catch _:_ ->
170-
true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
171-
true = ets:insert(pg_local_table, {{pid, Pid, Name}})
188+
true = ets:insert(?TABLE, {Member_Name_Pid, 1}),
189+
true = ets:insert(?TABLE, {{pid, Pid, Name}})
172190
end.
173191

174192
leave_group(Name, Pid) ->
175193
Member_Name_Pid = {member, Name, Pid},
176-
try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
194+
try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of
177195
N ->
178196
if
179197
N =:= 0 ->
180-
true = ets:delete(pg_local_table, {pid, Pid, Name}),
181-
true = ets:delete(pg_local_table, Member_Name_Pid);
198+
true = ets:delete(?TABLE, {pid, Pid, Name}),
199+
true = ets:delete(?TABLE, Member_Name_Pid);
182200
true ->
183201
ok
184202
end,
185203
Ref_Pid = {ref, Pid},
186-
case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
204+
case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of
187205
0 ->
188-
[{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
189-
true = ets:delete(pg_local_table, {ref, Ref}),
190-
true = ets:delete(pg_local_table, Ref_Pid),
206+
[{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid),
207+
true = ets:delete(?TABLE, {ref, Ref}),
208+
true = ets:delete(?TABLE, Ref_Pid),
191209
true = erlang:demonitor(Ref, [flush]),
192210
ok;
193211
_ ->
@@ -199,21 +217,21 @@ leave_group(Name, Pid) ->
199217

200218
group_members(Name) ->
201219
[P ||
202-
[P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
220+
[P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}),
203221
_ <- lists:seq(1, N)].
204222

205223
member_in_group(Pid, Name) ->
206-
[{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
224+
[{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}),
207225
lists:duplicate(N, Pid).
208226

209227
member_present(Name, Pid) ->
210-
case ets:lookup(pg_local_table, {member, Name, Pid}) of
228+
case ets:lookup(?TABLE, {member, Name, Pid}) of
211229
[_] -> true;
212230
[] -> false
213231
end.
214232

215233
member_groups(Pid) ->
216-
[Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
234+
[Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})].
217235

218236
ensure_started() ->
219237
case whereis(?MODULE) of

test/unit_SUITE.erl

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
-module(unit_SUITE).
1818

1919
-include_lib("common_test/include/ct.hrl").
20+
-include_lib("eunit/include/eunit.hrl").
2021
-include_lib("rabbit_common/include/rabbit.hrl").
2122
-include_lib("rabbit_common/include/rabbit_framing.hrl").
2223

@@ -44,7 +45,6 @@ groups() ->
4445
decrypt_config,
4546
listing_plugins_from_multiple_directories,
4647
rabbitmqctl_encode,
47-
pg_local,
4848
pmerge,
4949
plmerge,
5050
priority_queue,
@@ -68,6 +68,9 @@ groups() ->
6868
]}
6969
]},
7070
{sequential_tests, [], [
71+
pg_local,
72+
pg_local_with_unexpected_deaths1,
73+
pg_local_with_unexpected_deaths2,
7174
decrypt_start_app,
7275
decrypt_start_app_file,
7376
decrypt_start_app_undefined,
@@ -377,29 +380,79 @@ rabbit_direct_extract_extra_auth_props(_Config) ->
377380
%% -------------------------------------------------------------------
378381

379382
pg_local(_Config) ->
380-
[P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]],
383+
[P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
381384
check_pg_local(ok, [], []),
385+
%% P joins group a, then b, then a again
382386
check_pg_local(pg_local:join(a, P), [P], []),
383387
check_pg_local(pg_local:join(b, P), [P], [P]),
384388
check_pg_local(pg_local:join(a, P), [P, P], [P]),
389+
%% Q joins group a, then b, then b again
385390
check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
386391
check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]),
387392
check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]),
393+
%% P leaves groups a and a
388394
check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]),
389395
check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
396+
%% leave/2 is idempotent
390397
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
391398
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
399+
%% clean up all processes
392400
[begin X ! done,
393401
Ref = erlang:monitor(process, X),
394402
receive {'DOWN', Ref, process, X, _Info} -> ok end
395403
end || X <- [P, Q]],
404+
%% ensure the groups are empty
405+
check_pg_local(ok, [], []),
406+
passed.
407+
408+
pg_local_with_unexpected_deaths1(_Config) ->
409+
[A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
396410
check_pg_local(ok, [], []),
411+
%% A joins groups a and b
412+
check_pg_local(pg_local:join(a, A), [A], []),
413+
check_pg_local(pg_local:join(b, A), [A], [A]),
414+
%% B joins group b
415+
check_pg_local(pg_local:join(b, B), [A], [A, B]),
416+
417+
[begin erlang:exit(X, sleep_now_in_a_fire),
418+
Ref = erlang:monitor(process, X),
419+
receive {'DOWN', Ref, process, X, _Info} -> ok end
420+
end || X <- [A, B]],
421+
%% ensure the groups are empty
422+
check_pg_local(ok, [], []),
423+
?assertNot(erlang:is_process_alive(A)),
424+
?assertNot(erlang:is_process_alive(B)),
425+
426+
passed.
427+
428+
pg_local_with_unexpected_deaths2(_Config) ->
429+
[A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
430+
check_pg_local(ok, [], []),
431+
%% A joins groups a and b
432+
check_pg_local(pg_local:join(a, A), [A], []),
433+
check_pg_local(pg_local:join(b, A), [A], [A]),
434+
%% B joins group b
435+
check_pg_local(pg_local:join(b, B), [A], [A, B]),
436+
437+
%% something else yanks a record (or all of them) from the pg_local
438+
%% bookkeeping table
439+
ok = pg_local:clear(),
440+
441+
[begin erlang:exit(X, sleep_now_in_a_fire),
442+
Ref = erlang:monitor(process, X),
443+
receive {'DOWN', Ref, process, X, _Info} -> ok end
444+
end || X <- [A, B]],
445+
%% ensure the groups are empty
446+
check_pg_local(ok, [], []),
447+
?assertNot(erlang:is_process_alive(A)),
448+
?assertNot(erlang:is_process_alive(B)),
449+
397450
passed.
398451

399452
check_pg_local(ok, APids, BPids) ->
400453
ok = pg_local:sync(),
401-
[true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
402-
{Key, Pids} <- [{a, APids}, {b, BPids}]].
454+
?assertEqual([true, true], [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
455+
{Key, Pids} <- [{a, APids}, {b, BPids}]]).
403456

404457
%% -------------------------------------------------------------------
405458
%% priority_queue.

0 commit comments

Comments
 (0)