Skip to content

Commit 3981b2a

Browse files
committed
Fix conflicts and failing tests
Extend message_containers_deaths_v2_SUITE to send 3 messages whose death histories will be stored in 3 different ways: 1. with feature flag message_containers disabled 2. with feature flag message_containers enabled, but message_containers_deaths_v2 disabled 3. with feature flag message_containers_deaths_v2 enabled
1 parent 441fe9f commit 3981b2a

10 files changed

+60
-4383
lines changed

deps/amqp10_common/src/amqp10_binary_generator.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ constructor(timestamp) -> <<16#83>>;
181181
constructor(uuid) -> <<16#98>>;
182182
constructor(null) -> <<16#40>>;
183183
constructor(boolean) -> <<16#56>>;
184+
constructor(map) -> 16#d1; % use large map type for all array elements
184185
constructor(array) -> <<16#f0>>; % use large array type for all nested arrays
185186
constructor(utf8) -> <<16#b1>>;
186187
constructor({described, Descriptor, Primitive}) ->
@@ -203,6 +204,14 @@ generate(ulong, {ulong, V}) -> <<V:64/unsigned>>;
203204
generate(long, {long, V}) -> <<V:64/signed>>;
204205
generate({described, D, P}, {described, D, V}) ->
205206
generate(P, V);
207+
generate(map, {map, KvList}) ->
208+
Count = length(KvList) * 2,
209+
Compound = lists:map(fun({Key, Val}) ->
210+
[(generate(Key)),
211+
(generate(Val))]
212+
end, KvList),
213+
S = iolist_size(Compound),
214+
[<<(S + 4):32, Count:32>>, Compound];
206215
generate(array, {array, Type, List}) ->
207216
Count = length(List),
208217
Body = iolist_to_binary([constructor(Type),

deps/amqp10_common/src/amqp10_binary_parser.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ parse_constructor(16#80) -> ulong;
175175
parse_constructor(16#81) -> long;
176176
parse_constructor(16#40) -> null;
177177
parse_constructor(16#56) -> boolean;
178+
parse_constructor(16#d1) -> map;
178179
parse_constructor(16#f0) -> array;
179180
parse_constructor(0) -> described;
180181
parse_constructor(X) ->

deps/amqp10_common/test/binary_generator_SUITE.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,22 @@ array(_Config) ->
168168
?assertEqual({array, boolean, [true, false]},
169169
roundtrip_return({array, boolean, [{boolean, true}, {boolean, false}]})),
170170

171-
% array of arrays
172-
% TODO: does the inner type need to be consistent across the array?
171+
%% array of arrays
173172
roundtrip({array, array, []}),
174173
roundtrip({array, array, [{array, symbol, [{symbol, <<"ANONYMOUS">>}]}]}),
175174

175+
%% array of maps
176+
roundtrip({array, map, []}),
177+
roundtrip({array, map, [{map, [{{symbol, <<"k1">>}, {utf8, <<"v1">>}}]},
178+
{map, []},
179+
{map, [{{described,
180+
{utf8, <<"URL">>},
181+
{utf8, <<"http://example.org/hello-world">>}},
182+
{byte, -1}},
183+
{{int, 0}, {ulong, 0}}
184+
]}
185+
]}),
186+
176187
Desc = {utf8, <<"URL">>},
177188
roundtrip({array, {described, Desc, utf8},
178189
[{described, Desc, {utf8, <<"http://example.org/hello">>}}]}),

deps/rabbit/app.bzl

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2153,71 +2153,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
21532153
erlc_opts = "//:test_erlc_opts",
21542154
deps = ["//deps/amqp_client:erlang_app"],
21552155
)
2156-
<<<<<<< HEAD
2157-
=======
2158-
2159-
erlang_bytecode(
2160-
name = "test_event_recorder_beam",
2161-
testonly = True,
2162-
srcs = ["test/event_recorder.erl"],
2163-
outs = ["test/event_recorder.beam"],
2164-
app_name = "rabbit",
2165-
erlc_opts = "//:test_erlc_opts",
2166-
deps = ["//deps/rabbit_common:erlang_app"],
2167-
)
2168-
erlang_bytecode(
2169-
name = "amqp_auth_SUITE_beam_files",
2170-
testonly = True,
2171-
srcs = ["test/amqp_auth_SUITE.erl"],
2172-
outs = ["test/amqp_auth_SUITE.beam"],
2173-
app_name = "rabbit",
2174-
erlc_opts = "//:test_erlc_opts",
2175-
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"],
2176-
)
2177-
erlang_bytecode(
2178-
name = "amqp_client_SUITE_beam_files",
2179-
testonly = True,
2180-
srcs = ["test/amqp_client_SUITE.erl"],
2181-
outs = ["test/amqp_client_SUITE.beam"],
2182-
app_name = "rabbit",
2183-
erlc_opts = "//:test_erlc_opts",
2184-
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"],
2185-
)
2186-
erlang_bytecode(
2187-
name = "amqp_credit_api_v2_SUITE_beam_files",
2188-
testonly = True,
2189-
srcs = ["test/amqp_credit_api_v2_SUITE.erl"],
2190-
outs = ["test/amqp_credit_api_v2_SUITE.beam"],
2191-
app_name = "rabbit",
2192-
erlc_opts = "//:test_erlc_opts",
2193-
deps = ["//deps/amqp_client:erlang_app"],
2194-
)
2195-
erlang_bytecode(
2196-
name = "amqp_proxy_protocol_SUITE_beam_files",
2197-
testonly = True,
2198-
srcs = ["test/amqp_proxy_protocol_SUITE.erl"],
2199-
outs = ["test/amqp_proxy_protocol_SUITE.beam"],
2200-
app_name = "rabbit",
2201-
erlc_opts = "//:test_erlc_opts",
2202-
)
2203-
erlang_bytecode(
2204-
name = "amqp_system_SUITE_beam_files",
2205-
testonly = True,
2206-
srcs = ["test/amqp_system_SUITE.erl"],
2207-
outs = ["test/amqp_system_SUITE.beam"],
2208-
app_name = "rabbit",
2209-
erlc_opts = "//:test_erlc_opts",
2210-
deps = ["//deps/rabbit_common:erlang_app"],
2211-
)
2212-
erlang_bytecode(
2213-
name = "amqp_address_SUITE_beam_files",
2214-
testonly = True,
2215-
srcs = ["test/amqp_address_SUITE.erl"],
2216-
outs = ["test/amqp_address_SUITE.beam"],
2217-
app_name = "rabbit",
2218-
erlc_opts = "//:test_erlc_opts",
2219-
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"],
2220-
)
22212156
erlang_bytecode(
22222157
name = "message_containers_deaths_v2_SUITE_beam_files",
22232158
testonly = True,
@@ -2227,4 +2162,3 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
22272162
erlc_opts = "//:test_erlc_opts",
22282163
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
22292164
)
2230-
>>>>>>> 6b300a2f34 (Fix dead lettering)

deps/rabbit/src/mc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ is_cycle_v2(TargetQueue, Deaths) ->
464464
%% queue name and there are no newer deaths with the 'rejected' reason then
465465
%% consider this a cycle."
466466
%% However, the correct death order cannot be reliably determined in v1.
467-
%% deaths_v2 fixes this bug.
467+
%% v2 fixes this bug.
468468
is_cycle_v1(_Queue, []) ->
469469
false;
470470
is_cycle_v1(_Queue, [{_Q, rejected} | _]) ->

deps/rabbit/src/mc_amqp.erl

Lines changed: 6 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ protocol_state(Msg0 = #msg{message_annotations = MA0}, Anns) ->
180180
maps_upsert(K, mc_util:infer_type(V), L);
181181
(<<"timestamp_in_ms">>, V, L) ->
182182
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
183+
(deaths, Deaths, L)
184+
when is_list(Deaths) ->
185+
Maps = encode_deaths(Deaths),
186+
maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L);
183187
(_, _, Acc) ->
184188
Acc
185189
end, MA0, Anns),
@@ -241,32 +245,6 @@ msg_to_sections(#msg{header = H,
241245
[H | S4]
242246
end.
243247

244-
<<<<<<< HEAD
245-
=======
246-
-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) ->
247-
amqp_annotations().
248-
protocol_state_message_annotations(MA, Anns) ->
249-
maps:fold(
250-
fun(?ANN_EXCHANGE, Exchange, L) ->
251-
maps_upsert(<<"x-exchange">>, {utf8, Exchange}, L);
252-
(?ANN_ROUTING_KEYS, RKeys, L) ->
253-
RKey = hd(RKeys),
254-
maps_upsert(<<"x-routing-key">>, {utf8, RKey}, L);
255-
(<<"x-", _/binary>> = K, V, L)
256-
when V =/= undefined ->
257-
%% any x-* annotations get added as message annotations
258-
maps_upsert(K, mc_util:infer_type(V), L);
259-
(<<"timestamp_in_ms">>, V, L) ->
260-
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
261-
(deaths, Deaths, L)
262-
when is_list(Deaths) ->
263-
Maps = encode_deaths(Deaths),
264-
maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L);
265-
(_, _, Acc) ->
266-
Acc
267-
end, MA, Anns).
268-
269-
>>>>>>> 6b300a2f34 (Fix dead lettering)
270248
maps_upsert(Key, TaggedVal, KVList) ->
271249
TaggedKey = {symbol, Key},
272250
Elem = {TaggedKey, TaggedVal},
@@ -378,43 +356,6 @@ decode([#'v1_0.amqp_value'{} = B | Rem], #msg{} = Msg) ->
378356
%% an amqp value can only be a singleton
379357
decode(Rem, Msg#msg{data = B}).
380358

381-
<<<<<<< HEAD
382-
key_find(K, [{{_, K}, {_, V}} | _]) ->
383-
V;
384-
key_find(K, [_ | Rem]) ->
385-
key_find(K, Rem);
386-
key_find(_K, []) ->
387-
undefined.
388-
389-
recover_deaths([], Acc) ->
390-
Acc;
391-
recover_deaths([{map, Kvs} | Rem], Acc) ->
392-
Queue = key_find(<<"queue">>, Kvs),
393-
Reason = binary_to_existing_atom(key_find(<<"reason">>, Kvs)),
394-
DA0 = case key_find(<<"original-expiration">>, Kvs) of
395-
undefined ->
396-
#{};
397-
Exp ->
398-
#{ttl => binary_to_integer(Exp)}
399-
end,
400-
RKeys = [RK || {_, RK} <- key_find(<<"routing-keys">>, Kvs)],
401-
Ts = key_find(<<"time">>, Kvs),
402-
DA = DA0#{first_time => Ts,
403-
last_time => Ts},
404-
recover_deaths(Rem,
405-
Acc#{{Queue, Reason} =>
406-
#death{anns = DA,
407-
exchange = key_find(<<"exchange">>, Kvs),
408-
count = key_find(<<"count">>, Kvs),
409-
routing_keys = RKeys}}).
410-
=======
411-
-spec first_acquirer(mc:annotations()) -> boolean().
412-
first_acquirer(Anns) ->
413-
Redelivered = case Anns of
414-
#{redelivered := R} -> R;
415-
_ -> false
416-
end,
417-
not Redelivered.
418359

419360
encode_deaths(Deaths) ->
420361
lists:map(
@@ -442,50 +383,22 @@ encode_deaths(Deaths) ->
442383
end,
443384
{map, Map}
444385
end, Deaths).
445-
>>>>>>> 6b300a2f34 (Fix dead lettering)
446386

447387
essential_properties(#msg{message_annotations = MA} = Msg) ->
448388
Durable = get_property(durable, Msg),
449389
Priority = get_property(priority, Msg),
450390
Timestamp = get_property(timestamp, Msg),
451391
Ttl = get_property(ttl, Msg),
452-
<<<<<<< HEAD
453-
454-
Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
455-
{list, DeathMaps} ->
456-
%% TODO: make more correct?
457-
Def = {utf8, <<>>},
458-
{utf8, FstQ} = message_annotation(<<"x-first-death-queue">>, Msg, Def),
459-
{utf8, FstR} = message_annotation(<<"x-first-death-reason">>, Msg, Def),
460-
{utf8, LastQ} = message_annotation(<<"x-last-death-queue">>, Msg, Def),
461-
{utf8, LastR} = message_annotation(<<"x-last-death-reason">>, Msg, Def),
462-
#deaths{first = {FstQ, binary_to_existing_atom(FstR)},
463-
last = {LastQ, binary_to_existing_atom(LastR)},
464-
records = recover_deaths(DeathMaps, #{})};
465-
_ ->
466-
undefined
467-
end,
392+
468393
Anns = maps_put_falsy(
469394
?ANN_DURABLE, Durable,
470-
=======
471-
Anns0 = #{?ANN_DURABLE => Durable},
472-
Anns = maps_put_truthy(
473-
?ANN_PRIORITY, Priority,
474-
>>>>>>> 6b300a2f34 (Fix dead lettering)
475395
maps_put_truthy(
476396
?ANN_PRIORITY, Priority,
477397
maps_put_truthy(
478-
<<<<<<< HEAD
479398
?ANN_TIMESTAMP, Timestamp,
480399
maps_put_truthy(
481400
ttl, Ttl,
482-
maps_put_truthy(
483-
deaths, Deaths,
484-
#{}))))),
485-
=======
486-
ttl, Ttl,
487-
Anns0))),
488-
>>>>>>> 6b300a2f34 (Fix dead lettering)
401+
#{})))),
489402
case MA of
490403
[] ->
491404
Anns;

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -164,21 +164,6 @@
164164
stability => stable,
165165
depends_on => [quorum_queue]
166166
}}).
167-
<<<<<<< HEAD
168-
=======
169-
170-
-rabbit_feature_flag(
171-
{credit_api_v2,
172-
#{desc => "Credit API v2 between queue clients and queue processes",
173-
stability => stable
174-
}}).
175-
176-
-rabbit_feature_flag(
177-
{message_containers_store_amqp_v1,
178-
#{desc => "Support storing messages in message containers AMQP 1.0 disk format v1",
179-
stability => stable,
180-
depends_on => [message_containers]
181-
}}).
182167

183168
-rabbit_feature_flag(
184169
{message_containers_deaths_v2,
@@ -187,4 +172,3 @@
187172
stability => stable,
188173
depends_on => [message_containers]
189174
}}).
190-
>>>>>>> 6b300a2f34 (Fix dead lettering)

0 commit comments

Comments
 (0)