Skip to content

Commit 9fdd42c

Browse files
Merge pull request #13435 from rabbitmq/mc-amqp-msg
Handle mc_amqp 3.13 `msg` record in 4.x
2 parents 10693d3 + 91f5ce2 commit 9fdd42c

File tree

1 file changed

+76
-6
lines changed

1 file changed

+76
-6
lines changed

deps/rabbit/src/mc_amqp.erl

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,29 @@
5050
Val :: term()}].
5151
-type opt(T) :: T | undefined.
5252

53+
%% This representation was used in v3.13.7. 4.x understands this record for
54+
%% backward compatibility, specifically for the rare case where:
55+
%% 1. a 3.13 node internally parsed a message from a stream via
56+
%% ```
57+
%% Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
58+
%% ```
59+
%% 2. published this Message to a queue
60+
%% 3. RabbitMQ got upgraded to 4.x
61+
%%
62+
%% This record along with all its conversions in this module can therefore
63+
%% be deleted in some future RabbitMQ version once it's safe to assume that
64+
%% these upgraded messages have all been consumed.
65+
-record(msg,
66+
{
67+
header :: opt(#'v1_0.header'{}),
68+
delivery_annotations = []:: list(),
69+
message_annotations = [] :: list(),
70+
properties :: opt(#'v1_0.properties'{}),
71+
application_properties = [] :: list(),
72+
data = [] :: amqp10_data(),
73+
footer = [] :: list()
74+
}).
75+
5376
%% This representation is used when the message was originally sent with
5477
%% a protocol other than AMQP and the message was not read from a stream.
5578
-record(msg_body_decoded,
@@ -97,7 +120,7 @@
97120
body_code :: body_descriptor_code()
98121
}).
99122

100-
-opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
123+
-opaque state() :: #msg{} | #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
101124

102125
-export_type([state/0]).
103126

@@ -128,6 +151,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
128151
convert_from(_SourceProto, _, _Env) ->
129152
not_implemented.
130153

154+
convert_to(?MODULE, Msg = #msg{}, _Env) ->
155+
convert_from_3_13_msg(Msg);
131156
convert_to(?MODULE, Msg, _Env) ->
132157
Msg;
133158
convert_to(TargetProto, Msg, Env) ->
@@ -139,7 +164,22 @@ size(#v1{message_annotations = MA,
139164
[] -> 0;
140165
_ -> ?MESSAGE_ANNOTATIONS_GUESS_SIZE
141166
end,
142-
{MetaSize, byte_size(Body)}.
167+
{MetaSize, byte_size(Body)};
168+
%% Copied from v3.13.7.
169+
%% This might be called in rabbit_fifo_v3 and must therefore not be modified
170+
%% to ensure determinism of quorum queues version 3.
171+
size(#msg{data = Body}) ->
172+
BodySize = if is_list(Body) ->
173+
lists:foldl(
174+
fun(#'v1_0.data'{content = Data}, Acc) ->
175+
iolist_size(Data) + Acc;
176+
(#'v1_0.amqp_sequence'{content = _}, Acc) ->
177+
Acc
178+
end, 0, Body);
179+
is_record(Body, 'v1_0.amqp_value') ->
180+
0
181+
end,
182+
{_MetaSize = 0, BodySize}.
143183

144184
x_header(Key, Msg) ->
145185
message_annotation(Key, Msg, undefined).
@@ -151,6 +191,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) ->
151191
undefined;
152192
property(Prop, #msg_body_encoded{properties = Props}) ->
153193
property0(Prop, Props);
194+
property(_Prop, #msg{properties = undefined}) ->
195+
undefined;
196+
property(Prop, #msg{properties = Props}) ->
197+
property0(Prop, Props);
154198
property(_Prop, #v1{bare_and_footer_properties_pos = ?OMITTED_SECTION}) ->
155199
undefined;
156200
property(Prop, #v1{bare_and_footer = Bin,
@@ -298,7 +342,9 @@ protocol_state(#v1{message_annotations = MA0,
298342
ttl = Ttl}, Anns),
299343
MA = protocol_state_message_annotations(MA0, Anns),
300344
Sections = to_sections(Header, MA, []),
301-
[encode(Sections), BareAndFooter].
345+
[encode(Sections), BareAndFooter];
346+
protocol_state(#msg{} = Msg, Anns) ->
347+
protocol_state(convert_from_3_13_msg(Msg), Anns).
302348

303349
prepare(read, Msg) ->
304350
Msg;
@@ -322,7 +368,9 @@ prepare(store, #msg_body_encoded{
322368
bare_and_footer_application_properties_pos = AppPropsPos,
323369
bare_and_footer_body_pos = BodyPos,
324370
body_code = BodyCode
325-
}.
371+
};
372+
prepare(store, Msg = #msg{}) ->
373+
Msg.
326374

327375
%% internal
328376

@@ -379,7 +427,9 @@ msg_to_sections(#v1{message_annotations = MAC,
379427
Sections = amqp10_framing:decode_bin(Bin),
380428
Sections ++ [{amqp_encoded_body_and_footer, BodyAndFooterBin}]
381429
end,
382-
to_sections(undefined, MAC, Tail).
430+
to_sections(undefined, MAC, Tail);
431+
msg_to_sections(#msg{} = Msg) ->
432+
msg_to_sections(convert_from_3_13_msg(Msg)).
383433

384434
to_sections(H, MAC, P, APC, Tail) ->
385435
S0 = case APC of
@@ -410,6 +460,20 @@ to_sections(H, MAC, Tail) ->
410460
[H | S]
411461
end.
412462

463+
convert_from_3_13_msg(#msg{header = H,
464+
delivery_annotations = _,
465+
message_annotations = MAC,
466+
properties = P,
467+
application_properties = APC,
468+
data = Data,
469+
footer = FC}) ->
470+
#msg_body_decoded{header = H,
471+
message_annotations = MAC,
472+
properties = P,
473+
application_properties = APC,
474+
data = Data,
475+
footer = FC}.
476+
413477
-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) ->
414478
amqp_annotations().
415479
protocol_state_message_annotations(MA, Anns) ->
@@ -482,11 +546,14 @@ message_annotation(Key, State, Default)
482546

483547
message_annotations(#msg_body_decoded{message_annotations = L}) -> L;
484548
message_annotations(#msg_body_encoded{message_annotations = L}) -> L;
485-
message_annotations(#v1{message_annotations = L}) -> L.
549+
message_annotations(#v1{message_annotations = L}) -> L;
550+
message_annotations(#msg{message_annotations = L}) -> L.
486551

487552
message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) ->
488553
message_annotations_as_simple_map0(Content);
489554
message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
555+
message_annotations_as_simple_map0(Content);
556+
message_annotations_as_simple_map(#msg{message_annotations = Content}) ->
490557
message_annotations_as_simple_map0(Content).
491558

492559
message_annotations_as_simple_map0(Content) ->
@@ -501,6 +568,9 @@ message_annotations_as_simple_map0(Content) ->
501568
application_properties_as_simple_map(
502569
#msg_body_encoded{application_properties = Content}, L) ->
503570
application_properties_as_simple_map0(Content, L);
571+
application_properties_as_simple_map(
572+
#msg{application_properties = Content}, L) ->
573+
application_properties_as_simple_map0(Content, L);
504574
application_properties_as_simple_map(
505575
#v1{bare_and_footer_application_properties_pos = ?OMITTED_SECTION}, L) ->
506576
L;

0 commit comments

Comments
 (0)