|
84 | 84 | heartbeat :: undefined | integer(),
|
85 | 85 | heartbeater :: any(),
|
86 | 86 | client_properties = #{} :: #{binary() => binary()},
|
87 |
| - monitors = #{} :: #{reference() => stream()}, |
| 87 | + monitors = #{} :: #{reference() => {pid(), stream()}}, |
88 | 88 | stats_timer :: undefined | rabbit_event:state(),
|
89 | 89 | resource_alarm :: boolean(),
|
90 | 90 | send_file_oct ::
|
@@ -3191,8 +3191,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
|
3191 | 3191 | VirtualHost, Consumer,
|
3192 | 3192 | single_active_consumer(Consumer),
|
3193 | 3193 | Rqsts0);
|
3194 |
| - {_, #consumer{configuration = |
3195 |
| - #consumer_configuration{member_pid = MemberPid}}} -> |
| 3194 | + {MemberPid, #consumer{configuration = |
| 3195 | + #consumer_configuration{member_pid = MemberPid}}} -> |
3196 | 3196 | rabbit_stream_metrics:consumer_cancelled(self(),
|
3197 | 3197 | stream_r(Stream,
|
3198 | 3198 | C0),
|
@@ -3221,17 +3221,17 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
|
3221 | 3221 | true ->
|
3222 | 3222 | {PurgedPubs, PurgedPubToIds} =
|
3223 | 3223 | maps:fold(fun(PubId,
|
3224 |
| - #publisher{stream = S, reference = Ref, leader = MPid}, |
3225 |
| - {Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid -> |
3226 |
| - rabbit_stream_metrics:publisher_deleted(self(), |
| 3224 | + #publisher{stream = S, reference = Ref}, |
| 3225 | + {Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined -> |
| 3226 | + rabbit_stream_metrics:publisher_deleted(self(), |
3227 | 3227 | stream_r(Stream,
|
3228 | 3228 | C1),
|
3229 | 3229 | PubId),
|
3230 | 3230 | {maps:remove(PubId, Pubs),
|
3231 | 3231 | maps:remove({Stream, Ref}, PubToIds)};
|
3232 | 3232 | (PubId,
|
3233 |
| - #publisher{stream = S, reference = Ref}, |
3234 |
| - {Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined -> |
| 3233 | + #publisher{stream = S, reference = Ref, leader = MPid}, |
| 3234 | + {Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid -> |
3235 | 3235 | rabbit_stream_metrics:publisher_deleted(self(),
|
3236 | 3236 | stream_r(Stream,
|
3237 | 3237 | C1),
|
|
0 commit comments