84
84
heartbeat :: undefined | integer (),
85
85
heartbeater :: any (),
86
86
client_properties = #{} :: #{binary () => binary ()},
87
- monitors = #{} :: #{reference () => stream ()},
87
+ monitors = #{} :: #{reference () => { pid (), stream ()} },
88
88
stats_timer :: undefined | rabbit_event :state (),
89
89
resource_alarm :: boolean (),
90
90
send_file_oct ::
@@ -916,10 +916,10 @@ open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
916
916
StatemData ) ->
917
917
{Connection1 , State1 } =
918
918
case Monitors of
919
- #{MonitorRef := Stream } ->
919
+ #{MonitorRef := { MemberPid , Stream } } ->
920
920
Monitors1 = maps :remove (MonitorRef , Monitors ),
921
921
C = Connection # stream_connection {monitors = Monitors1 },
922
- case clean_state_after_stream_deletion_or_failure (Stream , C ,
922
+ case clean_state_after_stream_deletion_or_failure (MemberPid , Stream , C ,
923
923
State )
924
924
of
925
925
{cleaned , NewConnection , NewState } ->
@@ -1852,7 +1852,7 @@ handle_frame_post_auth(Transport,
1852
1852
{request , CorrelationId ,
1853
1853
{delete_publisher , PublisherId }}) ->
1854
1854
case Publishers of
1855
- #{PublisherId := # publisher {stream = Stream , reference = Ref }} ->
1855
+ #{PublisherId := # publisher {stream = Stream , reference = Ref , leader = LeaderPid }} ->
1856
1856
Connection1 =
1857
1857
Connection0 # stream_connection {publishers =
1858
1858
maps :remove (PublisherId ,
@@ -1861,7 +1861,7 @@ handle_frame_post_auth(Transport,
1861
1861
maps :remove ({Stream , Ref },
1862
1862
PubToIds )},
1863
1863
Connection2 =
1864
- maybe_clean_connection_from_stream (Stream , Connection1 ),
1864
+ maybe_clean_connection_from_stream (LeaderPid , Stream , Connection1 ),
1865
1865
response (Transport ,
1866
1866
Connection1 ,
1867
1867
delete_publisher ,
@@ -2350,7 +2350,7 @@ handle_frame_post_auth(Transport,
2350
2350
CorrelationId ),
2351
2351
{Connection1 , State1 } =
2352
2352
case
2353
- clean_state_after_stream_deletion_or_failure (Stream ,
2353
+ clean_state_after_stream_deletion_or_failure (undefined , Stream ,
2354
2354
Connection ,
2355
2355
State )
2356
2356
of
@@ -3088,7 +3088,7 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) ->
3088
3088
kind = queue ,
3089
3089
virtual_host = VHost }.
3090
3090
3091
- clean_state_after_stream_deletion_or_failure (Stream ,
3091
+ clean_state_after_stream_deletion_or_failure (MemberPid , Stream ,
3092
3092
# stream_connection {virtual_host =
3093
3093
VirtualHost ,
3094
3094
stream_subscriptions
@@ -3113,16 +3113,30 @@ clean_state_after_stream_deletion_or_failure(Stream,
3113
3113
#{Stream := SubscriptionIds } = StreamSubscriptions ,
3114
3114
Requests1 = lists :foldl (
3115
3115
fun (SubId , Rqsts0 ) ->
3116
- rabbit_stream_metrics :consumer_cancelled (self (),
3117
- stream_r (Stream ,
3118
- C0 ),
3119
- SubId ),
3120
3116
#{SubId := Consumer } = Consumers ,
3121
- Rqsts1 = maybe_unregister_consumer (
3122
- VirtualHost , Consumer ,
3123
- single_active_consumer (Consumer ),
3124
- Rqsts0 ),
3125
- Rqsts1
3117
+ case {MemberPid , Consumer } of
3118
+ {undefined , _C } ->
3119
+ rabbit_stream_metrics :consumer_cancelled (self (),
3120
+ stream_r (Stream ,
3121
+ C0 ),
3122
+ SubId ),
3123
+ maybe_unregister_consumer (
3124
+ VirtualHost , Consumer ,
3125
+ single_active_consumer (Consumer ),
3126
+ Rqsts0 );
3127
+ {MemberPid , # consumer {configuration =
3128
+ # consumer_configuration {member_pid = MemberPid }}} ->
3129
+ rabbit_stream_metrics :consumer_cancelled (self (),
3130
+ stream_r (Stream ,
3131
+ C0 ),
3132
+ SubId ),
3133
+ maybe_unregister_consumer (
3134
+ VirtualHost , Consumer ,
3135
+ single_active_consumer (Consumer ),
3136
+ Rqsts0 );
3137
+ _ ->
3138
+ Rqsts0
3139
+ end
3126
3140
end , Requests0 , SubscriptionIds ),
3127
3141
{true ,
3128
3142
C0 # stream_connection {stream_subscriptions =
@@ -3141,17 +3155,25 @@ clean_state_after_stream_deletion_or_failure(Stream,
3141
3155
{PurgedPubs , PurgedPubToIds } =
3142
3156
maps :fold (fun (PubId ,
3143
3157
# publisher {stream = S , reference = Ref },
3144
- {Pubs , PubToIds }) ->
3145
- case S of
3146
- Stream ->
3158
+ {Pubs , PubToIds }) when S =:= Stream andalso MemberPid =:= undefined ->
3159
+ rabbit_stream_metrics :publisher_deleted (self (),
3160
+ stream_r (Stream ,
3161
+ C1 ),
3162
+ PubId ),
3163
+ {maps :remove (PubId , Pubs ),
3164
+ maps :remove ({Stream , Ref }, PubToIds )};
3165
+ (PubId ,
3166
+ # publisher {stream = S , reference = Ref , leader = MPid },
3167
+ {Pubs , PubToIds }) when S =:= Stream andalso MPid =:= MemberPid ->
3147
3168
rabbit_stream_metrics :publisher_deleted (self (),
3148
- stream_r (S ,
3169
+ stream_r (Stream ,
3149
3170
C1 ),
3150
3171
PubId ),
3151
3172
{maps :remove (PubId , Pubs ),
3152
3173
maps :remove ({Stream , Ref }, PubToIds )};
3153
- _ -> {Pubs , PubToIds }
3154
- end
3174
+
3175
+ (_PubId , _Publisher , {Pubs , PubToIds }) ->
3176
+ {Pubs , PubToIds }
3155
3177
end ,
3156
3178
{Publishers , PublisherToIds }, Publishers ),
3157
3179
{true ,
@@ -3173,7 +3195,7 @@ clean_state_after_stream_deletion_or_failure(Stream,
3173
3195
orelse LeadersCleaned
3174
3196
of
3175
3197
true ->
3176
- C3 = demonitor_stream (Stream , C2 ),
3198
+ C3 = demonitor_stream (MemberPid , Stream , C2 ),
3177
3199
{cleaned , C3 # stream_connection {stream_leaders = Leaders1 }, S2 };
3178
3200
false ->
3179
3201
{not_cleaned , C2 # stream_connection {stream_leaders = Leaders1 }, S2 }
@@ -3212,7 +3234,7 @@ remove_subscription(SubscriptionId,
3212
3234
# stream_connection_state {consumers = Consumers } = State ) ->
3213
3235
#{SubscriptionId := Consumer } = Consumers ,
3214
3236
# consumer {log = Log ,
3215
- configuration = # consumer_configuration {stream = Stream }} =
3237
+ configuration = # consumer_configuration {stream = Stream , member_pid = MemberPid }} =
3216
3238
Consumer ,
3217
3239
rabbit_log :debug (" Deleting subscription ~tp (stream ~tp )" ,
3218
3240
[SubscriptionId , Stream ]),
@@ -3232,7 +3254,7 @@ remove_subscription(SubscriptionId,
3232
3254
Connection # stream_connection {stream_subscriptions =
3233
3255
StreamSubscriptions1 },
3234
3256
Consumers1 = maps :remove (SubscriptionId , Consumers ),
3235
- Connection2 = maybe_clean_connection_from_stream (Stream , Connection1 ),
3257
+ Connection2 = maybe_clean_connection_from_stream (MemberPid , Stream , Connection1 ),
3236
3258
rabbit_stream_metrics :consumer_cancelled (self (),
3237
3259
stream_r (Stream , Connection2 ),
3238
3260
SubscriptionId ),
@@ -3245,7 +3267,7 @@ remove_subscription(SubscriptionId,
3245
3267
{Connection2 # stream_connection {outstanding_requests = Requests1 },
3246
3268
State # stream_connection_state {consumers = Consumers1 }}.
3247
3269
3248
- maybe_clean_connection_from_stream (Stream ,
3270
+ maybe_clean_connection_from_stream (MemberPid , Stream ,
3249
3271
# stream_connection {stream_leaders =
3250
3272
Leaders } =
3251
3273
Connection0 ) ->
@@ -3254,7 +3276,7 @@ maybe_clean_connection_from_stream(Stream,
3254
3276
stream_has_subscriptions (Stream , Connection0 )}
3255
3277
of
3256
3278
{false , false } ->
3257
- demonitor_stream (Stream , Connection0 );
3279
+ demonitor_stream (MemberPid , Stream , Connection0 );
3258
3280
_ ->
3259
3281
Connection0
3260
3282
end ,
@@ -3263,26 +3285,27 @@ maybe_clean_connection_from_stream(Stream,
3263
3285
3264
3286
maybe_monitor_stream (Pid , Stream ,
3265
3287
# stream_connection {monitors = Monitors } = Connection ) ->
3266
- case lists :member (Stream , maps :values (Monitors )) of
3288
+ case lists :member ({ Pid , Stream } , maps :values (Monitors )) of
3267
3289
true ->
3268
3290
Connection ;
3269
3291
false ->
3270
3292
MonitorRef = monitor (process , Pid ),
3271
3293
Connection # stream_connection {monitors =
3272
- maps :put (MonitorRef , Stream ,
3294
+ maps :put (MonitorRef , { Pid , Stream } ,
3273
3295
Monitors )}
3274
3296
end .
3275
3297
3276
- demonitor_stream (Stream ,
3298
+ demonitor_stream (MemberPid , Stream ,
3277
3299
# stream_connection {monitors = Monitors0 } = Connection ) ->
3278
3300
Monitors =
3279
- maps :fold (fun (MonitorRef , Strm , Acc ) ->
3280
- case Strm of
3281
- Stream ->
3282
- demonitor (MonitorRef , [flush ]),
3301
+ maps :fold (fun (MonitorRef , {MPid , Strm }, Acc ) when MPid =:= MemberPid andalso Strm =:= Stream ->
3302
+ demonitor (MonitorRef , [flush ]),
3303
+ Acc ;
3304
+ (MonitorRef , {_MPid , Strm }, Acc ) when MemberPid =:= undefined andalso Strm =:= Stream ->
3305
+ demonitor (MonitorRef , [flush ]),
3283
3306
Acc ;
3284
- _ -> maps : put (MonitorRef , Strm , Acc )
3285
- end
3307
+ (MonitorRef , { MPid , Strm } , Acc ) ->
3308
+ maps : put ( MonitorRef , { MPid , Strm }, Acc )
3286
3309
end ,
3287
3310
#{}, Monitors0 ),
3288
3311
Connection # stream_connection {monitors = Monitors }.
0 commit comments