Skip to content

Commit b9f45d3

Browse files
committed
Track requeue history
as described in rabbitmq/rabbitmq-website#2095 This commit adds a test case and fixes a bug in the broker to allow using `array` type in the value of the modified annotations.
1 parent d9ff6a0 commit b9f45d3

File tree

3 files changed

+38
-29
lines changed

3 files changed

+38
-29
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,14 +1178,16 @@ wrap_map_value(true) ->
11781178
{boolean, true};
11791179
wrap_map_value(false) ->
11801180
{boolean, false};
1181-
wrap_map_value(V) when is_integer(V) ->
1182-
{uint, V};
1181+
wrap_map_value(V) when is_integer(V) andalso V >= 0 ->
1182+
uint(V);
11831183
wrap_map_value(V) when is_binary(V) ->
11841184
utf8(V);
11851185
wrap_map_value(V) when is_list(V) ->
11861186
utf8(list_to_binary(V));
11871187
wrap_map_value(V) when is_atom(V) ->
1188-
utf8(atom_to_list(V)).
1188+
utf8(atom_to_list(V));
1189+
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
1190+
TaggedValue.
11891191

11901192
utf8(V) -> amqp10_client_types:utf8(V).
11911193

deps/rabbit/src/mc_util.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
5252
{long, V};
5353
infer_type(V) when is_boolean(V) ->
5454
{boolean, V};
55-
infer_type({T, _} = V) when is_atom(T) ->
56-
%% looks like a pre-tagged type
57-
V.
55+
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
56+
TaggedValue.
5857

5958
utf8_string_is_ascii(UTF8String) ->
6059
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -501,59 +501,67 @@ modified_quorum_queue(Config) ->
501501
ok = amqp10_client:send_msg(Sender, Msg2),
502502
ok = amqp10_client:detach_link(Sender),
503503

504-
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
504+
Receiver1Name = <<"receiver 1">>,
505+
Receiver2Name = <<"receiver 2">>,
506+
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
507+
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),
505508

506-
{ok, M1} = amqp10_client:get_msg(Receiver),
509+
{ok, M1} = amqp10_client:get_msg(Receiver1),
507510
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
508511
?assertMatch(#{delivery_count := 0,
509512
first_acquirer := true},
510513
amqp10_msg:headers(M1)),
511-
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
514+
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),
512515

513-
{ok, M2a} = amqp10_client:get_msg(Receiver),
516+
{ok, M2a} = amqp10_client:get_msg(Receiver1),
514517
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
515518
?assertMatch(#{delivery_count := 0,
516519
first_acquirer := true},
517520
amqp10_msg:headers(M2a)),
518-
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
521+
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),
519522

520-
{ok, M2b} = amqp10_client:get_msg(Receiver),
523+
{ok, M2b} = amqp10_client:get_msg(Receiver1),
521524
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
522525
?assertMatch(#{delivery_count := 0,
523526
first_acquirer := false},
524527
amqp10_msg:headers(M2b)),
525-
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
528+
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),
526529

527-
{ok, M2c} = amqp10_client:get_msg(Receiver),
530+
{ok, M2c} = amqp10_client:get_msg(Receiver1),
528531
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
529532
?assertMatch(#{delivery_count := 1,
530533
first_acquirer := false},
531534
amqp10_msg:headers(M2c)),
532-
ok = amqp10_client:settle_msg(Receiver, M2c,
533-
{modified, true, false,
534-
#{<<"x-opt-key">> => <<"val 1">>}}),
535-
536-
{ok, M2d} = amqp10_client:get_msg(Receiver),
535+
ok = amqp10_client:settle_msg(
536+
Receiver1, M2c,
537+
{modified, true, false,
538+
%% Test that a history of requeue events can be tracked as described in
539+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
540+
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]}}
541+
}),
542+
543+
{ok, M2d} = amqp10_client:get_msg(Receiver2),
537544
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
538545
?assertMatch(#{delivery_count := 2,
539546
first_acquirer := false},
540547
amqp10_msg:headers(M2d)),
541-
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
542-
ok = amqp10_client:settle_msg(Receiver, M2d,
543-
{modified, false, false,
544-
#{<<"x-opt-key">> => <<"val 2">>,
545-
<<"x-other">> => 99}}),
546-
547-
{ok, M2e} = amqp10_client:get_msg(Receiver),
548+
#{<<"x-opt-requeued-by">> := {array, utf8, L}} = amqp10_msg:message_annotations(M2d),
549+
ok = amqp10_client:settle_msg(
550+
Receiver1, M2d,
551+
{modified, false, false,
552+
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L]},
553+
<<"x-other">> => 99}}),
554+
555+
{ok, M2e} = amqp10_client:get_msg(Receiver1),
548556
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
549557
?assertMatch(#{delivery_count := 2,
550558
first_acquirer := false},
551559
amqp10_msg:headers(M2e)),
552-
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
560+
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
553561
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
554-
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
562+
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),
555563

556-
ok = amqp10_client:detach_link(Receiver),
564+
ok = amqp10_client:detach_link(Receiver1),
557565
?assertMatch({ok, #{message_count := 1}},
558566
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
559567
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),

0 commit comments

Comments
 (0)