|
84 | 84 | heartbeat :: undefined | integer(),
|
85 | 85 | heartbeater :: any(),
|
86 | 86 | client_properties = #{} :: #{binary() => binary()},
|
87 |
| - monitors = #{} :: #{reference() => stream()}, |
| 87 | + monitors = #{} :: #{reference() => {pid(), stream()}}, |
88 | 88 | stats_timer :: undefined | rabbit_event:state(),
|
89 | 89 | resource_alarm :: boolean(),
|
90 | 90 | send_file_oct ::
|
@@ -3124,8 +3124,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
|
3124 | 3124 | VirtualHost, Consumer,
|
3125 | 3125 | single_active_consumer(Consumer),
|
3126 | 3126 | Rqsts0);
|
3127 |
| - {_, #consumer{configuration = |
3128 |
| - #consumer_configuration{member_pid = MemberPid}}} -> |
| 3127 | + {MemberPid, #consumer{configuration = |
| 3128 | + #consumer_configuration{member_pid = MemberPid}}} -> |
3129 | 3129 | rabbit_stream_metrics:consumer_cancelled(self(),
|
3130 | 3130 | stream_r(Stream,
|
3131 | 3131 | C0),
|
@@ -3154,17 +3154,17 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
|
3154 | 3154 | true ->
|
3155 | 3155 | {PurgedPubs, PurgedPubToIds} =
|
3156 | 3156 | maps:fold(fun(PubId,
|
3157 |
| - #publisher{stream = S, reference = Ref, leader = MPid}, |
3158 |
| - {Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid -> |
3159 |
| - rabbit_stream_metrics:publisher_deleted(self(), |
| 3157 | + #publisher{stream = S, reference = Ref}, |
| 3158 | + {Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined -> |
| 3159 | + rabbit_stream_metrics:publisher_deleted(self(), |
3160 | 3160 | stream_r(Stream,
|
3161 | 3161 | C1),
|
3162 | 3162 | PubId),
|
3163 | 3163 | {maps:remove(PubId, Pubs),
|
3164 | 3164 | maps:remove({Stream, Ref}, PubToIds)};
|
3165 | 3165 | (PubId,
|
3166 |
| - #publisher{stream = S, reference = Ref}, |
3167 |
| - {Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined -> |
| 3166 | + #publisher{stream = S, reference = Ref, leader = MPid}, |
| 3167 | + {Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid -> |
3168 | 3168 | rabbit_stream_metrics:publisher_deleted(self(),
|
3169 | 3169 | stream_r(Stream,
|
3170 | 3170 | C1),
|
|
0 commit comments