Skip to content

Commit cfeddc6

Browse files
Merge pull request #9572 from rabbitmq/monitor-members-of-same-stream
Monitor members to the same stream in stream connection
2 parents f6dd6a4 + 07d7f09 commit cfeddc6

File tree

5 files changed

+132
-53
lines changed

5 files changed

+132
-53
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 59 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
heartbeat :: undefined | integer(),
8585
heartbeater :: any(),
8686
client_properties = #{} :: #{binary() => binary()},
87-
monitors = #{} :: #{reference() => stream()},
87+
monitors = #{} :: #{reference() => {pid(), stream()}},
8888
stats_timer :: undefined | rabbit_event:state(),
8989
resource_alarm :: boolean(),
9090
send_file_oct ::
@@ -912,10 +912,10 @@ open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
912912
StatemData) ->
913913
{Connection1, State1} =
914914
case Monitors of
915-
#{MonitorRef := Stream} ->
915+
#{MonitorRef := {MemberPid, Stream}} ->
916916
Monitors1 = maps:remove(MonitorRef, Monitors),
917917
C = Connection#stream_connection{monitors = Monitors1},
918-
case clean_state_after_stream_deletion_or_failure(Stream, C,
918+
case clean_state_after_stream_deletion_or_failure(MemberPid, Stream, C,
919919
State)
920920
of
921921
{cleaned, NewConnection, NewState} ->
@@ -1893,7 +1893,7 @@ handle_frame_post_auth(Transport,
18931893
{request, CorrelationId,
18941894
{delete_publisher, PublisherId}}) ->
18951895
case Publishers of
1896-
#{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
1896+
#{PublisherId := #publisher{stream = Stream, reference = Ref, leader = LeaderPid}} ->
18971897
Connection1 =
18981898
Connection0#stream_connection{publishers =
18991899
maps:remove(PublisherId,
@@ -1902,7 +1902,7 @@ handle_frame_post_auth(Transport,
19021902
maps:remove({Stream, Ref},
19031903
PubToIds)},
19041904
Connection2 =
1905-
maybe_clean_connection_from_stream(Stream, Connection1),
1905+
maybe_clean_connection_from_stream(LeaderPid, Stream, Connection1),
19061906
response(Transport,
19071907
Connection1,
19081908
delete_publisher,
@@ -2418,7 +2418,7 @@ handle_frame_post_auth(Transport,
24182418
CorrelationId),
24192419
{Connection1, State1} =
24202420
case
2421-
clean_state_after_stream_deletion_or_failure(Stream,
2421+
clean_state_after_stream_deletion_or_failure(undefined, Stream,
24222422
Connection,
24232423
State)
24242424
of
@@ -3155,7 +3155,7 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) ->
31553155
kind = queue,
31563156
virtual_host = VHost}.
31573157

3158-
clean_state_after_stream_deletion_or_failure(Stream,
3158+
clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
31593159
#stream_connection{virtual_host =
31603160
VirtualHost,
31613161
stream_subscriptions
@@ -3180,16 +3180,30 @@ clean_state_after_stream_deletion_or_failure(Stream,
31803180
#{Stream := SubscriptionIds} = StreamSubscriptions,
31813181
Requests1 = lists:foldl(
31823182
fun(SubId, Rqsts0) ->
3183-
rabbit_stream_metrics:consumer_cancelled(self(),
3184-
stream_r(Stream,
3185-
C0),
3186-
SubId),
31873183
#{SubId := Consumer} = Consumers,
3188-
Rqsts1 = maybe_unregister_consumer(
3189-
VirtualHost, Consumer,
3190-
single_active_consumer(Consumer),
3191-
Rqsts0),
3192-
Rqsts1
3184+
case {MemberPid, Consumer} of
3185+
{undefined, _C} ->
3186+
rabbit_stream_metrics:consumer_cancelled(self(),
3187+
stream_r(Stream,
3188+
C0),
3189+
SubId),
3190+
maybe_unregister_consumer(
3191+
VirtualHost, Consumer,
3192+
single_active_consumer(Consumer),
3193+
Rqsts0);
3194+
{MemberPid, #consumer{configuration =
3195+
#consumer_configuration{member_pid = MemberPid}}} ->
3196+
rabbit_stream_metrics:consumer_cancelled(self(),
3197+
stream_r(Stream,
3198+
C0),
3199+
SubId),
3200+
maybe_unregister_consumer(
3201+
VirtualHost, Consumer,
3202+
single_active_consumer(Consumer),
3203+
Rqsts0);
3204+
_ ->
3205+
Rqsts0
3206+
end
31933207
end, Requests0, SubscriptionIds),
31943208
{true,
31953209
C0#stream_connection{stream_subscriptions =
@@ -3208,17 +3222,25 @@ clean_state_after_stream_deletion_or_failure(Stream,
32083222
{PurgedPubs, PurgedPubToIds} =
32093223
maps:fold(fun(PubId,
32103224
#publisher{stream = S, reference = Ref},
3211-
{Pubs, PubToIds}) ->
3212-
case S of
3213-
Stream ->
3225+
{Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined ->
3226+
rabbit_stream_metrics:publisher_deleted(self(),
3227+
stream_r(Stream,
3228+
C1),
3229+
PubId),
3230+
{maps:remove(PubId, Pubs),
3231+
maps:remove({Stream, Ref}, PubToIds)};
3232+
(PubId,
3233+
#publisher{stream = S, reference = Ref, leader = MPid},
3234+
{Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid ->
32143235
rabbit_stream_metrics:publisher_deleted(self(),
3215-
stream_r(S,
3236+
stream_r(Stream,
32163237
C1),
32173238
PubId),
32183239
{maps:remove(PubId, Pubs),
32193240
maps:remove({Stream, Ref}, PubToIds)};
3220-
_ -> {Pubs, PubToIds}
3221-
end
3241+
3242+
(_PubId, _Publisher, {Pubs, PubToIds}) ->
3243+
{Pubs, PubToIds}
32223244
end,
32233245
{Publishers, PublisherToIds}, Publishers),
32243246
{true,
@@ -3240,7 +3262,7 @@ clean_state_after_stream_deletion_or_failure(Stream,
32403262
orelse LeadersCleaned
32413263
of
32423264
true ->
3243-
C3 = demonitor_stream(Stream, C2),
3265+
C3 = demonitor_stream(MemberPid, Stream, C2),
32443266
{cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2};
32453267
false ->
32463268
{not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
@@ -3279,7 +3301,7 @@ remove_subscription(SubscriptionId,
32793301
#stream_connection_state{consumers = Consumers} = State) ->
32803302
#{SubscriptionId := Consumer} = Consumers,
32813303
#consumer{log = Log,
3282-
configuration = #consumer_configuration{stream = Stream}} =
3304+
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
32833305
Consumer,
32843306
rabbit_log:debug("Deleting subscription ~tp (stream ~tp)",
32853307
[SubscriptionId, Stream]),
@@ -3299,7 +3321,7 @@ remove_subscription(SubscriptionId,
32993321
Connection#stream_connection{stream_subscriptions =
33003322
StreamSubscriptions1},
33013323
Consumers1 = maps:remove(SubscriptionId, Consumers),
3302-
Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
3324+
Connection2 = maybe_clean_connection_from_stream(MemberPid, Stream, Connection1),
33033325
rabbit_stream_metrics:consumer_cancelled(self(),
33043326
stream_r(Stream, Connection2),
33053327
SubscriptionId),
@@ -3312,7 +3334,7 @@ remove_subscription(SubscriptionId,
33123334
{Connection2#stream_connection{outstanding_requests = Requests1},
33133335
State#stream_connection_state{consumers = Consumers1}}.
33143336

3315-
maybe_clean_connection_from_stream(Stream,
3337+
maybe_clean_connection_from_stream(MemberPid, Stream,
33163338
#stream_connection{stream_leaders =
33173339
Leaders} =
33183340
Connection0) ->
@@ -3321,7 +3343,7 @@ maybe_clean_connection_from_stream(Stream,
33213343
stream_has_subscriptions(Stream, Connection0)}
33223344
of
33233345
{false, false} ->
3324-
demonitor_stream(Stream, Connection0);
3346+
demonitor_stream(MemberPid, Stream, Connection0);
33253347
_ ->
33263348
Connection0
33273349
end,
@@ -3330,26 +3352,27 @@ maybe_clean_connection_from_stream(Stream,
33303352

33313353
maybe_monitor_stream(Pid, Stream,
33323354
#stream_connection{monitors = Monitors} = Connection) ->
3333-
case lists:member(Stream, maps:values(Monitors)) of
3355+
case lists:member({Pid, Stream}, maps:values(Monitors)) of
33343356
true ->
33353357
Connection;
33363358
false ->
33373359
MonitorRef = monitor(process, Pid),
33383360
Connection#stream_connection{monitors =
3339-
maps:put(MonitorRef, Stream,
3361+
maps:put(MonitorRef, {Pid, Stream},
33403362
Monitors)}
33413363
end.
33423364

3343-
demonitor_stream(Stream,
3365+
demonitor_stream(MemberPid, Stream,
33443366
#stream_connection{monitors = Monitors0} = Connection) ->
33453367
Monitors =
3346-
maps:fold(fun(MonitorRef, Strm, Acc) ->
3347-
case Strm of
3348-
Stream ->
3349-
demonitor(MonitorRef, [flush]),
3368+
maps:fold(fun(MonitorRef, {MPid, Strm}, Acc) when MPid =:= MemberPid andalso Strm =:= Stream ->
3369+
demonitor(MonitorRef, [flush]),
3370+
Acc;
3371+
(MonitorRef, {_MPid, Strm}, Acc) when MemberPid =:= undefined andalso Strm =:= Stream ->
3372+
demonitor(MonitorRef, [flush]),
33503373
Acc;
3351-
_ -> maps:put(MonitorRef, Strm, Acc)
3352-
end
3374+
(MonitorRef, {MPid, Strm}, Acc) ->
3375+
maps:put(MonitorRef, {MPid, Strm}, Acc)
33533376
end,
33543377
#{}, Monitors0),
33553378
Connection#stream_connection{monitors = Monitors}.

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ init_per_group(cluster = Group, Config) ->
111111
{rmq_nodes_count, 3},
112112
{rmq_nodename_suffix, Group},
113113
{tcp_ports_base},
114-
{rabbitmq_ct_tls_verify, verify_none}
114+
{rabbitmq_ct_tls_verify, verify_none},
115+
{find_crashes, false} %% we kill stream members in some tests
115116
]),
116117
rabbit_ct_helpers:run_setup_steps(
117118
Config1,

0 commit comments

Comments
 (0)