@@ -277,13 +277,17 @@ handle_call({init, Overall}, _From,
277
277
tx_fun = TxFun ,
278
278
initial_childspecs = ChildSpecs }) ->
279
279
process_flag (trap_exit , true ),
280
+ LockId = mirrored_supervisor_locks :lock (Group ),
281
+ maybe_log_lock_acquisition_failure (LockId , Group ),
280
282
ok = pg :join (Group , Overall ),
281
- rabbit_log :debug (" Mirrored supervisor: initializing, joined group ~p " , [Group ]),
283
+ rabbit_log :debug (" Mirrored supervisor: initializing, overall supervisor ~p joined group ~p " , [Overall , Group ]),
282
284
Rest = pg :get_members (Group ) -- [Overall ],
283
285
Nodes = [node (M ) || M <- Rest ],
284
286
rabbit_log :debug (" Mirrored supervisor: known group ~p members: ~p on nodes ~p " , [Group , Rest , Nodes ]),
285
287
case Rest of
286
- [] -> TxFun (fun () -> delete_all (Group ) end );
288
+ [] ->
289
+ rabbit_log :debug (" Mirrored supervisor: no known peer members in group ~p , will delete all child records for it" , [Group ]),
290
+ TxFun (fun () -> delete_all (Group ) end );
287
291
_ -> ok
288
292
end ,
289
293
[begin
@@ -293,8 +297,9 @@ handle_call({init, Overall}, _From,
293
297
Delegate = delegate (Overall ),
294
298
erlang :monitor (process , Delegate ),
295
299
State1 = State # state {overall = Overall , delegate = Delegate },
296
- case errors ([maybe_start (Group , TxFun , Overall , Delegate , S )
297
- || S <- ChildSpecs ]) of
300
+ Results = [maybe_start (Group , TxFun , Overall , Delegate , S ) || S <- ChildSpecs ],
301
+ mirrored_supervisor_locks :unlock (LockId ),
302
+ case errors (Results ) of
298
303
[] -> {reply , ok , State1 };
299
304
Errors -> {stop , {shutdown , Errors }, State1 }
300
305
end ;
@@ -304,11 +309,25 @@ handle_call({start_child, ChildSpec}, _From,
304
309
delegate = Delegate ,
305
310
group = Group ,
306
311
tx_fun = TxFun }) ->
307
- {reply , case maybe_start (Group , TxFun , Overall , Delegate , ChildSpec ) of
308
- already_in_mnesia -> {error , already_present };
309
- {already_in_mnesia , Pid } -> {error , {already_started , Pid }};
310
- Else -> Else
311
- end , State };
312
+ LockId = mirrored_supervisor_locks :lock (Group ),
313
+ maybe_log_lock_acquisition_failure (LockId , Group ),
314
+ rabbit_log :debug (" Mirrored supervisor: asked to consider starting a child, group: ~p " , [Group ]),
315
+ Result = case maybe_start (Group , TxFun , Overall , Delegate , ChildSpec ) of
316
+ already_in_mnesia ->
317
+ rabbit_log :debug (" Mirrored supervisor: maybe_start for group ~p ,"
318
+ " overall ~p returned 'record already present'" , [Group , Overall ]),
319
+ {error , already_present };
320
+ {already_in_mnesia , Pid } ->
321
+ rabbit_log :debug (" Mirrored supervisor: maybe_start for group ~p ,"
322
+ " overall ~p returned 'already running: ~p '" , [Group , Overall , Pid ]),
323
+ {error , {already_started , Pid }};
324
+ Else ->
325
+ rabbit_log :debug (" Mirrored supervisor: maybe_start for group ~p ,"
326
+ " overall ~p returned ~p " , [Group , Overall , Else ]),
327
+ Else
328
+ end ,
329
+ mirrored_supervisor_locks :unlock (LockId ),
330
+ {reply , Result , State };
312
331
313
332
handle_call ({delete_child , Id }, _From , State = # state {delegate = Delegate ,
314
333
group = Group ,
@@ -384,28 +403,50 @@ tell_all_peers_to_die(Group, Reason) ->
384
403
[cast (P , {die , Reason }) || P <- pg :get_members (Group ) -- [self ()]].
385
404
386
405
maybe_start (Group , TxFun , Overall , Delegate , ChildSpec ) ->
406
+ rabbit_log :debug (" Mirrored supervisor: asked to consider starting, group: ~p " , [Group ]),
387
407
try TxFun (fun () -> check_start (Group , Overall , Delegate , ChildSpec ) end ) of
388
- start -> start (Delegate , ChildSpec );
389
- undefined -> already_in_mnesia ;
390
- Pid -> {already_in_mnesia , Pid }
408
+ start ->
409
+ rabbit_log :debug (" Mirrored supervisor: check_start for group ~p ,"
410
+ " overall ~p returned 'do start'" , [Group , Overall ]),
411
+ start (Delegate , ChildSpec );
412
+ undefined ->
413
+ rabbit_log :debug (" Mirrored supervisor: check_start for group ~p ,"
414
+ " overall ~p returned 'undefined'" , [Group , Overall ]),
415
+ already_in_mnesia ;
416
+ Pid ->
417
+ rabbit_log :debug (" Mirrored supervisor: check_start for group ~p ,"
418
+ " overall ~p returned 'already running (~p )'" , [Group , Overall , Pid ]),
419
+ {already_in_mnesia , Pid }
391
420
catch
392
421
% % If we are torn down while in the transaction...
393
422
{error , E } -> {error , E }
394
423
end .
395
424
396
425
check_start (Group , Overall , Delegate , ChildSpec ) ->
397
- case mnesia :wread ({? TABLE , {Group , id (ChildSpec )}}) of
426
+ rabbit_log :debug (" Mirrored supervisor: check_start for group ~p , id: ~p , overall: ~p " ,
427
+ [Group , id (ChildSpec ), Overall ]),
428
+ ReadResult = mnesia :wread ({? TABLE , {Group , id (ChildSpec )}}),
429
+ rabbit_log :debug (" Mirrored supervisor: check_start table ~s read for key ~p returned ~p " ,
430
+ [? TABLE , {Group , id (ChildSpec )}, ReadResult ]),
431
+ case ReadResult of
398
432
[] -> _ = write (Group , Overall , ChildSpec ),
399
433
start ;
400
434
[S ] -> # mirrored_sup_childspec {key = {Group , Id },
401
435
mirroring_pid = Pid } = S ,
402
436
case Overall of
403
- Pid -> child (Delegate , Id );
404
- _ -> case supervisor (Pid ) of
405
- dead -> _ = write (Group , Overall , ChildSpec ),
406
- start ;
407
- Delegate0 -> child (Delegate0 , Id )
408
- end
437
+ Pid ->
438
+ rabbit_log :debug (" Mirrored supervisor: overall matched mirrored pid ~p " , [Pid ]),
439
+ child (Delegate , Id );
440
+ _ ->
441
+ rabbit_log :debug (" Mirrored supervisor: overall ~p did not match mirrored pid ~p " , [Overall , Pid ]),
442
+ rabbit_log :debug (" Mirrored supervisor: supervisor(~p ) returned ~p " , [Pid , supervisor (Pid )]),
443
+ case supervisor (Pid ) of
444
+ dead ->
445
+ _ = write (Group , Overall , ChildSpec ),
446
+ start ;
447
+ Delegate0 ->
448
+ child (Delegate0 , Id )
449
+ end
409
450
end
410
451
end .
411
452
@@ -423,7 +464,6 @@ delete(Group, Id) ->
423
464
ok = mnesia :delete ({? TABLE , {Group , Id }}).
424
465
425
466
start (Delegate , ChildSpec ) ->
426
- rabbit_log :debug (" Mirrored supervisor: asked to start with delegate: ~p , child spec: ~p " , [Delegate , ChildSpec ]),
427
467
apply (? SUPERVISOR , start_child , [Delegate , ChildSpec ]).
428
468
429
469
stop (Group , TxFun , Delegate , Id ) ->
@@ -511,3 +551,8 @@ restore_child_order(ChildSpecs, ChildOrder) ->
511
551
proplists :get_value (id (A ), ChildOrder )
512
552
< proplists :get_value (id (B ), ChildOrder )
513
553
end , ChildSpecs ).
554
+
555
+ maybe_log_lock_acquisition_failure (undefined = _LockId , Group ) ->
556
+ rabbit_log :warning (" Mirrored supervisor: could not acquire lock for group ~s " , [Group ]);
557
+ maybe_log_lock_acquisition_failure (_ , _ ) ->
558
+ ok .
0 commit comments