Skip to content

Commit 3b8404b

Browse files
Merge pull request #9577 from rabbitmq/mergify/bp/v3.11.x/pr-9576
Monitor members to the same stream in stream connection (backport #9572) (backport #9576)
2 parents 3938176 + 1470571 commit 3b8404b

File tree

5 files changed

+147
-70
lines changed

5 files changed

+147
-70
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 59 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
heartbeat :: undefined | integer(),
8585
heartbeater :: any(),
8686
client_properties = #{} :: #{binary() => binary()},
87-
monitors = #{} :: #{reference() => stream()},
87+
monitors = #{} :: #{reference() => {pid(), stream()}},
8888
stats_timer :: undefined | reference(),
8989
resource_alarm :: boolean(),
9090
send_file_oct ::
@@ -918,10 +918,10 @@ open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
918918
StatemData) ->
919919
{Connection1, State1} =
920920
case Monitors of
921-
#{MonitorRef := Stream} ->
921+
#{MonitorRef := {MemberPid, Stream}} ->
922922
Monitors1 = maps:remove(MonitorRef, Monitors),
923923
C = Connection#stream_connection{monitors = Monitors1},
924-
case clean_state_after_stream_deletion_or_failure(Stream, C,
924+
case clean_state_after_stream_deletion_or_failure(MemberPid, Stream, C,
925925
State)
926926
of
927927
{cleaned, NewConnection, NewState} ->
@@ -1859,7 +1859,7 @@ handle_frame_post_auth(Transport,
18591859
{request, CorrelationId,
18601860
{delete_publisher, PublisherId}}) ->
18611861
case Publishers of
1862-
#{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
1862+
#{PublisherId := #publisher{stream = Stream, reference = Ref, leader = LeaderPid}} ->
18631863
Connection1 =
18641864
Connection0#stream_connection{publishers =
18651865
maps:remove(PublisherId,
@@ -1868,7 +1868,7 @@ handle_frame_post_auth(Transport,
18681868
maps:remove({Stream, Ref},
18691869
PubToIds)},
18701870
Connection2 =
1871-
maybe_clean_connection_from_stream(Stream, Connection1),
1871+
maybe_clean_connection_from_stream(LeaderPid, Stream, Connection1),
18721872
response(Transport,
18731873
Connection1,
18741874
delete_publisher,
@@ -2371,7 +2371,7 @@ handle_frame_post_auth(Transport,
23712371
CorrelationId),
23722372
{Connection1, State1} =
23732373
case
2374-
clean_state_after_stream_deletion_or_failure(Stream,
2374+
clean_state_after_stream_deletion_or_failure(undefined, Stream,
23752375
Connection,
23762376
State)
23772377
of
@@ -3116,7 +3116,7 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) ->
31163116
kind = queue,
31173117
virtual_host = VHost}.
31183118

3119-
clean_state_after_stream_deletion_or_failure(Stream,
3119+
clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
31203120
#stream_connection{virtual_host =
31213121
VirtualHost,
31223122
stream_subscriptions
@@ -3141,16 +3141,30 @@ clean_state_after_stream_deletion_or_failure(Stream,
31413141
#{Stream := SubscriptionIds} = StreamSubscriptions,
31423142
Requests1 = lists:foldl(
31433143
fun(SubId, Rqsts0) ->
3144-
rabbit_stream_metrics:consumer_cancelled(self(),
3145-
stream_r(Stream,
3146-
C0),
3147-
SubId),
31483144
#{SubId := Consumer} = Consumers,
3149-
Rqsts1 = maybe_unregister_consumer(
3150-
VirtualHost, Consumer,
3151-
single_active_consumer(Consumer),
3152-
Rqsts0),
3153-
Rqsts1
3145+
case {MemberPid, Consumer} of
3146+
{undefined, _C} ->
3147+
rabbit_stream_metrics:consumer_cancelled(self(),
3148+
stream_r(Stream,
3149+
C0),
3150+
SubId),
3151+
maybe_unregister_consumer(
3152+
VirtualHost, Consumer,
3153+
single_active_consumer(Consumer),
3154+
Rqsts0);
3155+
{MemberPid, #consumer{configuration =
3156+
#consumer_configuration{member_pid = MemberPid}}} ->
3157+
rabbit_stream_metrics:consumer_cancelled(self(),
3158+
stream_r(Stream,
3159+
C0),
3160+
SubId),
3161+
maybe_unregister_consumer(
3162+
VirtualHost, Consumer,
3163+
single_active_consumer(Consumer),
3164+
Rqsts0);
3165+
_ ->
3166+
Rqsts0
3167+
end
31543168
end, Requests0, SubscriptionIds),
31553169
{true,
31563170
C0#stream_connection{stream_subscriptions =
@@ -3169,17 +3183,25 @@ clean_state_after_stream_deletion_or_failure(Stream,
31693183
{PurgedPubs, PurgedPubToIds} =
31703184
maps:fold(fun(PubId,
31713185
#publisher{stream = S, reference = Ref},
3172-
{Pubs, PubToIds}) ->
3173-
case S of
3174-
Stream ->
3186+
{Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined ->
3187+
rabbit_stream_metrics:publisher_deleted(self(),
3188+
stream_r(Stream,
3189+
C1),
3190+
PubId),
3191+
{maps:remove(PubId, Pubs),
3192+
maps:remove({Stream, Ref}, PubToIds)};
3193+
(PubId,
3194+
#publisher{stream = S, reference = Ref, leader = MPid},
3195+
{Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid ->
31753196
rabbit_stream_metrics:publisher_deleted(self(),
3176-
stream_r(S,
3197+
stream_r(Stream,
31773198
C1),
31783199
PubId),
31793200
{maps:remove(PubId, Pubs),
31803201
maps:remove({Stream, Ref}, PubToIds)};
3181-
_ -> {Pubs, PubToIds}
3182-
end
3202+
3203+
(_PubId, _Publisher, {Pubs, PubToIds}) ->
3204+
{Pubs, PubToIds}
31833205
end,
31843206
{Publishers, PublisherToIds}, Publishers),
31853207
{true,
@@ -3201,7 +3223,7 @@ clean_state_after_stream_deletion_or_failure(Stream,
32013223
orelse LeadersCleaned
32023224
of
32033225
true ->
3204-
C3 = demonitor_stream(Stream, C2),
3226+
C3 = demonitor_stream(MemberPid, Stream, C2),
32053227
{cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2};
32063228
false ->
32073229
{not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
@@ -3240,7 +3262,7 @@ remove_subscription(SubscriptionId,
32403262
#stream_connection_state{consumers = Consumers} = State) ->
32413263
#{SubscriptionId := Consumer} = Consumers,
32423264
#consumer{log = Log,
3243-
configuration = #consumer_configuration{stream = Stream}} =
3265+
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
32443266
Consumer,
32453267
rabbit_log:debug("Deleting subscription ~tp (stream ~tp)",
32463268
[SubscriptionId, Stream]),
@@ -3260,7 +3282,7 @@ remove_subscription(SubscriptionId,
32603282
Connection#stream_connection{stream_subscriptions =
32613283
StreamSubscriptions1},
32623284
Consumers1 = maps:remove(SubscriptionId, Consumers),
3263-
Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
3285+
Connection2 = maybe_clean_connection_from_stream(MemberPid, Stream, Connection1),
32643286
rabbit_stream_metrics:consumer_cancelled(self(),
32653287
stream_r(Stream, Connection2),
32663288
SubscriptionId),
@@ -3273,7 +3295,7 @@ remove_subscription(SubscriptionId,
32733295
{Connection2#stream_connection{outstanding_requests = Requests1},
32743296
State#stream_connection_state{consumers = Consumers1}}.
32753297

3276-
maybe_clean_connection_from_stream(Stream,
3298+
maybe_clean_connection_from_stream(MemberPid, Stream,
32773299
#stream_connection{stream_leaders =
32783300
Leaders} =
32793301
Connection0) ->
@@ -3282,7 +3304,7 @@ maybe_clean_connection_from_stream(Stream,
32823304
stream_has_subscriptions(Stream, Connection0)}
32833305
of
32843306
{false, false} ->
3285-
demonitor_stream(Stream, Connection0);
3307+
demonitor_stream(MemberPid, Stream, Connection0);
32863308
_ ->
32873309
Connection0
32883310
end,
@@ -3291,26 +3313,27 @@ maybe_clean_connection_from_stream(Stream,
32913313

32923314
maybe_monitor_stream(Pid, Stream,
32933315
#stream_connection{monitors = Monitors} = Connection) ->
3294-
case lists:member(Stream, maps:values(Monitors)) of
3316+
case lists:member({Pid, Stream}, maps:values(Monitors)) of
32953317
true ->
32963318
Connection;
32973319
false ->
32983320
MonitorRef = monitor(process, Pid),
32993321
Connection#stream_connection{monitors =
3300-
maps:put(MonitorRef, Stream,
3322+
maps:put(MonitorRef, {Pid, Stream},
33013323
Monitors)}
33023324
end.
33033325

3304-
demonitor_stream(Stream,
3326+
demonitor_stream(MemberPid, Stream,
33053327
#stream_connection{monitors = Monitors0} = Connection) ->
33063328
Monitors =
3307-
maps:fold(fun(MonitorRef, Strm, Acc) ->
3308-
case Strm of
3309-
Stream ->
3310-
demonitor(MonitorRef, [flush]),
3329+
maps:fold(fun(MonitorRef, {MPid, Strm}, Acc) when MPid =:= MemberPid andalso Strm =:= Stream ->
3330+
demonitor(MonitorRef, [flush]),
3331+
Acc;
3332+
(MonitorRef, {_MPid, Strm}, Acc) when MemberPid =:= undefined andalso Strm =:= Stream ->
3333+
demonitor(MonitorRef, [flush]),
33113334
Acc;
3312-
_ -> maps:put(MonitorRef, Strm, Acc)
3313-
end
3335+
(MonitorRef, {MPid, Strm}, Acc) ->
3336+
maps:put(MonitorRef, {MPid, Strm}, Acc)
33143337
end,
33153338
#{}, Monitors0),
33163339
Connection#stream_connection{monitors = Monitors}.

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,24 +104,23 @@ init_per_group(Group, Config)
104104
++ ExtraSetupSteps
105105
++ rabbit_ct_broker_helpers:setup_steps());
106106
init_per_group(cluster = Group, Config) ->
107-
Config1 =
108-
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]),
109-
Config2 =
110-
rabbit_ct_helpers:set_config(Config1,
111-
[{rmq_nodes_count, 3},
112-
{rmq_nodename_suffix, Group},
113-
{tcp_ports_base}]),
114-
Config3 =
115-
rabbit_ct_helpers:set_config(Config2,
116-
{rabbitmq_ct_tls_verify, verify_none}),
117-
rabbit_ct_helpers:run_setup_steps(Config3,
118-
[fun(StepConfig) ->
119-
rabbit_ct_helpers:merge_app_env(StepConfig,
120-
{aten,
121-
[{poll_interval,
122-
1000}]})
123-
end]
124-
++ rabbit_ct_broker_helpers:setup_steps());
107+
Config1 = rabbit_ct_helpers:set_config(
108+
Config, [{rmq_nodes_clustered, true},
109+
{rmq_nodes_count, 3},
110+
{rmq_nodename_suffix, Group},
111+
{tcp_ports_base},
112+
{rabbitmq_ct_tls_verify, verify_none},
113+
{find_crashes, false} %% we kill stream members in some tests
114+
]),
115+
rabbit_ct_helpers:run_setup_steps(
116+
Config1,
117+
[fun(StepConfig) ->
118+
rabbit_ct_helpers:merge_app_env(StepConfig,
119+
{aten,
120+
[{poll_interval,
121+
1000}]})
122+
end]
123+
++ rabbit_ct_broker_helpers:setup_steps());
125124
init_per_group(_, Config) ->
126125
rabbit_ct_helpers:run_setup_steps(Config).
127126

0 commit comments

Comments
 (0)