Skip to content

Commit 3a62bf0

Browse files
Merge pull request #8267 from rabbitmq/mergify/bp/v3.12.x/pr-8260
Consumer Timeout Follow-up (backport #8260)
2 parents 22a7134 + d52e37d commit 3a62bf0

File tree

6 files changed

+47
-42
lines changed

6 files changed

+47
-42
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2818,17 +2818,8 @@ get_queue_consumer_timeout(_PA = #pending_ack{queue = QName},
28182818
GCT
28192819
end.
28202820

2821-
get_consumer_timeout(PA = #pending_ack{tag = CTag},
2822-
State = #ch{consumer_mapping = CMap}) ->
2823-
case maps:find(CTag, CMap) of
2824-
{ok, {_, {_, _, _, Args}}} ->
2825-
case rabbit_misc:table_lookup(Args, <<"x-consumer-timeout">>) of
2826-
{long, Timeout} -> Timeout;
2827-
_ -> get_queue_consumer_timeout(PA, State)
2828-
end;
2829-
_ ->
2830-
get_queue_consumer_timeout(PA, State)
2831-
end.
2821+
get_consumer_timeout(PA, State) ->
2822+
get_queue_consumer_timeout(PA, State).
28322823

28332824
evaluate_consumer_timeout(State = #ch{unacked_message_q = UAMQ}) ->
28342825
case ?QUEUE:get(UAMQ, empty) of

deps/rabbit/test/consumer_timeout_SUITE.erl

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,41 +19,27 @@
1919
-define(GROUP_CONFIG,
2020
#{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]},
2121
{queue_policy, []},
22-
{queue_arguments, []},
23-
{consumer_arguments, []}],
22+
{queue_arguments, []}],
2423
queue_policy_consumer_timeout => [{rabbit, []},
2524
{queue_policy, [{<<"consumer-timeout">>, ?CONSUMER_TIMEOUT}]},
26-
{queue_arguments, []},
27-
{consumer_arguments, []}],
25+
{queue_arguments, []}],
2826
queue_argument_consumer_timeout => [{rabbit, []},
2927
{queue_policy, []},
30-
{queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]},
31-
{consumer_arguments, []}],
32-
consumer_argument_consumer_timeout => [{rabbit, []},
33-
{queue_policy, []},
34-
{queue_arguments, []},
35-
{consumer_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).
28+
{queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).
3629

3730
-import(quorum_queue_utils, [wait_for_messages/2]).
3831

3932
all() ->
4033
[
4134
{group, global_consumer_timeout},
4235
{group, queue_policy_consumer_timeout},
43-
{group, queue_argument_consumer_timeout},
44-
{group, consumer_argument_consumer_timeout}
36+
{group, queue_argument_consumer_timeout}
4537
].
4638

4739
groups() ->
48-
ConsumerTests = [consumer_timeout,
49-
consumer_timeout_no_basic_cancel_capability],
50-
AllTests = ConsumerTests ++ [consumer_timeout_basic_get],
51-
52-
ConsumerTestsParallel = [
53-
{classic_queue, [parallel], ConsumerTests},
54-
{mirrored_queue, [parallel], ConsumerTests},
55-
{quorum_queue, [parallel], ConsumerTests}
56-
],
40+
AllTests = [consumer_timeout,
41+
consumer_timeout_no_basic_cancel_capability,
42+
consumer_timeout_basic_get],
5743

5844
AllTestsParallel = [
5945
{classic_queue, [parallel], AllTests},
@@ -63,8 +49,7 @@ groups() ->
6349
[
6450
{global_consumer_timeout, [], AllTestsParallel},
6551
{queue_policy_consumer_timeout, [], AllTestsParallel},
66-
{queue_argument_consumer_timeout, [], AllTestsParallel},
67-
{consumer_argument_consumer_timeout, [], ConsumerTestsParallel}
52+
{queue_argument_consumer_timeout, [], AllTestsParallel}
6853
].
6954

7055
suite() ->
@@ -158,7 +143,7 @@ consumer_timeout(Config) ->
158143
declare_queue(Ch, Config, QName),
159144
publish(Ch, QName, [<<"msg1">>]),
160145
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
161-
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
146+
subscribe(Ch, QName, false),
162147
erlang:monitor(process, Conn),
163148
erlang:monitor(process, Ch),
164149
receive
@@ -226,7 +211,7 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
226211
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
227212
erlang:monitor(process, Conn),
228213
erlang:monitor(process, Ch),
229-
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
214+
subscribe(Ch, QName, false),
230215
receive
231216
{#'basic.deliver'{delivery_tag = _,
232217
redelivered = false}, _} ->
@@ -280,14 +265,13 @@ consume(Ch, QName, NoAck, Payloads) ->
280265
DTag
281266
end || Payload <- Payloads].
282267

283-
subscribe(Ch, Queue, NoAck, Args) ->
284-
subscribe(Ch, Queue, NoAck, <<"ctag">>, Args).
268+
subscribe(Ch, Queue, NoAck) ->
269+
subscribe(Ch, Queue, NoAck, <<"ctag">>).
285270

286-
subscribe(Ch, Queue, NoAck, Ctag, Args) ->
271+
subscribe(Ch, Queue, NoAck, Ctag) ->
287272
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
288273
no_ack = NoAck,
289-
consumer_tag = Ctag,
290-
arguments = Args
274+
consumer_tag = Ctag
291275
},
292276
self()),
293277
receive

deps/rabbitmq_management/priv/www/js/global.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ var HELP = {
191191
'queue-message-ttl':
192192
'How long a message published to a queue can live before it is discarded (milliseconds).<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/ttl.html#per-queue-message-ttl">x-message-ttl</a>" argument.)',
193193

194+
'queue-consumer-timeout':
195+
'If a consumer does not ack its delivery for more than the <a href="https://www.rabbitmq.com/consumers.html#acknowledgement-timeout">timeout value</a> (30 minutes by default), its channel will be closed with a <code>PRECONDITION_FAILED</code> channel exception.',
196+
194197
'queue-expires':
195198
'How long a queue can be unused for before it is automatically deleted (milliseconds).<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/ttl.html#queue-ttl">x-expires</a>" argument.)',
196199

deps/rabbitmq_management/priv/www/js/tmpl/consumers.ejs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<th>Prefetch count</th>
1515
<th>Active <span class="help" id="consumer-active"></span></th>
1616
<th>Activity status</th>
17+
<th>Consumer Timeout</th>
1718
<th>Arguments</th>
1819
</tr>
1920
</thead>
@@ -34,6 +35,7 @@
3435
<td class="c"><%= consumer.prefetch_count %></td>
3536
<td class="c"><%= fmt_boolean(consumer.active) %></td>
3637
<td class="c"><%= fmt_activity_status(consumer.activity_status) %></td>
38+
<td class="c"><%= consumer.consumer_timeout %></td>
3739
<td class="c"><%= fmt_table_short(consumer.arguments) %></td>
3840
</tr>
3941
<% } %>

deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
<span class="argument-link" field="definition" key="dead-letter-exchange" type="string">Dead letter exchange</span> |
109109
<span class="argument-link" field="definition" key="dead-letter-routing-key" type="string">Dead letter routing key</span><br/>
110110
<span class="argument-link" field="definition" key="message-ttl" type="number">Message TTL</span><span class="help" id="queue-message-ttl"></span></br>
111+
<span class="argument-link" field="definition" key="consumer-timeout" type="number">Consumer Timeout</span><span class="help" id="queue-consumer-timeout"></span></br>
111112
</td>
112113
<tr>
113114
<td>Queues [Classic]</td>

deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,31 @@ augment_consumer({{Q, Ch, CTag}, Props}) ->
252252
[{queue, format_resource(Q)},
253253
{channel_details, augment_channel_pid(Ch)},
254254
{channel_pid, Ch},
255-
{consumer_tag, CTag} | Props].
255+
{consumer_tag, CTag},
256+
{consumer_timeout, consumer_timeout(Props, Q)} | Props].
257+
258+
consumer_timeout(_Props, Q) ->
259+
get_queue_consumer_timeout(Q, get_global_consumer_timeout()).
260+
261+
get_queue_consumer_timeout(QName, GCT) ->
262+
case rabbit_amqqueue:lookup(QName) of
263+
{ok, Q} -> %% should we account for different queue states here?
264+
case rabbit_queue_type_util:args_policy_lookup(<<"consumer-timeout">>,
265+
fun (X, Y) -> erlang:min(X, Y) end, Q) of
266+
undefined -> GCT;
267+
Val -> Val
268+
end;
269+
_ ->
270+
GCT
271+
end.
272+
273+
get_global_consumer_timeout() ->
274+
case application:get_env(rabbit, consumer_timeout) of
275+
{ok, MS} when is_integer(MS) ->
276+
MS;
277+
_ ->
278+
undefined
279+
end.
256280

257281
consumers_by_vhost(VHost) ->
258282
ets:select(consumer_stats,

0 commit comments

Comments
 (0)