Skip to content

Commit d439b8b

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Re-evaluate stream SAC group after connection down event
The same connection can contain several consumers belonging to a SAC group (group key = vhost + stream + consumer name). The whole new group must be re-evaluated to select a new active consumer after the consumers of the down connection are removed from it. The previous behavior would not re-evaluate the new group and could select a consumer from the down connection, letting the group with only inactive consumers, as the selected active consumer would never receive the activation message from the stream SAC coordinator. This commit fixes this problem by removing the consumers of the down down connection from the affected groups and then performing the appropriate operations for the groups to keep on consuming (e.g. notifying an active consumer that it needs to step down). References #13372 (cherry picked from commit 602b6ac)
1 parent 1145f06 commit d439b8b

File tree

3 files changed

+222
-75
lines changed

3 files changed

+222
-75
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 45 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
229229
of
230230
{value, Consumer} ->
231231
G1 = remove_from_group(Consumer, Group0),
232-
handle_consumer_removal(G1, Consumer, Stream, ConsumerName);
232+
handle_consumer_removal(G1, Stream, ConsumerName, Consumer#consumer.active);
233233
false ->
234234
{Group0, []}
235235
end,
@@ -414,50 +414,44 @@ handle_connection_down(Pid,
414414
{State0, []};
415415
{Groups, PidsGroups1} ->
416416
State1 = State0#?MODULE{pids_groups = PidsGroups1},
417-
%% iterate other the groups that this PID affects
418-
maps:fold(fun({VirtualHost, Stream, ConsumerName}, _,
419-
{#?MODULE{groups = ConsumerGroups} = S0, Eff0}) ->
420-
case lookup_group(VirtualHost,
421-
Stream,
422-
ConsumerName,
423-
ConsumerGroups)
424-
of
425-
undefined -> {S0, Eff0};
426-
#group{consumers = Consumers} ->
427-
%% iterate over the consumers of the group
428-
%% and unregister the ones from this PID.
429-
%% It may not be optimal, computing the new active consumer
430-
%% from the purged group and notifying the remaining consumers
431-
%% appropriately should avoid unwanted notifications and even rebalancing.
432-
lists:foldl(fun (#consumer{pid = P,
433-
subscription_id =
434-
SubId},
435-
{StateSub0, EffSub0})
436-
when P == Pid ->
437-
{StateSub1, ok, E} =
438-
?MODULE:apply(#command_unregister_consumer{vhost
439-
=
440-
VirtualHost,
441-
stream
442-
=
443-
Stream,
444-
consumer_name
445-
=
446-
ConsumerName,
447-
connection_pid
448-
=
449-
Pid,
450-
subscription_id
451-
=
452-
SubId},
453-
StateSub0),
454-
{StateSub1, EffSub0 ++ E};
455-
(_Consumer, Acc) -> Acc
456-
end,
457-
{S0, Eff0}, Consumers)
458-
end
459-
end,
460-
{State1, []}, Groups)
417+
maps:fold(fun(G, _, Acc) ->
418+
handle_group_after_connection_down(Pid, Acc, G)
419+
end, {State1, []}, Groups)
420+
end.
421+
422+
handle_group_after_connection_down(Pid,
423+
{#?MODULE{groups = Groups0} = S0, Eff0},
424+
{VirtualHost, Stream, ConsumerName}) ->
425+
case lookup_group(VirtualHost,
426+
Stream,
427+
ConsumerName,
428+
Groups0) of
429+
undefined ->
430+
{S0, Eff0};
431+
#group{consumers = Consumers0} = G0 ->
432+
%% remove the connection consumers from the group state
433+
%% keep flags to know what happened
434+
{Consumers1, ActiveRemoved, AnyRemoved} =
435+
lists:foldl(
436+
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _}) when P == Pid ->
437+
{L, S or ActiveFlag, true};
438+
(C, {L, ActiveFlag, AnyFlag}) ->
439+
{L ++ [C], ActiveFlag, AnyFlag}
440+
end, {[], false, false}, Consumers0),
441+
442+
case AnyRemoved of
443+
true ->
444+
G1 = G0#group{consumers = Consumers1},
445+
{G2, Effects} = handle_consumer_removal(G1, Stream, ConsumerName, ActiveRemoved),
446+
Groups1 = update_groups(VirtualHost,
447+
Stream,
448+
ConsumerName,
449+
G2,
450+
Groups0),
451+
{S0#?MODULE{groups = Groups1}, Effects ++ Eff0};
452+
false ->
453+
{S0, Eff0}
454+
end
461455
end.
462456

463457
do_register_consumer(VirtualHost,
@@ -576,9 +570,9 @@ do_register_consumer(VirtualHost,
576570
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
577571
{G, []};
578572
handle_consumer_removal(#group{partition_index = -1} = Group0,
579-
Consumer, Stream, ConsumerName) ->
580-
case Consumer of
581-
#consumer{active = true} ->
573+
Stream, ConsumerName, ActiveRemoved) ->
574+
case ActiveRemoved of
575+
true ->
582576
%% this is the active consumer we remove, computing the new one
583577
Group1 = compute_active_consumer(Group0),
584578
case lookup_active_consumer(Group1) of
@@ -589,11 +583,11 @@ handle_consumer_removal(#group{partition_index = -1} = Group0,
589583
%% no active consumer found in the group, nothing to do
590584
{Group1, []}
591585
end;
592-
#consumer{active = false} ->
586+
false ->
593587
%% not the active consumer, nothing to do.
594588
{Group0, []}
595589
end;
596-
handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
590+
handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
597591
case lookup_active_consumer(Group0) of
598592
{value,
599593
#consumer{pid = ActPid, subscription_id = ActSubId} =
@@ -612,7 +606,7 @@ handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
612606
Stream, ConsumerName, false, true)]}
613607
end;
614608
false ->
615-
case Consumer#consumer.active of
609+
case ActiveRemoved of
616610
true ->
617611
%% the active one is going away, picking a new one
618612
#consumer{pid = P, subscription_id = SID} =

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 175 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -312,29 +312,27 @@ ensure_monitors_test(_) ->
312312

313313
ok.
314314

315-
handle_connection_down_test(_) ->
315+
handle_connection_down_sac_should_get_activated_test(_) ->
316316
Stream = <<"stream">>,
317317
ConsumerName = <<"app">>,
318318
GroupId = {<<"/">>, Stream, ConsumerName},
319319
Pid0 = self(),
320320
Pid1 = spawn(fun() -> ok end),
321-
Group =
322-
cgroup([consumer(Pid0, 0, true), consumer(Pid1, 1, false),
323-
consumer(Pid0, 2, false)]),
324-
State0 =
325-
state(#{GroupId => Group},
326-
#{Pid0 => maps:from_list([{GroupId, true}]),
327-
Pid1 => maps:from_list([{GroupId, true}])}),
321+
Group = cgroup([consumer(Pid0, 0, true),
322+
consumer(Pid1, 1, false),
323+
consumer(Pid0, 2, false)]),
324+
State0 = state(#{GroupId => Group},
325+
#{Pid0 => maps:from_list([{GroupId, true}]),
326+
Pid1 => maps:from_list([{GroupId, true}])}),
328327

329328
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = State1,
330329
Effects1} =
331-
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
330+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
332331
assertSize(1, PidsGroups1),
333332
assertSize(1, maps:get(Pid1, PidsGroups1)),
334333
assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
335-
?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])},
336-
Groups1),
337-
{#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2,
334+
assertHasGroup(GroupId, cgroup([consumer(Pid1, 1, true)]), Groups1),
335+
{#?STATE{pids_groups = PidsGroups2, groups = Groups2},
338336
Effects2} =
339337
rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State1),
340338
assertEmpty(PidsGroups2),
@@ -343,6 +341,168 @@ handle_connection_down_test(_) ->
343341

344342
ok.
345343

344+
handle_connection_down_sac_active_does_not_change_test(_) ->
345+
Stream = <<"stream">>,
346+
ConsumerName = <<"app">>,
347+
GroupId = {<<"/">>, Stream, ConsumerName},
348+
Pid0 = self(),
349+
Pid1 = spawn(fun() -> ok end),
350+
Group = cgroup([consumer(Pid1, 0, true),
351+
consumer(Pid0, 1, false),
352+
consumer(Pid0, 2, false)]),
353+
State = state(#{GroupId => Group},
354+
#{Pid0 => maps:from_list([{GroupId, true}]),
355+
Pid1 => maps:from_list([{GroupId, true}])}),
356+
357+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
358+
Effects} =
359+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
360+
assertSize(1, PidsGroups),
361+
assertSize(1, maps:get(Pid1, PidsGroups)),
362+
assertEmpty(Effects),
363+
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true)]), Groups),
364+
ok.
365+
366+
handle_connection_down_sac_no_more_consumers_test(_) ->
367+
Stream = <<"stream">>,
368+
ConsumerName = <<"app">>,
369+
GroupId = {<<"/">>, Stream, ConsumerName},
370+
Pid0 = self(),
371+
Group = cgroup([consumer(Pid0, 0, true),
372+
consumer(Pid0, 1, false)]),
373+
State = state(#{GroupId => Group},
374+
#{Pid0 => maps:from_list([{GroupId, true}])}),
375+
376+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
377+
Effects} =
378+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
379+
assertEmpty(PidsGroups),
380+
assertEmpty(Groups),
381+
assertEmpty(Effects),
382+
ok.
383+
384+
handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
385+
Stream = <<"stream">>,
386+
ConsumerName = <<"app">>,
387+
GroupId = {<<"/">>, Stream, ConsumerName},
388+
Pid0 = self(),
389+
Pid1 = spawn(fun() -> ok end),
390+
Group = cgroup([consumer(Pid1, 0, true),
391+
consumer(Pid1, 1, false)]),
392+
State = state(#{GroupId => Group},
393+
#{Pid0 => maps:from_list([{GroupId, true}]), %% should not be there
394+
Pid1 => maps:from_list([{GroupId, true}])}),
395+
396+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
397+
Effects} =
398+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
399+
400+
assertSize(1, PidsGroups),
401+
assertSize(1, maps:get(Pid1, PidsGroups)),
402+
assertEmpty(Effects),
403+
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true), consumer(Pid1, 1, false)]),
404+
Groups),
405+
ok.
406+
407+
handle_connection_down_super_stream_active_stays_test(_) ->
408+
Stream = <<"stream">>,
409+
ConsumerName = <<"app">>,
410+
GroupId = {<<"/">>, Stream, ConsumerName},
411+
Pid0 = self(),
412+
Pid1 = spawn(fun() -> ok end),
413+
Group = cgroup(1, [consumer(Pid0, 0, false),
414+
consumer(Pid0, 1, true),
415+
consumer(Pid1, 2, false),
416+
consumer(Pid1, 3, false)]),
417+
State = state(#{GroupId => Group},
418+
#{Pid0 => maps:from_list([{GroupId, true}]),
419+
Pid1 => maps:from_list([{GroupId, true}])}),
420+
421+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
422+
Effects} =
423+
rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State),
424+
assertSize(1, PidsGroups),
425+
assertSize(1, maps:get(Pid0, PidsGroups)),
426+
assertEmpty(Effects),
427+
assertHasGroup(GroupId, cgroup(1, [consumer(Pid0, 0, false), consumer(Pid0, 1, true)]),
428+
Groups),
429+
ok.
430+
431+
handle_connection_down_super_stream_active_changes_test(_) ->
432+
Stream = <<"stream">>,
433+
ConsumerName = <<"app">>,
434+
GroupId = {<<"/">>, Stream, ConsumerName},
435+
Pid0 = self(),
436+
Pid1 = spawn(fun() -> ok end),
437+
Group = cgroup(1, [consumer(Pid0, 0, false),
438+
consumer(Pid1, 1, true),
439+
consumer(Pid0, 2, false),
440+
consumer(Pid1, 3, false)]),
441+
State = state(#{GroupId => Group},
442+
#{Pid0 => maps:from_list([{GroupId, true}]),
443+
Pid1 => maps:from_list([{GroupId, true}])}),
444+
445+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
446+
Effects} =
447+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
448+
assertSize(1, PidsGroups),
449+
assertSize(1, maps:get(Pid1, PidsGroups)),
450+
assertSendMessageSteppingDownEffect(Pid1, 1, Stream, ConsumerName, Effects),
451+
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 1, false), consumer(Pid1, 3, false)]),
452+
Groups),
453+
ok.
454+
455+
handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
456+
Stream = <<"stream">>,
457+
ConsumerName = <<"app">>,
458+
GroupId = {<<"/">>, Stream, ConsumerName},
459+
Pid0 = self(),
460+
Pid1 = spawn(fun() -> ok end),
461+
Group = cgroup(1, [consumer(Pid0, 0, false),
462+
consumer(Pid0, 1, true),
463+
consumer(Pid1, 2, false),
464+
consumer(Pid1, 3, false)]),
465+
State = state(#{GroupId => Group},
466+
#{Pid0 => maps:from_list([{GroupId, true}]),
467+
Pid1 => maps:from_list([{GroupId, true}])}),
468+
469+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
470+
Effects} =
471+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
472+
assertSize(1, PidsGroups),
473+
assertSize(1, maps:get(Pid1, PidsGroups)),
474+
assertSendMessageEffect(Pid1, 3, Stream, ConsumerName, true, Effects),
475+
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, true)]),
476+
Groups),
477+
ok.
478+
479+
handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
480+
Stream = <<"stream">>,
481+
ConsumerName = <<"app">>,
482+
GroupId = {<<"/">>, Stream, ConsumerName},
483+
Pid0 = self(),
484+
Pid1 = spawn(fun() -> ok end),
485+
%% this is a weird case that should not happen in the wild,
486+
%% we test the logic in the code nevertheless.
487+
%% No active consumer in the group
488+
Group = cgroup(1, [consumer(Pid0, 0, false),
489+
consumer(Pid0, 1, false),
490+
consumer(Pid1, 2, false),
491+
consumer(Pid1, 3, false)]),
492+
State = state(#{GroupId => Group},
493+
#{Pid0 => maps:from_list([{GroupId, true}]),
494+
Pid1 => maps:from_list([{GroupId, true}])}),
495+
496+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
497+
Effects} =
498+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
499+
assertSize(1, PidsGroups),
500+
assertSize(1, maps:get(Pid1, PidsGroups)),
501+
assertEmpty(Effects),
502+
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, false)]),
503+
Groups),
504+
ok.
505+
346506
assertSize(Expected, []) ->
347507
?assertEqual(Expected, 0);
348508
assertSize(Expected, Map) when is_map(Map) ->
@@ -353,6 +513,9 @@ assertSize(Expected, List) when is_list(List) ->
353513
assertEmpty(Data) ->
354514
assertSize(0, Data).
355515

516+
assertHasGroup(GroupId, Group, Groups) ->
517+
?assertEqual(#{GroupId => Group}, Groups).
518+
356519
consumer(Pid, SubId, Active) ->
357520
#consumer{pid = Pid,
358521
subscription_id = SubId,

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -598,26 +598,16 @@ augment_infos_with_user_provided_connection_name(Infos,
598598
end.
599599

600600
close(Transport,
601-
#stream_connection{socket = S, virtual_host = VirtualHost,
602-
outstanding_requests = Requests},
601+
#stream_connection{socket = S},
603602
#stream_connection_state{consumers = Consumers}) ->
604603
[begin
605-
%% we discard the result (updated requests) because they are no longer used
606-
_ = maybe_unregister_consumer(VirtualHost, Consumer,
607-
single_active_consumer(Properties),
608-
Requests),
609604
case Log of
610605
undefined ->
611606
ok; %% segment may not be defined on subscription (single active consumer)
612607
L ->
613608
osiris_log:close(L)
614609
end
615-
end
616-
|| #consumer{log = Log,
617-
configuration =
618-
#consumer_configuration{properties = Properties}} =
619-
Consumer
620-
<- maps:values(Consumers)],
610+
end || #consumer{log = Log} <- maps:values(Consumers)],
621611
Transport:shutdown(S, write),
622612
Transport:close(S).
623613

0 commit comments

Comments
 (0)