Skip to content

Commit e9bb1ef

Browse files
committed
Fix dead lettering
# What? This commit fixes #11159, #11160, #11173. # How? ## Background RabbitMQ allows to dead letter messages for four different reasons, out of which three reasons cause messages to be dead lettered automatically internally in the broker: (maxlen, expired, delivery_limit) and 1 reason is caused by an explicit client action (rejected). RabbitMQ also allows dead letter topologies. When a message is dead lettered, it is re-published to an exchange, and therefore zero to multiple target queues. These target queues can in turn dead letter messages. Hence it is possible to create a cycle of queues where messages get dead lettered endlessly, which is what we want to avoid. ## Alternative approach One approach to avoid such endless cycles is to use a similar concept of the TTL field of the IPv4 datagram, or the hop limit field of an IPv6 datagram. These fields ensure that IP packets aren't cicrulating forever in the Internet. Each router decrements this counter. If this counter reaches 0, the sender will be notified and the message gets dropped. We could use the same approach in RabbitMQ: Whenever a queue dead letters a message, a dead_letter_hop_limit field could be decremented. If this field reaches 0, the message will be dropped. Such a hop limit field could have a sensible default value, for example 32. The sender of the message could override this value. Likewise, the client rejecting a message could set a new value via the Modified outcome. Such an approach has multiple advantages: 1. No dead letter cycle detection per se needs to be performed within the broker which is a slight simplification to what we have today. 2. Simpler dead letter topologies. One very common use case is that clients re-try sending the message after some time by consuming from a dead-letter queue and rejecting the message such that the message gets republished to the original queue. Instead of requiring explicit client actions, which increases complexity, a x-message-ttl argument could be set on the dead-letter queue to automatically retry after some time. This is a big simplification because it eliminates the need of various frameworks that retry, such as https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_overview/rabbitmq-retry.html 3. No dead letter history information needs to be compressed because there is a clear limit on how often a message gets dead lettered. Therefore, the full history including timestamps of every dead letter event will be available to clients. Disadvantages: 1. Breaks a lot of clients, even for 4.0. ## 3.12 approach Instead of decrementing a counter, the approach up to 3.12 has been to drop the message if the message cycled automatically. A message cycled automatically if no client expliclity rejected the message, i.e. the mesage got dead lettered due to maxlen, expired, or delivery_limit, but not due to rejected. In this approach, the broker must be able to detect such cycles reliably. Reliably detecting dead letter cycles broke in 3.13 due to #11159 and #11160. To reliably detect cycles, the broker must be able to obtain the exact order of dead letter events for a given message. In 3.13.0 - 3.13.2, the order cannot exactly be determined because wall clock time is used to record the death time. This commit uses the same approach as done in 3.12: a list ordered by death recency is used with the most recent death at the head of the list. To not grow this list endlessly (for example when a client rejects the same message hundreds of times), this list should be compacted. This commit, like 3.12, compacts by tuple `{Queue, Reason}`: If this message got already dead lettered from this Queue for this Reason, then only a counter is incremented and the element is moved to the front of the list. ## Streams & AMQP 1.0 clients Dead lettering from a stream doesn't make sense because: 1. a client cannot reject a message from a stream since the stream must contain the total order of events to be consumed my multiple clients. 2. TTL is implemented by Stream retention where only old Stream segments are automatically deleted (or archived in the future). 3. same applies to maxlen Although messages cannot be dead lettered **from** a stream, messages can be dead lettered **into** a stream. This commit provides clients consuming from a stream the death history: #11173 Additionally, this commit provides AMQP 1.0 clients the death history via message annotation `x-opt-deaths` which contains the same information as AMQP 0.9.1 header `x-death`. Both, storing the death history in a stream and providing death history to an AMQP 1.0 client, use the same encoding: a message annoation `x-opt-deaths` that contains an array of maps ordered by death recency. The information encoded is the same as in the AMQP 0.9.1 x-death header. Instead of providing an array of maps, a better approach could be to use an array of a custom AMQP death type, such as: ```xml <amqp name="rabbitmq"> <section name="custom-types"> <type name="death" class="composite" source="list"> <descriptor name="rabbitmq:death:list" code="0x00000000:0x000000255"/> <field name="queue" type="string" mandatory="true" label="the name of the queue the message was dead lettered from"/> <field name="reason" type="symbol" mandatory="true" label="the reason why this message was dead lettered"/> <field name="count" type="ulong" default="1" label="how many times this message was dead lettered from this queue for this reason"/> <field name="time" mandatory="true" type="timestamp" label="the first time when this message was dead lettered from this queue for this reason"/> <field name="exchange" type="string" default="" label="the exchange this message was published to before it was dead lettered for the first time from this queue for this reason"/> <field name="routing-keys" type="string" default="" multiple="true" label="the routing keys this message was published with before it was dead lettered for the first time from this queue for this reason"/> <field name="ttl" type="milliseconds" label="the time to live of this message before it was dead lettered for the first time from this queue for reason ‘expired’"/> </type> </section> </amqp> ``` However, encoding and decoding custom AMQP types that are nested within arrays which in turn are nested within the message annotation map can be difficult for clients and the broker. Also, each client will need to know the custom AMQP type. For now, therefore we use an array of maps. ## Feature flag The new way to record death information is done via mc annotation `deaths_v2`. Because old nodes do not know this new annotation, recording death information via mc annotation `deaths_v2` is hidden behind a new feature flag `message_containers_deaths_v2`. If this feature flag is disabled, a message will continue to use the 3.13.0 - 3.13.2 way to record death information in mc annotation `deaths`, or even the older way within `x-death` header directly if feature flag message_containers is also disabled. Only if feature flag `message_containers_deaths_v2` is enabled and this message hasn't been dead lettered before, will the new mc annotation `deaths_v2` be used.
1 parent a07484a commit e9bb1ef

14 files changed

+941
-229
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,13 @@ rabbitmq_integration_suite(
391391
additional_beam = [
392392
":test_queue_utils_beam",
393393
],
394-
shard_count = 7,
394+
shard_count = 8,
395+
)
396+
397+
rabbitmq_integration_suite(
398+
name = "message_containers_deaths_v2_SUITE",
399+
size = "medium",
400+
shard_count = 1,
395401
)
396402

397403
rabbitmq_integration_suite(
@@ -789,6 +795,9 @@ rabbitmq_suite(
789795
rabbitmq_suite(
790796
name = "mc_unit_SUITE",
791797
size = "small",
798+
runtime_deps = [
799+
"@meck//:erlang_app",
800+
],
792801
deps = [
793802
"//deps/amqp10_common:erlang_app",
794803
"//deps/rabbit_common:erlang_app",

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2219,3 +2219,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
22192219
erlc_opts = "//:test_erlc_opts",
22202220
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"],
22212221
)
2222+
erlang_bytecode(
2223+
name = "message_containers_deaths_v2_SUITE_beam_files",
2224+
testonly = True,
2225+
srcs = ["test/message_containers_deaths_v2_SUITE.erl"],
2226+
outs = ["test/message_containers_deaths_v2_SUITE.beam"],
2227+
app_name = "rabbit",
2228+
erlc_opts = "//:test_erlc_opts",
2229+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
2230+
)

deps/rabbit/include/mc.hrl

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,3 @@
1-
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
2-
-type death_anns() :: #{first_time := non_neg_integer(), %% the timestamp of the first
3-
last_time := non_neg_integer(), %% the timestamp of the last
4-
ttl => OriginalExpiration :: non_neg_integer()}.
5-
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
6-
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
7-
count = 0 :: non_neg_integer(),
8-
anns :: death_anns()}).
9-
10-
-record(deaths, {first :: death_key(),
11-
last :: death_key(),
12-
records = #{} :: #{death_key() := #death{}}}).
13-
14-
151
%% good enough for most use cases
162
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).
173

@@ -26,3 +12,30 @@
2612
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
2713
-define(ANN_DURABLE, d).
2814
-define(ANN_PRIORITY, p).
15+
16+
%% RabbitMQ >= 3.13.3
17+
-record(death_v2, {source_queue :: rabbit_misc:resource_name(),
18+
reason :: rabbit_dead_letter:reason(),
19+
%% how many times this message was dead lettered
20+
%% from this source_queue for this reason
21+
count :: pos_integer(),
22+
%% timestamp when this message was dead lettered the first time
23+
%% from this source_queue for this reason
24+
first_death_timestamp :: pos_integer(),
25+
original_exchange :: rabbit_misc:resource_name(),
26+
original_routing_keys :: [rabbit_types:routing_key(),...],
27+
%% original message ttl header if reason is 'expired'
28+
original_ttl :: undefined | non_neg_integer()}).
29+
30+
%% These records were used in RabbitMQ 3.13.0 - 3.13.2.
31+
-type death_v1_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
32+
-type death_v1_anns() :: #{first_time := non_neg_integer(),
33+
last_time := non_neg_integer(),
34+
ttl => OriginalExpiration :: non_neg_integer()}.
35+
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
36+
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
37+
count = 0 :: non_neg_integer(),
38+
anns :: death_v1_anns()}).
39+
-record(deaths, {first :: death_v1_key(),
40+
last :: death_v1_key(),
41+
records = #{} :: #{death_v1_key() := #death{}}}).

deps/rabbit/src/mc.erl

Lines changed: 113 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
prepare/2,
3737
record_death/3,
3838
is_death_cycle/2,
39-
last_death/1,
4039
death_queue_names/1
4140
]).
4241

@@ -356,13 +355,12 @@ protocol_state(BasicMsg) ->
356355
mc_compat:protocol_state(BasicMsg).
357356

358357
-spec record_death(rabbit_dead_letter:reason(),
359-
SourceQueue :: rabbit_misc:resource_name(),
358+
rabbit_misc:resource_name(),
360359
state()) -> state().
361360
record_death(Reason, SourceQueue,
362-
#?MODULE{protocol = _Mod,
363-
data = _Data,
364-
annotations = Anns0} = State)
365-
when is_atom(Reason) andalso is_binary(SourceQueue) ->
361+
#?MODULE{annotations = Anns0} = State)
362+
when is_atom(Reason) andalso
363+
is_binary(SourceQueue) ->
366364
Key = {SourceQueue, Reason},
367365
#{?ANN_EXCHANGE := Exchange,
368366
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
@@ -374,21 +372,27 @@ record_death(Reason, SourceQueue,
374372
last_time => Timestamp}),
375373
case maps:get(deaths, Anns0, undefined) of
376374
undefined ->
377-
Ds = #deaths{last = Key,
378-
first = Key,
379-
records = #{Key => #death{count = 1,
380-
exchange = Exchange,
381-
routing_keys = RoutingKeys,
382-
anns = DeathAnns}}},
383-
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
384-
<<"x-first-death-queue">> => SourceQueue,
385-
<<"x-first-death-exchange">> => Exchange,
386-
<<"x-last-death-reason">> => ReasonBin,
387-
<<"x-last-death-queue">> => SourceQueue,
388-
<<"x-last-death-exchange">> => Exchange
389-
},
390-
391-
State#?MODULE{annotations = Anns#{deaths => Ds}};
375+
case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of
376+
true ->
377+
record_death_v2(SourceQueue, Reason, ReasonBin, Exchange,
378+
RoutingKeys, Timestamp, Ttl, State);
379+
false ->
380+
Ds = #deaths{last = Key,
381+
first = Key,
382+
records = #{Key => #death{count = 1,
383+
exchange = Exchange,
384+
routing_keys = RoutingKeys,
385+
anns = DeathAnns}}},
386+
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
387+
<<"x-first-death-queue">> => SourceQueue,
388+
<<"x-first-death-exchange">> => Exchange,
389+
<<"x-last-death-reason">> => ReasonBin,
390+
<<"x-last-death-queue">> => SourceQueue,
391+
<<"x-last-death-exchange">> => Exchange,
392+
deaths => Ds
393+
},
394+
State#?MODULE{annotations = Anns}
395+
end;
392396
#deaths{records = Rs} = Ds0 ->
393397
Death = #death{count = C,
394398
anns = DA} = maps:get(Key, Rs,
@@ -408,37 +412,68 @@ record_death(Reason, SourceQueue,
408412
record_death(Reason, SourceQueue, BasicMsg) ->
409413
mc_compat:record_death(Reason, SourceQueue, BasicMsg).
410414

415+
record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp, Ttl,
416+
#?MODULE{annotations = Anns0} = State) ->
417+
Anns = case Anns0 of
418+
#{deaths_v2 := Deaths0} ->
419+
%% deaths_v2 is ordered by recency
420+
Deaths = case deaths_take(SourceQueue, Reason, Deaths0) of
421+
{value, Death0 = #death_v2{count = Count}, Deaths1} ->
422+
Death = Death0#death_v2{count = Count + 1},
423+
[Death | Deaths1];
424+
false ->
425+
Death = #death_v2{source_queue = SourceQueue,
426+
reason = Reason,
427+
count = 1,
428+
first_death_timestamp = Timestamp,
429+
original_exchange = Exchange,
430+
original_routing_keys = RoutingKeys,
431+
original_ttl = Ttl},
432+
[Death | Deaths0]
433+
end,
434+
Anns0#{deaths_v2 := Deaths,
435+
<<"x-last-death-reason">> := ReasonBin,
436+
<<"x-last-death-queue">> := SourceQueue,
437+
<<"x-last-death-exchange">> := Exchange};
438+
_ ->
439+
Death = #death_v2{source_queue = SourceQueue,
440+
reason = Reason,
441+
count = 1,
442+
first_death_timestamp = Timestamp,
443+
original_exchange = Exchange,
444+
original_routing_keys = RoutingKeys,
445+
original_ttl = Ttl},
446+
Anns0#{deaths_v2 => [Death],
447+
<<"x-first-death-reason">> => ReasonBin,
448+
<<"x-first-death-queue">> => SourceQueue,
449+
<<"x-first-death-exchange">> => Exchange,
450+
<<"x-last-death-reason">> => ReasonBin,
451+
<<"x-last-death-queue">> => SourceQueue,
452+
<<"x-last-death-exchange">> => Exchange}
453+
end,
454+
State#?MODULE{annotations = Anns}.
411455

412456
-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
457+
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
458+
is_cycle_v2(TargetQueue, Deaths);
413459
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
414-
is_cycle(TargetQueue, maps:keys(Deaths#deaths.records));
460+
is_cycle_v1(TargetQueue, maps:keys(Deaths#deaths.records));
415461
is_death_cycle(_TargetQueue, #?MODULE{}) ->
416462
false;
417463
is_death_cycle(TargetQueue, BasicMsg) ->
418464
mc_compat:is_death_cycle(TargetQueue, BasicMsg).
419465

466+
%% Returns death queue names ordered by recency.
420467
-spec death_queue_names(state()) -> [rabbit_misc:resource_name()].
421-
death_queue_names(#?MODULE{annotations = Anns}) ->
422-
case maps:get(deaths, Anns, undefined) of
423-
undefined ->
424-
[];
425-
#deaths{records = Records} ->
426-
proplists:get_keys(maps:keys(Records))
427-
end;
468+
death_queue_names(#?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
469+
lists:map(fun(#death_v2{source_queue = Q}) -> Q end, Deaths);
470+
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}}) ->
471+
proplists:get_keys(maps:keys(Records));
472+
death_queue_names(#?MODULE{}) ->
473+
[];
428474
death_queue_names(BasicMsg) ->
429475
mc_compat:death_queue_names(BasicMsg).
430476

431-
-spec last_death(state()) ->
432-
undefined | {death_key(), #death{}}.
433-
last_death(#?MODULE{annotations = Anns})
434-
when not is_map_key(deaths, Anns) ->
435-
undefined;
436-
last_death(#?MODULE{annotations = #{deaths := #deaths{last = Last,
437-
records = Rs}}}) ->
438-
{Last, maps:get(Last, Rs)};
439-
last_death(BasicMsg) ->
440-
mc_compat:last_death(BasicMsg).
441-
442477
-spec prepare(read | store, state()) -> state().
443478
prepare(store, #?MODULE{protocol = mc_amqp} = State) ->
444479
case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
@@ -456,24 +491,52 @@ prepare(For, State) ->
456491

457492
%% INTERNAL
458493

459-
%% if there is a death with a source queue that is the same as the target
494+
is_cycle_v2(TargetQueue, Deaths) ->
495+
case lists:splitwith(fun(#death_v2{source_queue = SourceQueue}) ->
496+
SourceQueue =/= TargetQueue
497+
end, Deaths) of
498+
{_, []} ->
499+
false;
500+
{L, [H | _]} ->
501+
%% There is a cycle, but we only want to drop the message
502+
%% if the cycle is "fully automatic", i.e. without a client
503+
%% expliclity rejecting the message somewhere in the cycle.
504+
lists:all(fun(#death_v2{reason = Reason}) ->
505+
Reason =/= rejected
506+
end, [H | L])
507+
end.
508+
509+
%% The desired v1 behaviour is the following:
510+
%% "If there is a death with a source queue that is the same as the target
460511
%% queue name and there are no newer deaths with the 'rejected' reason then
461-
%% consider this a cycle
462-
is_cycle(_Queue, []) ->
512+
%% consider this a cycle."
513+
%% However, the correct death order cannot be reliably determined in v1.
514+
%% deaths_v2 fixes this bug.
515+
is_cycle_v1(_Queue, []) ->
463516
false;
464-
is_cycle(_Queue, [{_Q, rejected} | _]) ->
517+
is_cycle_v1(_Queue, [{_Q, rejected} | _]) ->
465518
%% any rejection breaks the cycle
466519
false;
467-
is_cycle(Queue, [{Queue, Reason} | _])
520+
is_cycle_v1(Queue, [{Queue, Reason} | _])
468521
when Reason =/= rejected ->
469522
true;
470-
is_cycle(Queue, [_ | Rem]) ->
471-
is_cycle(Queue, Rem).
523+
is_cycle_v1(Queue, [_ | Rem]) ->
524+
is_cycle_v1(Queue, Rem).
472525

473526
set_received_at_timestamp(Anns) ->
474527
Millis = os:system_time(millisecond),
475528
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.
476529

477-
-ifdef(TEST).
478-
-include_lib("eunit/include/eunit.hrl").
479-
-endif.
530+
deaths_take(Queue, Reason, Deaths) ->
531+
deaths_take(Queue, Reason, Deaths, []).
532+
533+
deaths_take(Queue,
534+
Reason,
535+
[#death_v2{source_queue = Queue,
536+
reason = Reason} = H | T],
537+
Acc) ->
538+
{value, H, lists:reverse(Acc, T)};
539+
deaths_take(Queue, Reason, [H|T], Acc) ->
540+
deaths_take(Queue, Reason, T, [H|Acc]);
541+
deaths_take(_Queue, _Reason, [], _Acc) ->
542+
false.

0 commit comments

Comments
 (0)