Skip to content

Commit 033d66d

Browse files
committed
Reuse death record
Address PR feedback #11174 (comment)
1 parent 4e5bfa4 commit 033d66d

File tree

4 files changed

+123
-179
lines changed

4 files changed

+123
-179
lines changed

deps/rabbit/include/mc.hrl

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,29 +13,23 @@
1313
-define(ANN_DURABLE, d).
1414
-define(ANN_PRIORITY, p).
1515

16-
%% RabbitMQ >= 3.13.3
17-
-type death_v2_anns() :: #{ttl => OriginalTtlHeader :: non_neg_integer()}.
18-
-record(death_v2, {source_queue :: rabbit_misc:resource_name(),
19-
reason :: rabbit_dead_letter:reason(),
20-
%% how many times this message was dead lettered
21-
%% from this source_queue for this reason
22-
count :: pos_integer(),
23-
%% timestamp when this message was dead lettered the first time
24-
%% from this source_queue for this reason
25-
first_death_timestamp :: pos_integer(),
26-
original_exchange :: rabbit_misc:resource_name(),
27-
original_routing_keys :: [rabbit_types:routing_key(),...],
28-
annotations :: death_v2_anns()}).
16+
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
17+
-type death_anns() :: #{%% timestamp of the first time this message
18+
%% was dead lettered from this queue for this reason
19+
first_time := pos_integer(),
20+
%% timestamp of the last time this message
21+
%% was dead lettered from this queue for this reason
22+
last_time := pos_integer(),
23+
ttl => OriginalTtlHeader :: non_neg_integer()}.
2924

30-
%% RabbitMQ 3.13.0 - 3.13.2.
31-
-type death_v1_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
32-
-type death_v1_anns() :: #{first_time := non_neg_integer(),
33-
last_time := non_neg_integer(),
34-
ttl => OriginalExpiration :: non_neg_integer()}.
3525
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
36-
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
37-
count = 0 :: non_neg_integer(),
38-
anns :: death_v1_anns()}).
39-
-record(deaths, {first :: death_v1_key(),
40-
last :: death_v1_key(),
41-
records = #{} :: #{death_v1_key() := #death{}}}).
26+
routing_keys :: OriginalRoutingKeys :: [rabbit_types:routing_key(),...],
27+
%% how many times this message was dead lettered from this queue for this reason
28+
count :: pos_integer(),
29+
anns :: death_anns()}).
30+
31+
-record(deaths, {first :: death_key(), % redundant to mc annotations x-first-death-*
32+
last :: death_key(), % redundant to mc annotations x-last-death-*
33+
records :: #{death_key() := #death{}} | % feature flag message_containers_deaths_v2 disabled
34+
[{death_key(), #death{}}] % feature flag message_containers_deaths_v2 enabled
35+
}).

deps/rabbit/src/mc.erl

Lines changed: 67 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -366,109 +366,89 @@ record_death(Reason, SourceQueue,
366366
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
367367
Timestamp = os:system_time(millisecond),
368368
Ttl = maps:get(ttl, Anns0, undefined),
369-
370-
ReasonBin = atom_to_binary(Reason),
371369
DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{first_time => Timestamp,
372370
last_time => Timestamp}),
373-
case maps:get(deaths, Anns0, undefined) of
374-
undefined ->
375-
case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of
376-
true ->
377-
record_death_v2(SourceQueue, Reason, ReasonBin, Exchange,
378-
RoutingKeys, Timestamp, Ttl, State);
379-
false ->
380-
Ds = #deaths{last = Key,
381-
first = Key,
382-
records = #{Key => #death{count = 1,
383-
exchange = Exchange,
384-
routing_keys = RoutingKeys,
385-
anns = DeathAnns}}},
386-
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
387-
<<"x-first-death-queue">> => SourceQueue,
388-
<<"x-first-death-exchange">> => Exchange,
389-
<<"x-last-death-reason">> => ReasonBin,
390-
<<"x-last-death-queue">> => SourceQueue,
391-
<<"x-last-death-exchange">> => Exchange,
392-
deaths => Ds
393-
},
394-
State#?MODULE{annotations = Anns}
395-
end;
396-
#deaths{records = Rs} = Ds0 ->
397-
Death = #death{count = C,
398-
anns = DA} = maps:get(Key, Rs,
399-
#death{exchange = Exchange,
400-
routing_keys = RoutingKeys,
401-
anns = DeathAnns}),
402-
Ds = Ds0#deaths{last = Key,
403-
records = Rs#{Key =>
404-
Death#death{count = C + 1,
405-
anns = DA#{last_time => Timestamp}}}},
406-
Anns = Anns0#{deaths => Ds,
371+
case Anns0 of
372+
#{deaths := Deaths = #deaths{records = Rs0}} ->
373+
Rs = if is_list(Rs0) ->
374+
%% records are ordered by recency
375+
case lists:keytake(Key, 1, Rs0) of
376+
{value, {Key, D0}, Rs1} ->
377+
D = update_death(D0, Timestamp),
378+
[{Key, D} | Rs1];
379+
false ->
380+
[{Key, #death{exchange = Exchange,
381+
routing_keys = RoutingKeys,
382+
count = 1,
383+
anns = DeathAnns}} | Rs0]
384+
end;
385+
is_map(Rs0) ->
386+
case Rs0 of
387+
#{Key := Death} ->
388+
Rs0#{Key := update_death(Death, Timestamp)};
389+
_ ->
390+
Rs0#{Key => #death{exchange = Exchange,
391+
routing_keys = RoutingKeys,
392+
count = 1,
393+
anns = DeathAnns}}
394+
end
395+
end,
396+
Anns = Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
397+
<<"x-last-death-queue">> := SourceQueue,
398+
<<"x-last-death-exchange">> := Exchange,
399+
deaths := Deaths#deaths{last = Key,
400+
records = Rs}},
401+
State#?MODULE{annotations = Anns};
402+
_ ->
403+
Death = #death{exchange = Exchange,
404+
routing_keys = RoutingKeys,
405+
count = 1,
406+
anns = DeathAnns},
407+
Rs = case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of
408+
true -> [{Key, Death}];
409+
false -> #{Key => Death}
410+
end,
411+
ReasonBin = atom_to_binary(Reason),
412+
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
413+
<<"x-first-death-queue">> => SourceQueue,
414+
<<"x-first-death-exchange">> => Exchange,
407415
<<"x-last-death-reason">> => ReasonBin,
408416
<<"x-last-death-queue">> => SourceQueue,
409-
<<"x-last-death-exchange">> => Exchange},
417+
<<"x-last-death-exchange">> => Exchange,
418+
deaths => #deaths{last = Key,
419+
first = Key,
420+
records = Rs}},
410421
State#?MODULE{annotations = Anns}
411422
end;
412423
record_death(Reason, SourceQueue, BasicMsg) ->
413424
mc_compat:record_death(Reason, SourceQueue, BasicMsg).
414425

415-
record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp, Ttl,
416-
#?MODULE{annotations = Anns0} = State) ->
417-
DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{}),
418-
Anns = case Anns0 of
419-
#{deaths_v2 := Deaths0} ->
420-
%% deaths_v2 is ordered by recency
421-
Deaths = case deaths_take(SourceQueue, Reason, Deaths0) of
422-
{value, Death0 = #death_v2{count = Count}, Deaths1} ->
423-
Death = Death0#death_v2{count = Count + 1},
424-
[Death | Deaths1];
425-
false ->
426-
Death = #death_v2{source_queue = SourceQueue,
427-
reason = Reason,
428-
count = 1,
429-
first_death_timestamp = Timestamp,
430-
original_exchange = Exchange,
431-
original_routing_keys = RoutingKeys,
432-
annotations = DeathAnns},
433-
[Death | Deaths0]
434-
end,
435-
Anns0#{deaths_v2 := Deaths,
436-
<<"x-last-death-reason">> := ReasonBin,
437-
<<"x-last-death-queue">> := SourceQueue,
438-
<<"x-last-death-exchange">> := Exchange};
439-
_ ->
440-
Death = #death_v2{source_queue = SourceQueue,
441-
reason = Reason,
442-
count = 1,
443-
first_death_timestamp = Timestamp,
444-
original_exchange = Exchange,
445-
original_routing_keys = RoutingKeys,
446-
annotations = DeathAnns},
447-
Anns0#{deaths_v2 => [Death],
448-
<<"x-first-death-reason">> => ReasonBin,
449-
<<"x-first-death-queue">> => SourceQueue,
450-
<<"x-first-death-exchange">> => Exchange,
451-
<<"x-last-death-reason">> => ReasonBin,
452-
<<"x-last-death-queue">> => SourceQueue,
453-
<<"x-last-death-exchange">> => Exchange}
454-
end,
455-
State#?MODULE{annotations = Anns}.
426+
update_death(#death{count = Count,
427+
anns = DeathAnns} = Death, Timestamp) ->
428+
Death#death{count = Count + 1,
429+
anns = DeathAnns#{last_time := Timestamp}}.
456430

457431
-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
458-
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
459-
is_cycle_v2(TargetQueue, Deaths);
460-
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
461-
is_cycle_v1(TargetQueue, maps:keys(Deaths#deaths.records));
432+
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Recs}}})
433+
when is_list(Recs) ->
434+
is_cycle_v2(TargetQueue, Recs);
435+
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Recs}}})
436+
when is_map(Recs) ->
437+
is_cycle_v1(TargetQueue, maps:keys(Recs));
462438
is_death_cycle(_TargetQueue, #?MODULE{}) ->
463439
false;
464440
is_death_cycle(TargetQueue, BasicMsg) ->
465441
mc_compat:is_death_cycle(TargetQueue, BasicMsg).
466442

467443
%% Returns death queue names ordered by recency.
468444
-spec death_queue_names(state()) -> [rabbit_misc:resource_name()].
469-
death_queue_names(#?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
470-
lists:map(fun(#death_v2{source_queue = Q}) -> Q end, Deaths);
471-
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}}) ->
445+
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}})
446+
when is_list(Records) ->
447+
lists:map(fun({{Queue, _Reason}, _Death}) ->
448+
Queue
449+
end, Records);
450+
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}})
451+
when is_map(Records) ->
472452
proplists:get_keys(maps:keys(Records));
473453
death_queue_names(#?MODULE{}) ->
474454
[];
@@ -493,7 +473,7 @@ prepare(For, State) ->
493473
%% INTERNAL
494474

495475
is_cycle_v2(TargetQueue, Deaths) ->
496-
case lists:splitwith(fun(#death_v2{source_queue = SourceQueue}) ->
476+
case lists:splitwith(fun({{SourceQueue, _Reason}, #death{}}) ->
497477
SourceQueue =/= TargetQueue
498478
end, Deaths) of
499479
{_, []} ->
@@ -502,7 +482,7 @@ is_cycle_v2(TargetQueue, Deaths) ->
502482
%% There is a cycle, but we only want to drop the message
503483
%% if the cycle is "fully automatic", i.e. without a client
504484
%% expliclity rejecting the message somewhere in the cycle.
505-
lists:all(fun(#death_v2{reason = Reason}) ->
485+
lists:all(fun({{_SourceQueue, Reason}, #death{}}) ->
506486
Reason =/= rejected
507487
end, [H | L])
508488
end.
@@ -527,17 +507,3 @@ is_cycle_v1(Queue, [_ | Rem]) ->
527507
set_received_at_timestamp(Anns) ->
528508
Millis = os:system_time(millisecond),
529509
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.
530-
531-
deaths_take(Queue, Reason, Deaths) ->
532-
deaths_take(Queue, Reason, Deaths, []).
533-
534-
deaths_take(Queue,
535-
Reason,
536-
[#death_v2{source_queue = Queue,
537-
reason = Reason} = H | T],
538-
Acc) ->
539-
{value, H, lists:reverse(Acc, T)};
540-
deaths_take(Queue, Reason, [H|T], Acc) ->
541-
deaths_take(Queue, Reason, T, [H|Acc]);
542-
deaths_take(_Queue, _Reason, [], _Acc) ->
543-
false.

deps/rabbit/src/mc_amqp.erl

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,9 @@ protocol_state_message_annotations(MA, Anns) ->
402402
maps_upsert(K, mc_util:infer_type(V), L);
403403
(<<"timestamp_in_ms">>, V, L) ->
404404
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
405-
(deaths_v2, Deaths, L) ->
406-
Maps = encode_deaths(Deaths),
405+
(deaths, #deaths{records = Records}, L)
406+
when is_list(Records) ->
407+
Maps = encode_deaths(Records),
407408
maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L);
408409
(_, _, Acc) ->
409410
Acc
@@ -582,19 +583,17 @@ first_acquirer(Anns) ->
582583

583584
encode_deaths(Deaths) ->
584585
lists:map(
585-
fun(#death_v2{source_queue = Queue,
586-
reason = Reason,
587-
count = Count,
588-
first_death_timestamp = Timestamp,
589-
original_exchange = Exchange,
590-
original_routing_keys = RoutingKeys,
591-
annotations = Anns}) ->
586+
fun({{Queue, Reason},
587+
#death{exchange = Exchange,
588+
routing_keys = RoutingKeys,
589+
count = Count,
590+
anns = Anns = #{first_time := FirstTime}}}) ->
592591
RKeys = [{utf8, Rk} || Rk <- RoutingKeys],
593592
Map0 = [
594593
{{symbol, <<"queue">>}, {utf8, Queue}},
595594
{{symbol, <<"reason">>}, {symbol, atom_to_binary(Reason)}},
596595
{{symbol, <<"count">>}, {ulong, Count}},
597-
{{symbol, <<"time">>}, {timestamp, Timestamp}},
596+
{{symbol, <<"time">>}, {timestamp, FirstTime}},
598597
{{symbol, <<"exchange">>}, {utf8, Exchange}},
599598
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys}}
600599
],

0 commit comments

Comments
 (0)