Skip to content

Commit a8f076d

Browse files
ansdmichaelklishin
authored andcommitted
Reduce per message disk overhead (#10339)
* Reduce per message disk overhead Message container annotation keys are stored on disk. By shortening them we save 95 - 58 = 37 bytes per message. ``` 1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})). 95 2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})). 58 ``` This should somewhat reduce disk I/O and disk space. * Ensure durable is a boolean Prevent key 'durable' with value 'undefined' being added to the mc annotations, for example when the durable field was not set, but another AMQP 1.0 header field was set. * Apply feedback
1 parent 3ac78a8 commit a8f076d

26 files changed

+140
-152
lines changed

deps/rabbit/include/mc.hrl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,11 @@
2020
%% "Short strings can carry up to 255 octets of UTF-8 data, but
2121
%% may not contain binary zero octets." [AMQP 0.9.1 $4.2.5.3]
2222
-define(IS_SHORTSTR_LEN(B), byte_size(B) < 256).
23+
24+
%% We keep the following atom annotation keys short as they are stored per message on disk.
25+
-define(ANN_EXCHANGE, x).
26+
-define(ANN_ROUTING_KEYS, rk).
27+
-define(ANN_TIMESTAMP, ts).
28+
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
29+
-define(ANN_DURABLE, d).
30+
-define(ANN_PRIORITY, p).

deps/rabbit/src/mc.erl

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
set_ttl/2,
1919
x_header/2,
2020
routing_headers/2,
21+
exchange/1,
22+
routing_keys/1,
2123
%%
2224
convert/2,
2325
convert/3,
@@ -223,9 +225,21 @@ routing_headers(#?MODULE{protocol = Proto,
223225
routing_headers(BasicMsg, Opts) ->
224226
mc_compat:routing_headers(BasicMsg, Opts).
225227

228+
-spec exchange(state()) -> undefined | rabbit_misc:resource_name().
229+
exchange(#?MODULE{annotations = Anns}) ->
230+
maps:get(?ANN_EXCHANGE, Anns, undefined);
231+
exchange(BasicMessage) ->
232+
mc_compat:get_annotation(?ANN_EXCHANGE, BasicMessage).
233+
234+
-spec routing_keys(state()) -> [rabbit_types:routing_key()].
235+
routing_keys(#?MODULE{annotations = Anns}) ->
236+
maps:get(?ANN_ROUTING_KEYS, Anns, []);
237+
routing_keys(BasicMessage) ->
238+
mc_compat:get_annotation(?ANN_ROUTING_KEYS, BasicMessage).
239+
226240
-spec is_persistent(state()) -> boolean().
227241
is_persistent(#?MODULE{annotations = Anns}) ->
228-
maps:get(durable, Anns, true);
242+
maps:get(?ANN_DURABLE, Anns, true);
229243
is_persistent(BasicMsg) ->
230244
mc_compat:is_persistent(BasicMsg).
231245

@@ -235,16 +249,15 @@ ttl(#?MODULE{annotations = Anns}) ->
235249
ttl(BasicMsg) ->
236250
mc_compat:ttl(BasicMsg).
237251

238-
239252
-spec timestamp(state()) -> undefined | non_neg_integer().
240253
timestamp(#?MODULE{annotations = Anns}) ->
241-
maps:get(timestamp, Anns, undefined);
254+
maps:get(?ANN_TIMESTAMP, Anns, undefined);
242255
timestamp(BasicMsg) ->
243256
mc_compat:timestamp(BasicMsg).
244257

245258
-spec priority(state()) -> undefined | non_neg_integer().
246259
priority(#?MODULE{annotations = Anns}) ->
247-
maps:get(priority, Anns, undefined);
260+
maps:get(?ANN_PRIORITY, Anns, undefined);
248261
priority(BasicMsg) ->
249262
mc_compat:priority(BasicMsg).
250263

@@ -327,8 +340,8 @@ record_death(Reason, SourceQueue,
327340
annotations = Anns0} = State)
328341
when is_atom(Reason) andalso is_binary(SourceQueue) ->
329342
Key = {SourceQueue, Reason},
330-
Exchange = maps:get(exchange, Anns0),
331-
RoutingKeys = maps:get(routing_keys, Anns0),
343+
#{?ANN_EXCHANGE := Exchange,
344+
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
332345
Timestamp = os:system_time(millisecond),
333346
Ttl = maps:get(ttl, Anns0, undefined),
334347

@@ -427,7 +440,7 @@ is_cycle(Queue, [_ | Rem]) ->
427440

428441
set_received_at_timestamp(Anns) ->
429442
Millis = os:system_time(millisecond),
430-
maps:put(rts, Millis, Anns).
443+
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.
431444

432445
-ifdef(TEST).
433446
-include_lib("eunit/include/eunit.hrl").

deps/rabbit/src/mc_amqp.erl

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
x_header/2,
1111
property/2,
1212
routing_headers/2,
13-
get_property/2,
1413
convert_to/3,
1514
convert_from/3,
1615
protocol_state/2,
@@ -113,7 +112,7 @@ routing_headers(Msg, Opts) ->
113112
get_property(durable, Msg) ->
114113
case Msg of
115114
#msg{header = #'v1_0.header'{durable = Durable}}
116-
when is_atom(Durable) ->
115+
when is_boolean(Durable) ->
117116
Durable;
118117
#msg{header = #'v1_0.header'{durable = {boolean, Durable}}} ->
119118
Durable;
@@ -133,20 +132,6 @@ get_property(timestamp, Msg) ->
133132
_ ->
134133
undefined
135134
end;
136-
get_property(correlation_id, Msg) ->
137-
case Msg of
138-
#msg{properties = #'v1_0.properties'{correlation_id = {_Type, CorrId}}} ->
139-
CorrId;
140-
_ ->
141-
undefined
142-
end;
143-
get_property(message_id, Msg) ->
144-
case Msg of
145-
#msg{properties = #'v1_0.properties'{message_id = {_Type, CorrId}}} ->
146-
CorrId;
147-
_ ->
148-
undefined
149-
end;
150135
get_property(ttl, Msg) ->
151136
case Msg of
152137
#msg{header = #'v1_0.header'{ttl = {_, Ttl}}} ->
@@ -173,9 +158,7 @@ get_property(priority, Msg) ->
173158
_ ->
174159
undefined
175160
end
176-
end;
177-
get_property(_P, _Msg) ->
178-
undefined.
161+
end.
179162

180163
convert_to(?MODULE, Msg, _Env) ->
181164
Msg;
@@ -188,8 +171,8 @@ serialize(Sections) ->
188171
encode_bin(Sections).
189172

190173
protocol_state(Msg, Anns) ->
191-
Exchange = maps:get(exchange, Anns),
192-
[RKey | _] = maps:get(routing_keys, Anns),
174+
#{?ANN_EXCHANGE := Exchange,
175+
?ANN_ROUTING_KEYS := [RKey | _]} = Anns,
193176

194177
%% any x-* annotations get added as message annotations
195178
AnnsToAdd = maps:filter(fun (Key, _) -> mc_util:is_x_header(Key) end, Anns),
@@ -426,11 +409,11 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
426409
undefined
427410
end,
428411
Anns = maps_put_falsy(
429-
durable, Durable,
412+
?ANN_DURABLE, Durable,
430413
maps_put_truthy(
431-
priority, Priority,
414+
?ANN_PRIORITY, Priority,
432415
maps_put_truthy(
433-
timestamp, Timestamp,
416+
?ANN_TIMESTAMP, Timestamp,
434417
maps_put_truthy(
435418
ttl, Ttl,
436419
maps_put_truthy(
@@ -443,20 +426,20 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
443426
lists:foldl(
444427
fun ({{symbol, <<"x-routing-key">>},
445428
{utf8, Key}}, Acc) ->
446-
maps:update_with(routing_keys,
429+
maps:update_with(?ANN_ROUTING_KEYS,
447430
fun(L) -> [Key | L] end,
448431
[Key],
449432
Acc);
450433
({{symbol, <<"x-cc">>},
451434
{list, CCs0}}, Acc) ->
452435
CCs = [CC || {_T, CC} <- CCs0],
453-
maps:update_with(routing_keys,
436+
maps:update_with(?ANN_ROUTING_KEYS,
454437
fun(L) -> L ++ CCs end,
455438
CCs,
456439
Acc);
457440
({{symbol, <<"x-exchange">>},
458441
{utf8, Exchange}}, Acc) ->
459-
Acc#{exchange => Exchange};
442+
Acc#{?ANN_EXCHANGE => Exchange};
460443
(_, Acc) ->
461444
Acc
462445
end, Anns, MA)

deps/rabbit/src/mc_amqpl.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C,
420420
end, Headers1)
421421
end,
422422
Timestamp = case Anns of
423-
#{timestamp := Ts} ->
423+
#{?ANN_TIMESTAMP := Ts} ->
424424
Ts div 1000;
425425
_ ->
426426
undefined
@@ -473,8 +473,8 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
473473
HeaderRoutes ->
474474
{ok, mc:init(?MODULE,
475475
rabbit_basic:strip_bcc_header(Content),
476-
Anns#{routing_keys => [RoutingKey | HeaderRoutes],
477-
exchange => ExchangeNameBin})}
476+
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
477+
?ANN_EXCHANGE => ExchangeNameBin})}
478478
end;
479479
message(#resource{} = XName, RoutingKey,
480480
#content{} = Content, Anns, false) ->
@@ -707,13 +707,13 @@ essential_properties(#content{} = C) ->
707707
end,
708708
Durable = Mode == 2,
709709
maps_put_truthy(
710-
priority, Priority,
710+
?ANN_PRIORITY, Priority,
711711
maps_put_truthy(
712712
ttl, MsgTTL,
713713
maps_put_truthy(
714-
timestamp, Timestamp,
714+
?ANN_TIMESTAMP, Timestamp,
715715
maps_put_falsy(
716-
durable, Durable,
716+
?ANN_DURABLE, Durable,
717717
#{})))).
718718

719719
%% headers that are added as annotations during conversions

deps/rabbit/src/mc_compat.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,18 @@ is(_) ->
4949
false.
5050

5151
-spec get_annotation(mc:ann_key(), state()) -> mc:ann_value() | undefined.
52-
get_annotation(routing_keys, #basic_message{routing_keys = RKeys}) ->
52+
get_annotation(?ANN_ROUTING_KEYS, #basic_message{routing_keys = RKeys}) ->
5353
RKeys;
54-
get_annotation(exchange, #basic_message{exchange_name = Ex}) ->
54+
get_annotation(?ANN_EXCHANGE, #basic_message{exchange_name = Ex}) ->
5555
Ex#resource.name;
5656
get_annotation(id, #basic_message{id = Id}) ->
5757
Id.
5858

5959
set_annotation(id, Value, #basic_message{} = Msg) ->
6060
Msg#basic_message{id = Value};
61-
set_annotation(routing_keys, Value, #basic_message{} = Msg) ->
61+
set_annotation(?ANN_ROUTING_KEYS, Value, #basic_message{} = Msg) ->
6262
Msg#basic_message{routing_keys = Value};
63-
set_annotation(exchange, Value, #basic_message{exchange_name = Ex} = Msg) ->
63+
set_annotation(?ANN_EXCHANGE, Value, #basic_message{exchange_name = Ex} = Msg) ->
6464
Msg#basic_message{exchange_name = Ex#resource{name = Value}};
6565
set_annotation(<<"x-", _/binary>> = Key, Value,
6666
#basic_message{content = Content0} = Msg) ->
@@ -88,7 +88,7 @@ set_annotation(<<"x-", _/binary>> = Key, Value,
8888
Msg#basic_message{content = C};
8989
set_annotation(<<"timestamp_in_ms">> = Name, Value, #basic_message{} = Msg) ->
9090
rabbit_basic:add_header(Name, long, Value, Msg);
91-
set_annotation(timestamp, Millis,
91+
set_annotation(?ANN_TIMESTAMP, Millis,
9292
#basic_message{content = #content{properties = B} = C0} = Msg) ->
9393
C = C0#content{properties = B#'P_basic'{timestamp = Millis div 1000},
9494
properties_bin = none},

deps/rabbit/src/rabbit_channel.erl

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -688,8 +688,8 @@ handle_cast({deliver_reply, Key, Msg},
688688
next_tag = DeliveryTag,
689689
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
690690
Content = mc:protocol_state(mc:convert(mc_amqpl, Msg)),
691-
ExchName = mc:get_annotation(exchange, Msg),
692-
[RoutingKey | _] = mc:get_annotation(routing_keys, Msg),
691+
ExchName = mc:exchange(Msg),
692+
[RoutingKey | _] = mc:routing_keys(Msg),
693693
ok = rabbit_writer:send_command(
694694
WriterPid,
695695
#'basic.deliver'{consumer_tag = ConsumerTag,
@@ -2125,8 +2125,7 @@ notify_limiter(Limiter, Acked) ->
21252125

21262126
deliver_to_queues({Message, _Options, _RoutedToQueues} = Delivery,
21272127
#ch{cfg = #conf{virtual_host = VHost}} = State) ->
2128-
XNameBin = mc:get_annotation(exchange, Message),
2129-
XName = rabbit_misc:r(VHost, exchange, XNameBin),
2128+
XName = rabbit_misc:r(VHost, exchange, mc:exchange(Message)),
21302129
deliver_to_queues(XName, Delivery, State).
21312130

21322131
deliver_to_queues(XName,
@@ -2192,7 +2191,7 @@ process_routing_mandatory(_Mandatory = true,
21922191
false ->
21932192
Content0
21942193
end,
2195-
[RoutingKey | _] = mc:get_annotation(routing_keys, Msg),
2194+
[RoutingKey | _] = mc:routing_keys(Msg),
21962195
ok = basic_return(Content, RoutingKey, XName#resource.name, State, no_route);
21972196
process_routing_mandatory(_Mandatory = false,
21982197
_RoutedToQs = [],
@@ -2673,14 +2672,14 @@ handle_deliver0(ConsumerTag, AckRequired,
26732672
writer_gc_threshold = GCThreshold},
26742673
next_tag = DeliveryTag,
26752674
queue_states = Qs}) ->
2676-
[RoutingKey | _] = mc:get_annotation(routing_keys, MsgCont0),
2677-
ExchangeNameBin = mc:get_annotation(exchange, MsgCont0),
2675+
Exchange = mc:exchange(MsgCont0),
2676+
[RoutingKey | _] = mc:routing_keys(MsgCont0),
26782677
MsgCont = mc:convert(mc_amqpl, MsgCont0),
26792678
Content = mc:protocol_state(MsgCont),
26802679
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
26812680
delivery_tag = DeliveryTag,
26822681
redelivered = Redelivered,
2683-
exchange = ExchangeNameBin,
2682+
exchange = Exchange,
26842683
routing_key = RoutingKey},
26852684
{ok, QueueType} = rabbit_queue_type:module(QName, Qs),
26862685
case QueueType of
@@ -2699,15 +2698,15 @@ handle_deliver0(ConsumerTag, AckRequired,
26992698
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
27002699
Msg0 = {_QName, _QPid, _MsgId, Redelivered, MsgCont0},
27012700
QueueType, State) ->
2702-
[RoutingKey | _] = mc:get_annotation(routing_keys, MsgCont0),
2703-
ExchangeName = mc:get_annotation(exchange, MsgCont0),
2701+
Exchange = mc:exchange(MsgCont0),
2702+
[RoutingKey | _] = mc:routing_keys(MsgCont0),
27042703
MsgCont = mc:convert(mc_amqpl, MsgCont0),
27052704
Content = mc:protocol_state(MsgCont),
27062705
ok = rabbit_writer:send_command(
27072706
WriterPid,
27082707
#'basic.get_ok'{delivery_tag = DeliveryTag,
27092708
redelivered = Redelivered,
2710-
exchange = ExchangeName,
2709+
exchange = Exchange,
27112710
routing_key = RoutingKey,
27122711
message_count = MessageCount},
27132712
Content),

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
%%
77

88
-module(rabbit_dead_letter).
9+
-include("mc.hrl").
910

1011
-export([publish/5,
1112
detect_cycles/3]).
@@ -26,15 +27,15 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
2627
#resource{name = SourceQName}) ->
2728
DLRKeys = case RK of
2829
undefined ->
29-
mc:get_annotation(routing_keys, Msg0);
30+
mc:routing_keys(Msg0);
3031
_ ->
3132
[RK]
3233
end,
3334
Msg1 = mc:record_death(Reason, SourceQName, Msg0),
3435
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
3536
Msg3 = mc:set_ttl(Ttl, Msg2),
36-
Msg4 = mc:set_annotation(routing_keys, DLRKeys, Msg3),
37-
DLMsg = mc:set_annotation(exchange, XName#resource.name, Msg4),
37+
Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3),
38+
DLMsg = mc:set_annotation(?ANN_EXCHANGE, XName#resource.name, Msg4),
3839
Routed = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}),
3940
{QNames, Cycles} = detect_cycles(Reason, DLMsg, Routed),
4041
lists:foreach(fun log_cycle_once/1, Cycles),

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ route(Exchange, Message) ->
351351
route(#exchange{name = #resource{name = ?DEFAULT_EXCHANGE_NAME,
352352
virtual_host = VHost}},
353353
Message, _Opts) ->
354-
RKs0 = mc:get_annotation(routing_keys, Message),
354+
RKs0 = mc:routing_keys(Message),
355355
RKs = lists:usort(RKs0),
356356
[begin
357357
case virtual_reply_queue(RK) of

deps/rabbit/src/rabbit_exchange_type_direct.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ route(#exchange{name = Name, type = Type}, Msg) ->
3535
route(#exchange{name = Name, type = Type}, Msg, #{}).
3636

3737
route(#exchange{name = Name, type = Type}, Msg, _Opts) ->
38-
Routes = mc:get_annotation(routing_keys, Msg),
38+
Routes = mc:routing_keys(Msg),
3939
rabbit_db_binding:match_routing_key(Name, Routes, Type =:= direct).
4040

4141
validate(_X) -> ok.

deps/rabbit/src/rabbit_exchange_type_topic.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ route(Exchange, Msg) ->
4040
route(Exchange, Msg, #{}).
4141

4242
route(#exchange{name = XName}, Msg, Opts) ->
43-
RKeys = mc:get_annotation(routing_keys, Msg),
43+
RKeys = mc:routing_keys(Msg),
4444
lists:append([rabbit_db_topic_exchange:match(XName, RKey, Opts) || RKey <- RKeys]).
4545

4646
validate(_X) -> ok.

0 commit comments

Comments
 (0)