|
19 | 19 | -import(rabbit_misc,
|
20 | 20 | [maps_put_truthy/3]).
|
21 | 21 |
|
22 |
| --type message_section() :: |
23 |
| - #'v1_0.header'{} | |
24 |
| - #'v1_0.delivery_annotations'{} | |
25 |
| - #'v1_0.message_annotations'{} | |
26 |
| - #'v1_0.properties'{} | |
27 |
| - #'v1_0.application_properties'{} | |
28 |
| - #'v1_0.data'{} | |
29 |
| - #'v1_0.amqp_sequence'{} | |
30 |
| - #'v1_0.amqp_value'{} | |
31 |
| - #'v1_0.footer'{}. |
32 |
| - |
33 | 22 | -define(MESSAGE_ANNOTATIONS_GUESS_SIZE, 100).
|
34 | 23 |
|
35 | 24 | -define(SIMPLE_VALUE(V),
|
|
50 | 39 | -type body_descriptor_code() :: ?DESCRIPTOR_CODE_DATA |
|
51 | 40 | ?DESCRIPTOR_CODE_AMQP_SEQUENCE |
|
52 | 41 | ?DESCRIPTOR_CODE_AMQP_VALUE.
|
53 |
| --type amqp_map() :: [{term(), term()}]. |
| 42 | +%% §3.2.5 |
| 43 | +-type application_properties() :: [{Key :: {utf8, binary()}, |
| 44 | + Val :: term()}]. |
| 45 | +%% §3.2.10 |
| 46 | +-type amqp_annotations() :: [{Key :: {symbol, binary()} | {ulong, non_neg_integer()}, |
| 47 | + Val :: term()}]. |
54 | 48 | -type opt(T) :: T | undefined.
|
55 | 49 |
|
56 | 50 | %% This representation is used when the message was originally sent with
|
|
67 | 61 |
|
68 | 62 | %% This representation is used when we received the message from
|
69 | 63 | %% an AMQP client or when we read the message from a stream.
|
70 |
| -%% This message was parsed up to the section preceding the body. |
| 64 | +%% This message was parsed only until the start of the body. |
71 | 65 | -record(msg_body_encoded,
|
72 | 66 | {
|
73 | 67 | header :: opt(#'v1_0.header'{}),
|
74 |
| - message_annotations = [] :: amqp_map(), |
| 68 | + message_annotations = [] :: amqp_annotations(), |
75 | 69 | properties :: opt(#'v1_0.properties'{}),
|
76 |
| - application_properties = [] :: amqp_map(), |
| 70 | + application_properties = [] :: application_properties(), |
77 | 71 | bare_and_footer = uninit :: uninit | binary(),
|
78 | 72 | bare_and_footer_application_properties_pos = ?OMITTED_SECTION :: non_neg_integer() | ?OMITTED_SECTION,
|
79 | 73 | bare_and_footer_body_pos = uninit :: uninit | non_neg_integer(),
|
|
92 | 86 | %% the future.
|
93 | 87 | -record(v1,
|
94 | 88 | {
|
95 |
| - message_annotations = [] :: amqp_map(), |
| 89 | + message_annotations = [] :: amqp_annotations(), |
96 | 90 | bare_and_footer :: binary(),
|
97 | 91 | bare_and_footer_properties_pos :: 0 | ?OMITTED_SECTION,
|
98 | 92 | bare_and_footer_application_properties_pos :: non_neg_integer() | ?OMITTED_SECTION,
|
|
102 | 96 |
|
103 | 97 | -opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
|
104 | 98 |
|
105 |
| --export_type([ |
106 |
| - state/0, |
107 |
| - message_section/0 |
108 |
| - ]). |
| 99 | +-export_type([state/0]). |
109 | 100 |
|
110 |
| -init(Payload) when is_binary(Payload) -> |
| 101 | +init(Payload) -> |
111 | 102 | Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
|
112 |
| - Msg = msg_body_encoded(Sections, Payload), |
| 103 | + Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}), |
113 | 104 | Anns = essential_properties(Msg),
|
114 | 105 | {Msg, Anns}.
|
115 | 106 |
|
116 | 107 | convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
|
117 |
| - msg_body_decoded(Sections); |
| 108 | + msg_body_decoded(Sections, #msg_body_decoded{}); |
118 | 109 | convert_from(_SourceProto, _, _Env) ->
|
119 | 110 | not_implemented.
|
120 | 111 |
|
@@ -396,7 +387,8 @@ to_sections(H, MAC, Tail) ->
|
396 | 387 | [H | S]
|
397 | 388 | end.
|
398 | 389 |
|
399 |
| --spec protocol_state_message_annotations(amqp_map(), mc:annotations()) -> amqp_map(). |
| 390 | +-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) -> |
| 391 | + amqp_annotations(). |
400 | 392 | protocol_state_message_annotations(MA, Anns) ->
|
401 | 393 | maps:fold(
|
402 | 394 | fun(?ANN_EXCHANGE, Exchange, L) ->
|
@@ -508,35 +500,29 @@ application_properties_as_simple_map0(Content, L) ->
|
508 | 500 | Acc
|
509 | 501 | end, L, Content).
|
510 | 502 |
|
511 |
| -msg_body_decoded(Sections) -> |
512 |
| - msg_body_decoded(Sections, #msg_body_decoded{}). |
513 |
| - |
514 | 503 | msg_body_decoded([], Acc) ->
|
515 | 504 | Acc;
|
516 | 505 | msg_body_decoded([#'v1_0.header'{} = H | Rem], Msg) ->
|
517 | 506 | msg_body_decoded(Rem, Msg#msg_body_decoded{header = H});
|
| 507 | +msg_body_decoded([_Ignore = #'v1_0.delivery_annotations'{} | Rem], Msg) -> |
| 508 | + msg_body_decoded(Rem, Msg); |
518 | 509 | msg_body_decoded([#'v1_0.message_annotations'{content = MAC} | Rem], Msg) ->
|
519 | 510 | msg_body_decoded(Rem, Msg#msg_body_decoded{message_annotations = MAC});
|
520 | 511 | msg_body_decoded([#'v1_0.properties'{} = P | Rem], Msg) ->
|
521 | 512 | msg_body_decoded(Rem, Msg#msg_body_decoded{properties = P});
|
522 | 513 | msg_body_decoded([#'v1_0.application_properties'{content = APC} | Rem], Msg) ->
|
523 | 514 | msg_body_decoded(Rem, Msg#msg_body_decoded{application_properties = APC});
|
524 |
| -msg_body_decoded([_Ignore = #'v1_0.delivery_annotations'{} | Rem], Msg) -> |
525 |
| - msg_body_decoded(Rem, Msg); |
526 | 515 | msg_body_decoded([#'v1_0.data'{} = D | Rem], #msg_body_decoded{data = Body} = Msg)
|
527 | 516 | when is_list(Body) ->
|
528 | 517 | msg_body_decoded(Rem, Msg#msg_body_decoded{data = Body ++ [D]});
|
529 | 518 | msg_body_decoded([#'v1_0.amqp_sequence'{} = D | Rem], #msg_body_decoded{data = Body} = Msg)
|
530 | 519 | when is_list(Body) ->
|
531 | 520 | msg_body_decoded(Rem, Msg#msg_body_decoded{data = Body ++ [D]});
|
532 |
| -msg_body_decoded([#'v1_0.footer'{content = FC} | Rem], Msg) -> |
533 |
| - msg_body_decoded(Rem, Msg#msg_body_decoded{footer = FC}); |
534 | 521 | msg_body_decoded([#'v1_0.amqp_value'{} = B | Rem], #msg_body_decoded{} = Msg) ->
|
535 | 522 | %% an amqp value can only be a singleton
|
536 |
| - msg_body_decoded(Rem, Msg#msg_body_decoded{data = B}). |
537 |
| - |
538 |
| -msg_body_encoded(Sections, Payload) -> |
539 |
| - msg_body_encoded(Sections, Payload, #msg_body_encoded{}). |
| 523 | + msg_body_decoded(Rem, Msg#msg_body_decoded{data = B}); |
| 524 | +msg_body_decoded([#'v1_0.footer'{content = FC} | Rem], Msg) -> |
| 525 | + msg_body_decoded(Rem, Msg#msg_body_decoded{footer = FC}). |
540 | 526 |
|
541 | 527 | msg_body_encoded([#'v1_0.header'{} = H | Rem], Payload, Msg) ->
|
542 | 528 | msg_body_encoded(Rem, Payload, Msg#msg_body_encoded{header = H});
|
|
0 commit comments