Skip to content

Commit 1c4af0c

Browse files
authored
Merge pull request #11230 from rabbitmq/dead-letter-bcc
Remove BCC from x-death routing-keys
2 parents f122483 + 90a4010 commit 1c4af0c

File tree

3 files changed

+47
-16
lines changed

3 files changed

+47
-16
lines changed

deps/rabbit/src/mc.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,14 +365,22 @@ record_death(Reason, SourceQueue,
365365
is_binary(SourceQueue) ->
366366
Key = {SourceQueue, Reason},
367367
#{?ANN_EXCHANGE := Exchange,
368-
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
368+
?ANN_ROUTING_KEYS := RKeys0} = Anns0,
369+
%% The routing keys that we record in the death history and will
370+
%% report to the client should include CC, but exclude BCC.
371+
RKeys = case Anns0 of
372+
#{bcc := BccKeys} ->
373+
RKeys0 -- BccKeys;
374+
_ ->
375+
RKeys0
376+
end,
369377
Timestamp = os:system_time(millisecond),
370378
Ttl = maps:get(ttl, Anns0, undefined),
371379
DeathAnns = rabbit_misc:maps_put_truthy(
372380
ttl, Ttl, #{first_time => Timestamp,
373381
last_time => Timestamp}),
374382
NewDeath = #death{exchange = Exchange,
375-
routing_keys = RoutingKeys,
383+
routing_keys = RKeys,
376384
count = 1,
377385
anns = DeathAnns},
378386
Anns = case Anns0 of

deps/rabbit/src/mc_amqpl.erl

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@
5151

5252
%% mc implementation
5353
init(#content{} = Content0) ->
54-
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
54+
Content1 = rabbit_binary_parser:ensure_content_decoded(Content0),
5555
%% project essential properties into annotations
56-
Anns = essential_properties(Content),
57-
{strip_header(Content, ?DELETED_HEADER), Anns}.
56+
Anns = essential_properties(Content1),
57+
Content = strip_header(Content1, ?DELETED_HEADER),
58+
{Content, Anns}.
5859

5960
convert_from(mc_amqp, Sections, Env) ->
6061
{H, MAnn, Prop, AProp, BodyRev, Footer} =
@@ -554,7 +555,7 @@ message(#resource{name = ExchangeNameBin},
554555
Error;
555556
HeaderRoutes ->
556557
{ok, mc:init(?MODULE,
557-
rabbit_basic:strip_bcc_header(Content),
558+
Content,
558559
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
559560
?ANN_EXCHANGE => ExchangeNameBin})}
560561
end.
@@ -795,7 +796,8 @@ message_id(undefined, _HKey, H) ->
795796
essential_properties(#content{} = C) ->
796797
#'P_basic'{delivery_mode = Mode,
797798
priority = Priority,
798-
timestamp = TimestampRaw} = Props = C#content.properties,
799+
timestamp = TimestampRaw,
800+
headers = Headers} = Props = C#content.properties,
799801
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
800802
Timestamp = case TimestampRaw of
801803
undefined ->
@@ -805,6 +807,12 @@ essential_properties(#content{} = C) ->
805807
TimestampRaw * 1000
806808
end,
807809
Durable = Mode == 2,
810+
BccKeys = case rabbit_basic:header(<<"BCC">>, Headers) of
811+
{<<"BCC">>, array, Routes} ->
812+
[Route || {longstr, Route} <- Routes];
813+
_ ->
814+
undefined
815+
end,
808816
maps_put_truthy(
809817
?ANN_PRIORITY, Priority,
810818
maps_put_truthy(
@@ -813,7 +821,9 @@ essential_properties(#content{} = C) ->
813821
?ANN_TIMESTAMP, Timestamp,
814822
maps_put_falsy(
815823
?ANN_DURABLE, Durable,
816-
#{})))).
824+
maps_put_truthy(
825+
bcc, BccKeys,
826+
#{}))))).
817827

818828
%% headers that are added as annotations during conversions
819829
is_internal_header(<<"x-basic-", _/binary>>) ->

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,17 +1379,18 @@ dead_letter_headers_BCC(Config) ->
13791379
routing_key = DLXQName}),
13801380

13811381
P1 = <<"msg1">>,
1382-
BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]},
1383-
publish(Ch, QName, [P1], [BCCHeader]),
1382+
CCHeader = {<<"CC">>, array, [{longstr, <<"cc 1">>}, {longstr, <<"cc 2">>}]},
1383+
BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}, {longstr, <<"bcc 2">>}]},
1384+
publish(Ch, QName, [P1], [CCHeader, BCCHeader]),
13841385
%% Message is published to both queues because of BCC header and DLX queue bound to both
13851386
%% exchanges
13861387
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
13871388
{#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
13881389
props = #'P_basic'{headers = Headers1}}} =
1389-
amqp_channel:call(Ch, #'basic.get'{queue = QName}),
1390+
amqp_channel:call(Ch, #'basic.get'{queue = QName}),
13901391
{#'basic.get_ok'{}, #amqp_msg{payload = P1,
13911392
props = #'P_basic'{headers = Headers2}}} =
1392-
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
1393+
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
13931394
%% We check the headers to ensure no dead lettering has happened
13941395
?assertEqual(undefined, header_lookup(Headers1, <<"x-death">>)),
13951396
?assertEqual(undefined, header_lookup(Headers2, <<"x-death">>)),
@@ -1401,10 +1402,15 @@ dead_letter_headers_BCC(Config) ->
14011402
wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
14021403
{#'basic.get_ok'{}, #amqp_msg{payload = P1,
14031404
props = #'P_basic'{headers = Headers3}}} =
1404-
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
1405+
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
14051406
consume_empty(Ch, QName),
14061407
?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"BCC">>)),
1407-
?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
1408+
{array, [{table, Death}]} = rabbit_misc:table_lookup(Headers3, <<"x-death">>),
1409+
{array, RKeys0} = rabbit_misc:table_lookup(Death, <<"routing-keys">>),
1410+
RKeys = [RKey || {longstr, RKey} <- RKeys0],
1411+
%% routing-keys in the death history should include CC but exclude BCC keys
1412+
?assertEqual(lists:sort([QName, <<"cc 1">>, <<"cc 2">>]),
1413+
lists:sort(RKeys)).
14081414

14091415
%% Three top-level headers are added for the very first dead-lettering event.
14101416
%% They are
@@ -1669,7 +1675,11 @@ stream(Config) ->
16691675
#'basic.publish'{routing_key = Q1},
16701676
#amqp_msg{payload = Payload,
16711677
props = #'P_basic'{expiration = <<"0">>,
1672-
headers = [{<<"CC">>, array, [{longstr, <<"other key">>}]}]}
1678+
headers = [{<<"CC">>, array, [{longstr, <<"cc 1">>},
1679+
{longstr, <<"cc 2">>}]},
1680+
{<<"BCC">>, array, [{longstr, <<"bcc 1">>},
1681+
{longstr, <<"bcc 2">>}]}
1682+
]}
16731683
}),
16741684

16751685
#'basic.qos_ok'{} = amqp_channel:call(Ch1, #'basic.qos'{prefetch_count = 1}),
@@ -1710,7 +1720,10 @@ stream(Config) ->
17101720
?assertEqual({longstr, Reason}, rabbit_misc:table_lookup(Death1, <<"reason">>)),
17111721
?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death1, <<"exchange">>)),
17121722
?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
1713-
?assertEqual({array, [{longstr, Q1}, {longstr, <<"other key">>}]},
1723+
%% routing-keys in the death history should include CC but exclude BCC keys
1724+
?assertEqual({array, [{longstr, Q1},
1725+
{longstr, <<"cc 1">>},
1726+
{longstr, <<"cc 2">>}]},
17141727
rabbit_misc:table_lookup(Death1, <<"routing-keys">>)),
17151728
?assertEqual({longstr, <<"0">>}, rabbit_misc:table_lookup(Death1, <<"original-expiration">>)),
17161729
{timestamp, T1} = rabbit_misc:table_lookup(Death1, <<"time">>),

0 commit comments

Comments
 (0)