|
84 | 84 | heartbeat :: undefined | integer(),
|
85 | 85 | heartbeater :: any(),
|
86 | 86 | client_properties = #{} :: #{binary() => binary()},
|
| 87 | +<<<<<<< HEAD |
87 | 88 | monitors = #{} :: #{reference() => stream()},
|
88 | 89 | stats_timer :: undefined | reference(),
|
| 90 | +======= |
| 91 | + monitors = #{} :: #{reference() => {pid(), stream()}}, |
| 92 | + stats_timer :: undefined | rabbit_event:state(), |
| 93 | +>>>>>>> d1f597aae2 (Fix a couple for dialyzer warnings) |
89 | 94 | resource_alarm :: boolean(),
|
90 | 95 | send_file_oct ::
|
91 | 96 | atomics:atomics_ref(), % number of bytes sent with send_file (for metrics)
|
@@ -3152,8 +3157,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
|
3152 | 3157 | VirtualHost, Consumer,
|
3153 | 3158 | single_active_consumer(Consumer),
|
3154 | 3159 | Rqsts0);
|
3155 |
| - {_, #consumer{configuration = |
3156 |
| - #consumer_configuration{member_pid = MemberPid}}} -> |
| 3160 | + {MemberPid, #consumer{configuration = |
| 3161 | + #consumer_configuration{member_pid = MemberPid}}} -> |
3157 | 3162 | rabbit_stream_metrics:consumer_cancelled(self(),
|
3158 | 3163 | stream_r(Stream,
|
3159 | 3164 | C0),
|
@@ -3182,17 +3187,17 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
|
3182 | 3187 | true ->
|
3183 | 3188 | {PurgedPubs, PurgedPubToIds} =
|
3184 | 3189 | maps:fold(fun(PubId,
|
3185 |
| - #publisher{stream = S, reference = Ref, leader = MPid}, |
3186 |
| - {Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid -> |
3187 |
| - rabbit_stream_metrics:publisher_deleted(self(), |
| 3190 | + #publisher{stream = S, reference = Ref}, |
| 3191 | + {Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined -> |
| 3192 | + rabbit_stream_metrics:publisher_deleted(self(), |
3188 | 3193 | stream_r(Stream,
|
3189 | 3194 | C1),
|
3190 | 3195 | PubId),
|
3191 | 3196 | {maps:remove(PubId, Pubs),
|
3192 | 3197 | maps:remove({Stream, Ref}, PubToIds)};
|
3193 | 3198 | (PubId,
|
3194 |
| - #publisher{stream = S, reference = Ref}, |
3195 |
| - {Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined -> |
| 3199 | + #publisher{stream = S, reference = Ref, leader = MPid}, |
| 3200 | + {Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid -> |
3196 | 3201 | rabbit_stream_metrics:publisher_deleted(self(),
|
3197 | 3202 | stream_r(Stream,
|
3198 | 3203 | C1),
|
|
0 commit comments