Skip to content

Commit cac0839

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Emit cancellation event only when stream consumer is cancelled
Not when the channel or the connection is closed. References #13085, #9356 (cherry picked from commit 69d0382) # Conflicts: # deps/rabbitmq_stream/src/rabbit_stream_reader.erl
1 parent 0fb3634 commit cac0839

File tree

3 files changed

+26
-17
lines changed

3 files changed

+26
-17
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -973,11 +973,7 @@ close(#stream_client{readers = Readers,
973973
name = QName}) ->
974974
maps:foreach(fun (CTag, #stream{log = Log}) ->
975975
close_log(Log),
976-
rabbit_core_metrics:consumer_deleted(self(), CTag, QName),
977-
rabbit_event:notify(consumer_deleted,
978-
[{consumer_tag, CTag},
979-
{channel, self()},
980-
{queue, QName}])
976+
rabbit_core_metrics:consumer_deleted(self(), CTag, QName)
981977
end, Readers).
982978

983979
update(Q, State)

deps/rabbitmq_stream/src/rabbit_stream_metrics.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
-export([init/0]).
2323
-export([consumer_created/9,
2424
consumer_updated/9,
25-
consumer_cancelled/3]).
25+
consumer_cancelled/4]).
2626
-export([publisher_created/4,
2727
publisher_updated/7,
2828
publisher_deleted/3]).
@@ -104,16 +104,20 @@ consumer_updated(Connection,
104104

105105
ok.
106106

107-
consumer_cancelled(Connection, StreamResource, SubscriptionId) ->
107+
consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
108108
ets:delete(?TABLE_CONSUMER,
109109
{StreamResource, Connection, SubscriptionId}),
110110
rabbit_global_counters:consumer_deleted(stream),
111111
rabbit_core_metrics:consumer_deleted(Connection,
112112
consumer_tag(SubscriptionId),
113113
StreamResource),
114-
rabbit_event:notify(consumer_deleted,
115-
[{consumer_tag, consumer_tag(SubscriptionId)},
116-
{channel, self()}, {queue, StreamResource}]),
114+
case Notify of
115+
true ->
116+
rabbit_event:notify(consumer_deleted,
117+
[{consumer_tag, consumer_tag(SubscriptionId)},
118+
{channel, self()}, {queue, StreamResource}]);
119+
_ -> ok
120+
end,
117121
ok.
118122

119123
publisher_created(Connection,

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
%% The Original Code is RabbitMQ.
1010
%%
1111
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
12+
<<<<<<< HEAD
1213
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
14+
=======
15+
%% Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
16+
>>>>>>> 69d0382dd (Emit cancellation event only when stream consumer is cancelled)
1317
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
1418
%%
1519

@@ -2249,7 +2253,7 @@ handle_frame_post_auth(Transport,
22492253
{Connection, State};
22502254
true ->
22512255
{Connection1, State1} =
2252-
remove_subscription(SubscriptionId, Connection, State),
2256+
remove_subscription(SubscriptionId, Connection, State, true),
22532257
response_ok(Transport, Connection, unsubscribe, CorrelationId),
22542258
{Connection1, State1}
22552259
end;
@@ -3081,7 +3085,7 @@ evaluate_state_after_secret_update(Transport,
30813085
_ ->
30823086
{C1, S1} =
30833087
lists:foldl(fun(SubId, {Conn, St}) ->
3084-
remove_subscription(SubId, Conn, St)
3088+
remove_subscription(SubId, Conn, St, false)
30853089
end, {C0, S0}, Subs),
30863090
{Acc#{Str => ok}, C1, S1}
30873091
end
@@ -3216,7 +3220,8 @@ notify_connection_closed(#statem_data{connection =
32163220
ConnectionState}) ->
32173221
rabbit_core_metrics:connection_closed(self()),
32183222
[rabbit_stream_metrics:consumer_cancelled(self(),
3219-
stream_r(S, Connection), SubId)
3223+
stream_r(S, Connection),
3224+
SubId, false)
32203225
|| #consumer{configuration =
32213226
#consumer_configuration{stream = S,
32223227
subscription_id = SubId}}
@@ -3304,7 +3309,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33043309
rabbit_stream_metrics:consumer_cancelled(self(),
33053310
stream_r(Stream,
33063311
C0),
3307-
SubId),
3312+
SubId,
3313+
false),
33083314
maybe_unregister_consumer(
33093315
VirtualHost, Consumer,
33103316
single_active_consumer(Consumer),
@@ -3314,7 +3320,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33143320
rabbit_stream_metrics:consumer_cancelled(self(),
33153321
stream_r(Stream,
33163322
C0),
3317-
SubId),
3323+
SubId,
3324+
false),
33183325
maybe_unregister_consumer(
33193326
VirtualHost, Consumer,
33203327
single_active_consumer(Consumer),
@@ -3431,7 +3438,8 @@ remove_subscription(SubscriptionId,
34313438
stream_subscriptions =
34323439
StreamSubscriptions} =
34333440
Connection,
3434-
#stream_connection_state{consumers = Consumers} = State) ->
3441+
#stream_connection_state{consumers = Consumers} = State,
3442+
Notify) ->
34353443
#{SubscriptionId := Consumer} = Consumers,
34363444
#consumer{log = Log,
34373445
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
@@ -3457,7 +3465,8 @@ remove_subscription(SubscriptionId,
34573465
Connection2 = maybe_clean_connection_from_stream(MemberPid, Stream, Connection1),
34583466
rabbit_stream_metrics:consumer_cancelled(self(),
34593467
stream_r(Stream, Connection2),
3460-
SubscriptionId),
3468+
SubscriptionId,
3469+
Notify),
34613470

34623471
Requests1 = maybe_unregister_consumer(
34633472
VirtualHost, Consumer,

0 commit comments

Comments
 (0)