Skip to content

Commit 2288f54

Browse files
Make pg_local:member_died/2 more resilient
See #1699 for background. [#160530707] (cherry picked from commit dc89457)
1 parent 45f4c24 commit 2288f54

File tree

2 files changed

+104
-31
lines changed

2 files changed

+104
-31
lines changed

src/pg_local.erl

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
-module(pg_local).
3636

3737
-export([join/2, leave/2, get_members/1, in_group/2]).
38-
-export([sync/0]). %% intended for testing only; not part of official API
38+
%% intended for testing only; not part of official API
39+
-export([sync/0, clear/0]).
3940
-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
4041
handle_info/2, terminate/2]).
4142

@@ -54,7 +55,7 @@
5455

5556
%%----------------------------------------------------------------------------
5657

57-
%%% As of R13B03 monitors are used instead of links.
58+
-define(TABLE, pg_local_table).
5859

5960
%%%
6061
%%% Exported functions
@@ -92,19 +93,27 @@ sync() ->
9293
_ = ensure_started(),
9394
gen_server:call(?MODULE, sync, infinity).
9495

96+
clear() ->
97+
_ = ensure_started(),
98+
gen_server:call(?MODULE, clear, infinity).
99+
95100
%%%
96101
%%% Callback functions from gen_server
97102
%%%
98103

99104
-record(state, {}).
100105

101106
init([]) ->
102-
pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
107+
?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]),
103108
{ok, #state{}}.
104109

105110
handle_call(sync, _From, S) ->
106111
{reply, ok, S};
107112

113+
handle_call(clear, _From, S) ->
114+
ets:delete_all_objects(?TABLE),
115+
{reply, ok, S};
116+
108117
handle_call(Request, From, S) ->
109118
error_logger:warning_msg("The pg_local server received an unexpected message:\n"
110119
"handle_call(~p, ~p, _)\n",
@@ -120,14 +129,14 @@ handle_cast({leave, Name, Pid}, S) ->
120129
handle_cast(_, S) ->
121130
{noreply, S}.
122131

123-
handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
124-
member_died(MonitorRef),
132+
handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) ->
133+
member_died(MonitorRef, Pid),
125134
{noreply, S};
126135
handle_info(_, S) ->
127136
{noreply, S}.
128137

129138
terminate(_Reason, _S) ->
130-
true = ets:delete(pg_local_table),
139+
true = ets:delete(?TABLE),
131140
ok.
132141

133142
%%%
@@ -148,46 +157,57 @@ terminate(_Reason, _S) ->
148157
%%% {{pid, Pid, Name}}
149158
%%% Pid is a member of group Name.
150159

151-
member_died(Ref) ->
152-
[{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
160+
member_died(Ref, Pid) ->
161+
case ets:lookup(?TABLE, {ref, Ref}) of
162+
[{{ref, Ref}, Pid}] ->
163+
leave_all_groups(Pid);
164+
%% in case the key has already been removed
165+
%% we can perform the lookup using the DOWN message pid
166+
[] ->
167+
leave_all_groups(Pid);
168+
_ ->
169+
leave_all_groups(Pid)
170+
end,
171+
ok.
172+
173+
leave_all_groups(Pid) ->
153174
Names = member_groups(Pid),
154175
_ = [leave_group(Name, P) ||
155176
Name <- Names,
156-
P <- member_in_group(Pid, Name)],
157-
ok.
177+
P <- member_in_group(Pid, Name)].
158178

159179
join_group(Name, Pid) ->
160180
Ref_Pid = {ref, Pid},
161-
try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
181+
try _ = ets:update_counter(?TABLE, Ref_Pid, {3, +1})
162182
catch _:_ ->
163183
Ref = erlang:monitor(process, Pid),
164-
true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
165-
true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
184+
true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}),
185+
true = ets:insert(?TABLE, {{ref, Ref}, Pid})
166186
end,
167187
Member_Name_Pid = {member, Name, Pid},
168-
try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
188+
try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1})
169189
catch _:_ ->
170-
true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
171-
true = ets:insert(pg_local_table, {{pid, Pid, Name}})
190+
true = ets:insert(?TABLE, {Member_Name_Pid, 1}),
191+
true = ets:insert(?TABLE, {{pid, Pid, Name}})
172192
end.
173193

174194
leave_group(Name, Pid) ->
175195
Member_Name_Pid = {member, Name, Pid},
176-
try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
196+
try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of
177197
N ->
178198
if
179199
N =:= 0 ->
180-
true = ets:delete(pg_local_table, {pid, Pid, Name}),
181-
true = ets:delete(pg_local_table, Member_Name_Pid);
200+
true = ets:delete(?TABLE, {pid, Pid, Name}),
201+
true = ets:delete(?TABLE, Member_Name_Pid);
182202
true ->
183203
ok
184204
end,
185205
Ref_Pid = {ref, Pid},
186-
case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
206+
case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of
187207
0 ->
188-
[{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
189-
true = ets:delete(pg_local_table, {ref, Ref}),
190-
true = ets:delete(pg_local_table, Ref_Pid),
208+
[{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid),
209+
true = ets:delete(?TABLE, {ref, Ref}),
210+
true = ets:delete(?TABLE, Ref_Pid),
191211
true = erlang:demonitor(Ref, [flush]),
192212
ok;
193213
_ ->
@@ -199,21 +219,21 @@ leave_group(Name, Pid) ->
199219

200220
group_members(Name) ->
201221
[P ||
202-
[P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
222+
[P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}),
203223
_ <- lists:seq(1, N)].
204224

205225
member_in_group(Pid, Name) ->
206-
[{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
226+
[{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}),
207227
lists:duplicate(N, Pid).
208228

209229
member_present(Name, Pid) ->
210-
case ets:lookup(pg_local_table, {member, Name, Pid}) of
230+
case ets:lookup(?TABLE, {member, Name, Pid}) of
211231
[_] -> true;
212232
[] -> false
213233
end.
214234

215235
member_groups(Pid) ->
216-
[Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
236+
[Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})].
217237

218238
ensure_started() ->
219239
case whereis(?MODULE) of

test/unit_SUITE.erl

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
-module(unit_SUITE).
1818

1919
-include_lib("common_test/include/ct.hrl").
20+
-include_lib("eunit/include/eunit.hrl").
2021
-include_lib("rabbit_common/include/rabbit.hrl").
2122
-include_lib("rabbit_common/include/rabbit_framing.hrl").
2223

@@ -44,7 +45,6 @@ groups() ->
4445
decrypt_config,
4546
listing_plugins_from_multiple_directories,
4647
rabbitmqctl_encode,
47-
pg_local,
4848
pmerge,
4949
plmerge,
5050
priority_queue,
@@ -68,6 +68,9 @@ groups() ->
6868
]}
6969
]},
7070
{sequential_tests, [], [
71+
pg_local,
72+
pg_local_with_unexpected_deaths1,
73+
pg_local_with_unexpected_deaths2,
7174
decrypt_start_app,
7275
decrypt_start_app_file,
7376
decrypt_start_app_undefined,
@@ -377,29 +380,79 @@ rabbit_direct_extract_extra_auth_props(_Config) ->
377380
%% -------------------------------------------------------------------
378381

379382
pg_local(_Config) ->
380-
[P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]],
383+
[P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
381384
check_pg_local(ok, [], []),
385+
%% P joins group a, then b, then a again
382386
check_pg_local(pg_local:join(a, P), [P], []),
383387
check_pg_local(pg_local:join(b, P), [P], [P]),
384388
check_pg_local(pg_local:join(a, P), [P, P], [P]),
389+
%% Q joins group a, then b, then b again
385390
check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
386391
check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]),
387392
check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]),
393+
%% P leaves groups a and a
388394
check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]),
389395
check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
396+
%% leave/2 is idempotent
390397
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
391398
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
399+
%% clean up all processes
392400
[begin X ! done,
393401
Ref = erlang:monitor(process, X),
394402
receive {'DOWN', Ref, process, X, _Info} -> ok end
395403
end || X <- [P, Q]],
404+
%% ensure the groups are empty
405+
check_pg_local(ok, [], []),
406+
passed.
407+
408+
pg_local_with_unexpected_deaths1(_Config) ->
409+
[A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
396410
check_pg_local(ok, [], []),
411+
%% A joins groups a and b
412+
check_pg_local(pg_local:join(a, A), [A], []),
413+
check_pg_local(pg_local:join(b, A), [A], [A]),
414+
%% B joins group b
415+
check_pg_local(pg_local:join(b, B), [A], [A, B]),
416+
417+
[begin erlang:exit(X, sleep_now_in_a_fire),
418+
Ref = erlang:monitor(process, X),
419+
receive {'DOWN', Ref, process, X, _Info} -> ok end
420+
end || X <- [A, B]],
421+
%% ensure the groups are empty
422+
check_pg_local(ok, [], []),
423+
?assertNot(erlang:is_process_alive(A)),
424+
?assertNot(erlang:is_process_alive(B)),
425+
426+
passed.
427+
428+
pg_local_with_unexpected_deaths2(_Config) ->
429+
[A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
430+
check_pg_local(ok, [], []),
431+
%% A joins groups a and b
432+
check_pg_local(pg_local:join(a, A), [A], []),
433+
check_pg_local(pg_local:join(b, A), [A], [A]),
434+
%% B joins group b
435+
check_pg_local(pg_local:join(b, B), [A], [A, B]),
436+
437+
%% something else yanks a record (or all of them) from the pg_local
438+
%% bookkeeping table
439+
ok = pg_local:clear(),
440+
441+
[begin erlang:exit(X, sleep_now_in_a_fire),
442+
Ref = erlang:monitor(process, X),
443+
receive {'DOWN', Ref, process, X, _Info} -> ok end
444+
end || X <- [A, B]],
445+
%% ensure the groups are empty
446+
check_pg_local(ok, [], []),
447+
?assertNot(erlang:is_process_alive(A)),
448+
?assertNot(erlang:is_process_alive(B)),
449+
397450
passed.
398451

399452
check_pg_local(ok, APids, BPids) ->
400453
ok = pg_local:sync(),
401-
[true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
402-
{Key, Pids} <- [{a, APids}, {b, BPids}]].
454+
?assertEqual([true, true], [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
455+
{Key, Pids} <- [{a, APids}, {b, BPids}]]).
403456

404457
%% -------------------------------------------------------------------
405458
%% priority_queue.

0 commit comments

Comments
 (0)