Skip to content

Commit 289fc51

Browse files
committed
Monitor members to the same stream in stream connection
This commit change makes sure that different members of the same stream are monitored in a given connection. A connection can use the stream leader for a publisher and a replica for a consumer, if the replica crashes for some reason, the connection must detect it and send a metadata notification to the client. Note that client connections should still use different connections for publishers and consumers.
1 parent eca1f76 commit 289fc51

File tree

5 files changed

+131
-52
lines changed

5 files changed

+131
-52
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 58 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -912,10 +912,10 @@ open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
912912
StatemData) ->
913913
{Connection1, State1} =
914914
case Monitors of
915-
#{MonitorRef := Stream} ->
915+
#{MonitorRef := {MemberPid, Stream}} ->
916916
Monitors1 = maps:remove(MonitorRef, Monitors),
917917
C = Connection#stream_connection{monitors = Monitors1},
918-
case clean_state_after_stream_deletion_or_failure(Stream, C,
918+
case clean_state_after_stream_deletion_or_failure(MemberPid, Stream, C,
919919
State)
920920
of
921921
{cleaned, NewConnection, NewState} ->
@@ -1893,7 +1893,7 @@ handle_frame_post_auth(Transport,
18931893
{request, CorrelationId,
18941894
{delete_publisher, PublisherId}}) ->
18951895
case Publishers of
1896-
#{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
1896+
#{PublisherId := #publisher{stream = Stream, reference = Ref, leader = LeaderPid}} ->
18971897
Connection1 =
18981898
Connection0#stream_connection{publishers =
18991899
maps:remove(PublisherId,
@@ -1902,7 +1902,7 @@ handle_frame_post_auth(Transport,
19021902
maps:remove({Stream, Ref},
19031903
PubToIds)},
19041904
Connection2 =
1905-
maybe_clean_connection_from_stream(Stream, Connection1),
1905+
maybe_clean_connection_from_stream(LeaderPid, Stream, Connection1),
19061906
response(Transport,
19071907
Connection1,
19081908
delete_publisher,
@@ -2418,7 +2418,7 @@ handle_frame_post_auth(Transport,
24182418
CorrelationId),
24192419
{Connection1, State1} =
24202420
case
2421-
clean_state_after_stream_deletion_or_failure(Stream,
2421+
clean_state_after_stream_deletion_or_failure(undefined, Stream,
24222422
Connection,
24232423
State)
24242424
of
@@ -3155,7 +3155,7 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) ->
31553155
kind = queue,
31563156
virtual_host = VHost}.
31573157

3158-
clean_state_after_stream_deletion_or_failure(Stream,
3158+
clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
31593159
#stream_connection{virtual_host =
31603160
VirtualHost,
31613161
stream_subscriptions
@@ -3180,16 +3180,30 @@ clean_state_after_stream_deletion_or_failure(Stream,
31803180
#{Stream := SubscriptionIds} = StreamSubscriptions,
31813181
Requests1 = lists:foldl(
31823182
fun(SubId, Rqsts0) ->
3183-
rabbit_stream_metrics:consumer_cancelled(self(),
3184-
stream_r(Stream,
3185-
C0),
3186-
SubId),
31873183
#{SubId := Consumer} = Consumers,
3188-
Rqsts1 = maybe_unregister_consumer(
3189-
VirtualHost, Consumer,
3190-
single_active_consumer(Consumer),
3191-
Rqsts0),
3192-
Rqsts1
3184+
case {MemberPid, Consumer} of
3185+
{undefined, _C} ->
3186+
rabbit_stream_metrics:consumer_cancelled(self(),
3187+
stream_r(Stream,
3188+
C0),
3189+
SubId),
3190+
maybe_unregister_consumer(
3191+
VirtualHost, Consumer,
3192+
single_active_consumer(Consumer),
3193+
Rqsts0);
3194+
{_, #consumer{configuration =
3195+
#consumer_configuration{member_pid = MemberPid}}} ->
3196+
rabbit_stream_metrics:consumer_cancelled(self(),
3197+
stream_r(Stream,
3198+
C0),
3199+
SubId),
3200+
maybe_unregister_consumer(
3201+
VirtualHost, Consumer,
3202+
single_active_consumer(Consumer),
3203+
Rqsts0);
3204+
_ ->
3205+
Rqsts0
3206+
end
31933207
end, Requests0, SubscriptionIds),
31943208
{true,
31953209
C0#stream_connection{stream_subscriptions =
@@ -3207,18 +3221,26 @@ clean_state_after_stream_deletion_or_failure(Stream,
32073221
true ->
32083222
{PurgedPubs, PurgedPubToIds} =
32093223
maps:fold(fun(PubId,
3224+
#publisher{stream = S, reference = Ref, leader = MPid},
3225+
{Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid ->
3226+
rabbit_stream_metrics:publisher_deleted(self(),
3227+
stream_r(Stream,
3228+
C1),
3229+
PubId),
3230+
{maps:remove(PubId, Pubs),
3231+
maps:remove({Stream, Ref}, PubToIds)};
3232+
(PubId,
32103233
#publisher{stream = S, reference = Ref},
3211-
{Pubs, PubToIds}) ->
3212-
case S of
3213-
Stream ->
3234+
{Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined ->
32143235
rabbit_stream_metrics:publisher_deleted(self(),
3215-
stream_r(S,
3236+
stream_r(Stream,
32163237
C1),
32173238
PubId),
32183239
{maps:remove(PubId, Pubs),
32193240
maps:remove({Stream, Ref}, PubToIds)};
3220-
_ -> {Pubs, PubToIds}
3221-
end
3241+
3242+
(_PubId, _Publisher, {Pubs, PubToIds}) ->
3243+
{Pubs, PubToIds}
32223244
end,
32233245
{Publishers, PublisherToIds}, Publishers),
32243246
{true,
@@ -3240,7 +3262,7 @@ clean_state_after_stream_deletion_or_failure(Stream,
32403262
orelse LeadersCleaned
32413263
of
32423264
true ->
3243-
C3 = demonitor_stream(Stream, C2),
3265+
C3 = demonitor_stream(MemberPid, Stream, C2),
32443266
{cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2};
32453267
false ->
32463268
{not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
@@ -3279,7 +3301,7 @@ remove_subscription(SubscriptionId,
32793301
#stream_connection_state{consumers = Consumers} = State) ->
32803302
#{SubscriptionId := Consumer} = Consumers,
32813303
#consumer{log = Log,
3282-
configuration = #consumer_configuration{stream = Stream}} =
3304+
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
32833305
Consumer,
32843306
rabbit_log:debug("Deleting subscription ~tp (stream ~tp)",
32853307
[SubscriptionId, Stream]),
@@ -3299,7 +3321,7 @@ remove_subscription(SubscriptionId,
32993321
Connection#stream_connection{stream_subscriptions =
33003322
StreamSubscriptions1},
33013323
Consumers1 = maps:remove(SubscriptionId, Consumers),
3302-
Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
3324+
Connection2 = maybe_clean_connection_from_stream(MemberPid, Stream, Connection1),
33033325
rabbit_stream_metrics:consumer_cancelled(self(),
33043326
stream_r(Stream, Connection2),
33053327
SubscriptionId),
@@ -3312,7 +3334,7 @@ remove_subscription(SubscriptionId,
33123334
{Connection2#stream_connection{outstanding_requests = Requests1},
33133335
State#stream_connection_state{consumers = Consumers1}}.
33143336

3315-
maybe_clean_connection_from_stream(Stream,
3337+
maybe_clean_connection_from_stream(MemberPid, Stream,
33163338
#stream_connection{stream_leaders =
33173339
Leaders} =
33183340
Connection0) ->
@@ -3321,7 +3343,7 @@ maybe_clean_connection_from_stream(Stream,
33213343
stream_has_subscriptions(Stream, Connection0)}
33223344
of
33233345
{false, false} ->
3324-
demonitor_stream(Stream, Connection0);
3346+
demonitor_stream(MemberPid, Stream, Connection0);
33253347
_ ->
33263348
Connection0
33273349
end,
@@ -3330,26 +3352,27 @@ maybe_clean_connection_from_stream(Stream,
33303352

33313353
maybe_monitor_stream(Pid, Stream,
33323354
#stream_connection{monitors = Monitors} = Connection) ->
3333-
case lists:member(Stream, maps:values(Monitors)) of
3355+
case lists:member({Pid, Stream}, maps:values(Monitors)) of
33343356
true ->
33353357
Connection;
33363358
false ->
33373359
MonitorRef = monitor(process, Pid),
33383360
Connection#stream_connection{monitors =
3339-
maps:put(MonitorRef, Stream,
3361+
maps:put(MonitorRef, {Pid, Stream},
33403362
Monitors)}
33413363
end.
33423364

3343-
demonitor_stream(Stream,
3365+
demonitor_stream(MemberPid, Stream,
33443366
#stream_connection{monitors = Monitors0} = Connection) ->
33453367
Monitors =
3346-
maps:fold(fun(MonitorRef, Strm, Acc) ->
3347-
case Strm of
3348-
Stream ->
3349-
demonitor(MonitorRef, [flush]),
3368+
maps:fold(fun(MonitorRef, {MPid, Strm}, Acc) when MPid =:= MemberPid andalso Strm =:= Stream ->
3369+
demonitor(MonitorRef, [flush]),
3370+
Acc;
3371+
(MonitorRef, {_MPid, Strm}, Acc) when MemberPid =:= undefined andalso Strm =:= Stream ->
3372+
demonitor(MonitorRef, [flush]),
33503373
Acc;
3351-
_ -> maps:put(MonitorRef, Strm, Acc)
3352-
end
3374+
(MonitorRef, {MPid, Strm}, Acc) ->
3375+
maps:put(MonitorRef, {MPid, Strm}, Acc)
33533376
end,
33543377
#{}, Monitors0),
33553378
Connection#stream_connection{monitors = Monitors}.

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ init_per_group(cluster = Group, Config) ->
111111
{rmq_nodes_count, 3},
112112
{rmq_nodename_suffix, Group},
113113
{tcp_ports_base},
114-
{rabbitmq_ct_tls_verify, verify_none}
114+
{rabbitmq_ct_tls_verify, verify_none},
115+
{find_crashes, false} %% we kill stream members in some tests
115116
]),
116117
rabbit_ct_helpers:run_setup_steps(
117118
Config1,

0 commit comments

Comments
 (0)