Skip to content

Commit 6c115f6

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Always emit consumer_deleted event when stream consumer goes away
Not only when it is removed explicitly by the client. This is necessary to make sure the consumer record is removed from the management ETS tables (consumer_stats) and to avoid ghost consumers. For other protocols like AMQP 091, the consumer_status ETS table is cleaned up when a channel goes down, but there is no channel concept in the stream protocol. This is not consistent with other protocols or queue implementations (which emits the event only on explicit consumer cancellation) but is necessary to clean up stats correctly. References #13092 (cherry picked from commit 52c89ab)
1 parent 906e554 commit 6c115f6

File tree

2 files changed

+13
-21
lines changed

2 files changed

+13
-21
lines changed

deps/rabbitmq_stream/src/rabbit_stream_metrics.erl

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
-export([init/0]).
2323
-export([consumer_created/10,
2424
consumer_updated/9,
25-
consumer_cancelled/5]).
25+
consumer_cancelled/4]).
2626
-export([publisher_created/4,
2727
publisher_updated/7,
2828
publisher_deleted/3]).
@@ -121,21 +121,17 @@ consumer_updated(Connection,
121121

122122
ok.
123123

124-
consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notify) ->
124+
consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser) ->
125125
ets:delete(?TABLE_CONSUMER,
126126
{StreamResource, Connection, SubscriptionId}),
127127
rabbit_global_counters:consumer_deleted(stream),
128128
rabbit_core_metrics:consumer_deleted(Connection,
129129
consumer_tag(SubscriptionId),
130130
StreamResource),
131-
case Notify of
132-
true ->
133-
rabbit_event:notify(consumer_deleted,
134-
[{consumer_tag, consumer_tag(SubscriptionId)},
135-
{channel, self()}, {queue, StreamResource},
136-
{user_who_performed_action, ActingUser}]);
137-
_ -> ok
138-
end,
131+
rabbit_event:notify(consumer_deleted,
132+
[{consumer_tag, consumer_tag(SubscriptionId)},
133+
{channel, self()}, {queue, StreamResource},
134+
{user_who_performed_action, ActingUser}]),
139135
ok.
140136

141137
publisher_created(Connection,

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2155,7 +2155,7 @@ handle_frame_post_auth(Transport,
21552155
{Connection, State};
21562156
true ->
21572157
{Connection1, State1} =
2158-
remove_subscription(SubscriptionId, Connection, State, true),
2158+
remove_subscription(SubscriptionId, Connection, State),
21592159
response_ok(Transport, Connection, unsubscribe, CorrelationId),
21602160
{Connection1, State1}
21612161
end;
@@ -3084,7 +3084,7 @@ evaluate_state_after_secret_update(Transport,
30843084
_ ->
30853085
{C1, S1} =
30863086
lists:foldl(fun(SubId, {Conn, St}) ->
3087-
remove_subscription(SubId, Conn, St, false)
3087+
remove_subscription(SubId, Conn, St)
30883088
end, {C0, S0}, Subs),
30893089
{Acc#{Str => ok}, C1, S1}
30903090
end
@@ -3216,7 +3216,7 @@ notify_connection_closed(#statem_data{
32163216
rabbit_core_metrics:connection_closed(self()),
32173217
[rabbit_stream_metrics:consumer_cancelled(self(),
32183218
stream_r(S, Connection),
3219-
SubId, Username, false)
3219+
SubId, Username)
32203220
|| #consumer{configuration =
32213221
#consumer_configuration{stream = S,
32223222
subscription_id = SubId}}
@@ -3298,8 +3298,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
32983298
stream_r(Stream,
32993299
C0),
33003300
SubId,
3301-
Username,
3302-
false),
3301+
Username),
33033302
maybe_unregister_consumer(
33043303
VirtualHost, Consumer,
33053304
single_active_consumer(Consumer),
@@ -3310,8 +3309,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33103309
stream_r(Stream,
33113310
C0),
33123311
SubId,
3313-
Username,
3314-
false),
3312+
Username),
33153313
maybe_unregister_consumer(
33163314
VirtualHost, Consumer,
33173315
single_active_consumer(Consumer),
@@ -3428,8 +3426,7 @@ remove_subscription(SubscriptionId,
34283426
virtual_host = VirtualHost,
34293427
outstanding_requests = Requests0,
34303428
stream_subscriptions = StreamSubscriptions} = Connection,
3431-
#stream_connection_state{consumers = Consumers} = State,
3432-
Notify) ->
3429+
#stream_connection_state{consumers = Consumers} = State) ->
34333430
#{SubscriptionId := Consumer} = Consumers,
34343431
#consumer{log = Log,
34353432
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
@@ -3456,8 +3453,7 @@ remove_subscription(SubscriptionId,
34563453
rabbit_stream_metrics:consumer_cancelled(self(),
34573454
stream_r(Stream, Connection2),
34583455
SubscriptionId,
3459-
Username,
3460-
Notify),
3456+
Username),
34613457

34623458
Requests1 = maybe_unregister_consumer(
34633459
VirtualHost, Consumer,

0 commit comments

Comments
 (0)