Skip to content

Commit c35a0b8

Browse files
authored
Merge pull request #11174 from rabbitmq/deaths-v2
Fix dead lettering
2 parents d180474 + 6b300a2 commit c35a0b8

14 files changed

+926
-273
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,13 @@ rabbitmq_integration_suite(
391391
additional_beam = [
392392
":test_queue_utils_beam",
393393
],
394-
shard_count = 7,
394+
shard_count = 8,
395+
)
396+
397+
rabbitmq_integration_suite(
398+
name = "message_containers_deaths_v2_SUITE",
399+
size = "medium",
400+
shard_count = 1,
395401
)
396402

397403
rabbitmq_integration_suite(

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2219,3 +2219,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
22192219
erlc_opts = "//:test_erlc_opts",
22202220
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"],
22212221
)
2222+
erlang_bytecode(
2223+
name = "message_containers_deaths_v2_SUITE_beam_files",
2224+
testonly = True,
2225+
srcs = ["test/message_containers_deaths_v2_SUITE.erl"],
2226+
outs = ["test/message_containers_deaths_v2_SUITE.beam"],
2227+
app_name = "rabbit",
2228+
erlc_opts = "//:test_erlc_opts",
2229+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
2230+
)

deps/rabbit/include/mc.hrl

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,3 @@
1-
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
2-
-type death_anns() :: #{first_time := non_neg_integer(), %% the timestamp of the first
3-
last_time := non_neg_integer(), %% the timestamp of the last
4-
ttl => OriginalExpiration :: non_neg_integer()}.
5-
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
6-
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
7-
count = 0 :: non_neg_integer(),
8-
anns :: death_anns()}).
9-
10-
-record(deaths, {first :: death_key(),
11-
last :: death_key(),
12-
records = #{} :: #{death_key() := #death{}}}).
13-
14-
151
%% good enough for most use cases
162
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).
173

@@ -26,3 +12,25 @@
2612
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
2713
-define(ANN_DURABLE, d).
2814
-define(ANN_PRIORITY, p).
15+
16+
-define(FF_MC_DEATHS_V2, message_containers_deaths_v2).
17+
18+
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
19+
-type death_anns() :: #{%% timestamp of the first time this message
20+
%% was dead lettered from this queue for this reason
21+
first_time := pos_integer(),
22+
%% timestamp of the last time this message
23+
%% was dead lettered from this queue for this reason
24+
last_time := pos_integer(),
25+
ttl => OriginalTtlHeader :: non_neg_integer()}.
26+
27+
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
28+
routing_keys :: OriginalRoutingKeys :: [rabbit_types:routing_key(),...],
29+
%% how many times this message was dead lettered from this queue for this reason
30+
count :: pos_integer(),
31+
anns :: death_anns()}).
32+
33+
-record(deaths, {first :: death_key(), % redundant to mc annotations x-first-death-*
34+
last :: death_key(), % redundant to mc annotations x-last-death-*
35+
records :: #{death_key() := #death{}}
36+
}).

deps/rabbit/src/mc.erl

Lines changed: 101 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@
3434
convert/3,
3535
protocol_state/1,
3636
prepare/2,
37-
record_death/3,
37+
record_death/4,
3838
is_death_cycle/2,
39-
last_death/1,
4039
death_queue_names/1
4140
]).
4241

@@ -356,89 +355,103 @@ protocol_state(BasicMsg) ->
356355
mc_compat:protocol_state(BasicMsg).
357356

358357
-spec record_death(rabbit_dead_letter:reason(),
359-
SourceQueue :: rabbit_misc:resource_name(),
360-
state()) -> state().
358+
rabbit_misc:resource_name(),
359+
state(),
360+
environment()) -> state().
361361
record_death(Reason, SourceQueue,
362-
#?MODULE{protocol = _Mod,
363-
data = _Data,
364-
annotations = Anns0} = State)
365-
when is_atom(Reason) andalso is_binary(SourceQueue) ->
362+
#?MODULE{annotations = Anns0} = State,
363+
Env)
364+
when is_atom(Reason) andalso
365+
is_binary(SourceQueue) ->
366366
Key = {SourceQueue, Reason},
367367
#{?ANN_EXCHANGE := Exchange,
368368
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
369369
Timestamp = os:system_time(millisecond),
370370
Ttl = maps:get(ttl, Anns0, undefined),
371-
372-
ReasonBin = atom_to_binary(Reason),
373-
DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{first_time => Timestamp,
374-
last_time => Timestamp}),
375-
case maps:get(deaths, Anns0, undefined) of
376-
undefined ->
377-
Ds = #deaths{last = Key,
378-
first = Key,
379-
records = #{Key => #death{count = 1,
380-
exchange = Exchange,
381-
routing_keys = RoutingKeys,
382-
anns = DeathAnns}}},
383-
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
371+
DeathAnns = rabbit_misc:maps_put_truthy(
372+
ttl, Ttl, #{first_time => Timestamp,
373+
last_time => Timestamp}),
374+
NewDeath = #death{exchange = Exchange,
375+
routing_keys = RoutingKeys,
376+
count = 1,
377+
anns = DeathAnns},
378+
Anns = case Anns0 of
379+
#{deaths := Deaths0} ->
380+
Deaths = case Deaths0 of
381+
#deaths{records = Rs0} ->
382+
Rs = maps:update_with(
383+
Key,
384+
fun(Death) ->
385+
update_death(Death, Timestamp)
386+
end,
387+
NewDeath,
388+
Rs0),
389+
Deaths0#deaths{last = Key,
390+
records = Rs};
391+
_ ->
392+
%% Deaths are ordered by recency
393+
case lists:keytake(Key, 1, Deaths0) of
394+
{value, {Key, D0}, Deaths1} ->
395+
D = update_death(D0, Timestamp),
396+
[{Key, D} | Deaths1];
397+
false ->
398+
[{Key, NewDeath} | Deaths0]
399+
end
400+
end,
401+
Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
402+
<<"x-last-death-queue">> := SourceQueue,
403+
<<"x-last-death-exchange">> := Exchange,
404+
deaths := Deaths};
405+
_ ->
406+
Deaths = case Env of
407+
#{?FF_MC_DEATHS_V2 := false} ->
408+
#deaths{last = Key,
409+
first = Key,
410+
records = #{Key => NewDeath}};
411+
_ ->
412+
[{Key, NewDeath}]
413+
end,
414+
ReasonBin = atom_to_binary(Reason),
415+
Anns0#{<<"x-first-death-reason">> => ReasonBin,
384416
<<"x-first-death-queue">> => SourceQueue,
385417
<<"x-first-death-exchange">> => Exchange,
386418
<<"x-last-death-reason">> => ReasonBin,
387419
<<"x-last-death-queue">> => SourceQueue,
388-
<<"x-last-death-exchange">> => Exchange
389-
},
390-
391-
State#?MODULE{annotations = Anns#{deaths => Ds}};
392-
#deaths{records = Rs} = Ds0 ->
393-
Death = #death{count = C,
394-
anns = DA} = maps:get(Key, Rs,
395-
#death{exchange = Exchange,
396-
routing_keys = RoutingKeys,
397-
anns = DeathAnns}),
398-
Ds = Ds0#deaths{last = Key,
399-
records = Rs#{Key =>
400-
Death#death{count = C + 1,
401-
anns = DA#{last_time => Timestamp}}}},
402-
Anns = Anns0#{deaths => Ds,
403-
<<"x-last-death-reason">> => ReasonBin,
404-
<<"x-last-death-queue">> => SourceQueue,
405-
<<"x-last-death-exchange">> => Exchange},
406-
State#?MODULE{annotations = Anns}
407-
end;
408-
record_death(Reason, SourceQueue, BasicMsg) ->
409-
mc_compat:record_death(Reason, SourceQueue, BasicMsg).
410-
420+
<<"x-last-death-exchange">> => Exchange,
421+
deaths => Deaths}
422+
end,
423+
State#?MODULE{annotations = Anns};
424+
record_death(Reason, SourceQueue, BasicMsg, Env) ->
425+
mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env).
426+
427+
update_death(#death{count = Count,
428+
anns = DeathAnns} = Death, Timestamp) ->
429+
Death#death{count = Count + 1,
430+
anns = DeathAnns#{last_time := Timestamp}}.
411431

412432
-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
433+
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Rs}}}) ->
434+
is_cycle_v1(TargetQueue, maps:keys(Rs));
413435
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
414-
is_cycle(TargetQueue, maps:keys(Deaths#deaths.records));
436+
is_cycle_v2(TargetQueue, Deaths);
415437
is_death_cycle(_TargetQueue, #?MODULE{}) ->
416438
false;
417439
is_death_cycle(TargetQueue, BasicMsg) ->
418440
mc_compat:is_death_cycle(TargetQueue, BasicMsg).
419441

442+
%% Returns death queue names ordered by recency.
420443
-spec death_queue_names(state()) -> [rabbit_misc:resource_name()].
421-
death_queue_names(#?MODULE{annotations = Anns}) ->
422-
case maps:get(deaths, Anns, undefined) of
423-
undefined ->
424-
[];
425-
#deaths{records = Records} ->
426-
proplists:get_keys(maps:keys(Records))
427-
end;
444+
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Rs}}}) ->
445+
proplists:get_keys(maps:keys(Rs));
446+
death_queue_names(#?MODULE{annotations = #{deaths := Deaths}}) ->
447+
lists:map(fun({{Queue, _Reason}, _Death}) ->
448+
Queue
449+
end, Deaths);
450+
death_queue_names(#?MODULE{}) ->
451+
[];
428452
death_queue_names(BasicMsg) ->
429453
mc_compat:death_queue_names(BasicMsg).
430454

431-
-spec last_death(state()) ->
432-
undefined | {death_key(), #death{}}.
433-
last_death(#?MODULE{annotations = Anns})
434-
when not is_map_key(deaths, Anns) ->
435-
undefined;
436-
last_death(#?MODULE{annotations = #{deaths := #deaths{last = Last,
437-
records = Rs}}}) ->
438-
{Last, maps:get(Last, Rs)};
439-
last_death(BasicMsg) ->
440-
mc_compat:last_death(BasicMsg).
441-
442455
-spec prepare(read | store, state()) -> state().
443456
prepare(For, #?MODULE{protocol = Proto,
444457
data = Data} = State) ->
@@ -448,24 +461,38 @@ prepare(For, State) ->
448461

449462
%% INTERNAL
450463

451-
%% if there is a death with a source queue that is the same as the target
464+
is_cycle_v2(TargetQueue, Deaths) ->
465+
case lists:splitwith(fun({{SourceQueue, _Reason}, #death{}}) ->
466+
SourceQueue =/= TargetQueue
467+
end, Deaths) of
468+
{_, []} ->
469+
false;
470+
{L, [H | _]} ->
471+
%% There is a cycle, but we only want to drop the message
472+
%% if the cycle is "fully automatic", i.e. without a client
473+
%% expliclity rejecting the message somewhere in the cycle.
474+
lists:all(fun({{_SourceQueue, Reason}, _Death}) ->
475+
Reason =/= rejected
476+
end, [H | L])
477+
end.
478+
479+
%% The desired v1 behaviour is the following:
480+
%% "If there is a death with a source queue that is the same as the target
452481
%% queue name and there are no newer deaths with the 'rejected' reason then
453-
%% consider this a cycle
454-
is_cycle(_Queue, []) ->
482+
%% consider this a cycle."
483+
%% However, the correct death order cannot be reliably determined in v1.
484+
%% deaths_v2 fixes this bug.
485+
is_cycle_v1(_Queue, []) ->
455486
false;
456-
is_cycle(_Queue, [{_Q, rejected} | _]) ->
487+
is_cycle_v1(_Queue, [{_Q, rejected} | _]) ->
457488
%% any rejection breaks the cycle
458489
false;
459-
is_cycle(Queue, [{Queue, Reason} | _])
490+
is_cycle_v1(Queue, [{Queue, Reason} | _])
460491
when Reason =/= rejected ->
461492
true;
462-
is_cycle(Queue, [_ | Rem]) ->
463-
is_cycle(Queue, Rem).
493+
is_cycle_v1(Queue, [_ | Rem]) ->
494+
is_cycle_v1(Queue, Rem).
464495

465496
set_received_at_timestamp(Anns) ->
466497
Millis = os:system_time(millisecond),
467498
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.
468-
469-
-ifdef(TEST).
470-
-include_lib("eunit/include/eunit.hrl").
471-
-endif.

0 commit comments

Comments
 (0)