Skip to content

Commit f4212da

Browse files
committed
Store first and last death time in a stream
Given that we store first and last death time for a given {Queue, Reason} tuple, let's store these infos in a stream and return these infos to the AMQP client. Last death time, similar to first death time can be useful to AMQP clients when there are multiple deaths for the same {Queue, Reason} tuple. AMQP 0.9.1 only exposes a time header referring on the first time.
1 parent 9a84c34 commit f4212da

File tree

4 files changed

+23
-13
lines changed

4 files changed

+23
-13
lines changed

deps/rabbit/include/mc.hrl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
-record(deaths, {first :: death_key(), % redundant to mc annotations x-first-death-*
3232
last :: death_key(), % redundant to mc annotations x-last-death-*
33-
records :: #{death_key() := #death{}} | % feature flag message_containers_deaths_v2 disabled
34-
[{death_key(), #death{}}] % feature flag message_containers_deaths_v2 enabled
33+
records :: #{death_key() := #death{}} |
34+
%% Records are ordered by death recency when feature flag
35+
%% message_containers_deaths_v2 is enabled
36+
[{death_key(), #death{}}]
3537
}).

deps/rabbit/src/mc_amqp.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -587,13 +587,15 @@ encode_deaths(Deaths) ->
587587
#death{exchange = Exchange,
588588
routing_keys = RoutingKeys,
589589
count = Count,
590-
anns = Anns = #{first_time := FirstTime}}}) ->
590+
anns = Anns = #{first_time := FirstTime,
591+
last_time := LastTime}}}) ->
591592
RKeys = [{utf8, Rk} || Rk <- RoutingKeys],
592593
Map0 = [
593594
{{symbol, <<"queue">>}, {utf8, Queue}},
594595
{{symbol, <<"reason">>}, {symbol, atom_to_binary(Reason)}},
595596
{{symbol, <<"count">>}, {ulong, Count}},
596-
{{symbol, <<"time">>}, {timestamp, FirstTime}},
597+
{{symbol, <<"first-time">>}, {timestamp, FirstTime}},
598+
{{symbol, <<"last-time">>}, {timestamp, LastTime}},
597599
{{symbol, <<"exchange">>}, {utf8, Exchange}},
598600
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys}}
599601
],

deps/rabbit/src/mc_amqpl.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -604,12 +604,13 @@ convert_from_amqp_deaths({array, map, Maps}) ->
604604
{{symbol, <<"queue">>}, {utf8, Queue}},
605605
{{symbol, <<"reason">>}, {symbol, Reason}},
606606
{{symbol, <<"count">>}, {ulong, Count}},
607-
{{symbol, <<"time">>}, {timestamp, Timestamp}},
607+
{{symbol, <<"first_time">>}, {timestamp, FirstTime}},
608+
{{symbol, <<"last_time">>}, {timestamp, _LastTime}},
608609
{{symbol, <<"exchange">>}, {utf8, Exchange}},
609610
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}}
610611
] = KvList1,
611612
RKeys = [Key || {utf8, Key} <- RKeys0],
612-
death_table(Queue, Reason, Exchange, RKeys, Count, Timestamp, Ttl)
613+
death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl)
613614
end, Maps),
614615
{true, {<<"x-death">>, array, L}};
615616
convert_from_amqp_deaths(_IgnoreUnknownValue) ->

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3542,7 +3542,8 @@ dead_letter_headers_exchange(Config) ->
35423542
{{symbol, <<"queue">>}, {utf8, QName1}},
35433543
{{symbol, <<"reason">>}, {symbol, <<"expired">>}},
35443544
{{symbol, <<"count">>}, {ulong, 1}},
3545-
{{symbol, <<"time">>}, {timestamp, Timestamp}},
3545+
{{symbol, <<"first-time">>}, {timestamp, Timestamp}},
3546+
{{symbol, <<"last-time">>}, {timestamp, Timestamp}},
35463547
{{symbol, <<"exchange">>},{utf8, <<>>}},
35473548
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}}
35483549
]}]}
@@ -3628,28 +3629,31 @@ dead_letter_reject(Config) ->
36283629
{{symbol, <<"queue">>}, {utf8, QName1}},
36293630
{{symbol, <<"reason">>}, {symbol, <<"expired">>}},
36303631
{{symbol, <<"count">>}, {ulong, 3}},
3631-
{{symbol, <<"time">>}, {timestamp, Timestamp1}},
3632+
{{symbol, <<"first-time">>}, {timestamp, Ts1}},
3633+
{{symbol, <<"last-time">>}, {timestamp, Ts2}},
36323634
{{symbol, <<"exchange">>},{utf8, <<>>}},
36333635
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}}
36343636
]} = D1,
36353637
{map, [
36363638
{{symbol, <<"queue">>}, {utf8, QName2}},
36373639
{{symbol, <<"reason">>}, {symbol, <<"rejected">>}},
36383640
{{symbol, <<"count">>}, {ulong, 2}},
3639-
{{symbol, <<"time">>}, {timestamp, Timestamp2}},
3641+
{{symbol, <<"first-time">>}, {timestamp, Ts3}},
3642+
{{symbol, <<"last-time">>}, {timestamp, Ts4}},
36403643
{{symbol, <<"exchange">>},{utf8, <<>>}},
36413644
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName2}]}}
36423645
]} = D2,
36433646
{map, [
36443647
{{symbol, <<"queue">>}, {utf8, QName3}},
36453648
{{symbol, <<"reason">>}, {symbol, <<"expired">>}},
36463649
{{symbol, <<"count">>}, {ulong, 2}},
3647-
{{symbol, <<"time">>}, {timestamp, Timestamp3}},
3650+
{{symbol, <<"first-time">>}, {timestamp, Ts5}},
3651+
{{symbol, <<"last-time">>}, {timestamp, Ts6}},
36483652
{{symbol, <<"exchange">>},{utf8, <<>>}},
36493653
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName3}]}}
36503654
]} = D3,
3651-
?assert(Timestamp1 < Timestamp2),
3652-
?assert(Timestamp2 < Timestamp3),
3655+
?assertEqual([Ts1, Ts3, Ts5, Ts4, Ts6, Ts2],
3656+
lists:sort([Ts1, Ts2, Ts3, Ts4, Ts5, Ts6])),
36533657

36543658
ok = amqp10_client:detach_link(Receiver),
36553659
ok = amqp10_client:detach_link(Sender),
@@ -3710,7 +3714,8 @@ dead_letter_into_stream(Config) ->
37103714
{{symbol, <<"queue">>}, {utf8, QName0}},
37113715
{{symbol, <<"reason">>}, {symbol, <<"expired">>}},
37123716
{{symbol, <<"count">>}, {ulong, 1}},
3713-
{{symbol, <<"time">>}, {timestamp, Timestamp}},
3717+
{{symbol, <<"first-time">>}, {timestamp, Timestamp}},
3718+
{{symbol, <<"last-time">>}, {timestamp, Timestamp}},
37143719
{{symbol, <<"exchange">>},{utf8, <<>>}},
37153720
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName0}]}}
37163721
]}]}

0 commit comments

Comments
 (0)