Skip to content

Commit 6887bb1

Browse files
committed
Reduce per message disk overhead
Message container annotation keys are stored on disk. By shortening them we save 95 - 58 = 37 bytes per message. ``` 1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})). 95 2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})). 58 ``` This should somewhat reduce disk I/O and disk space.
1 parent 838cb93 commit 6887bb1

25 files changed

+142
-150
lines changed

deps/rabbit/include/mc.hrl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,11 @@
2020
%% "Short strings can carry up to 255 octets of UTF-8 data, but
2121
%% may not contain binary zero octets." [AMQP 0.9.1 $4.2.5.3]
2222
-define(IS_SHORTSTR_LEN(B), byte_size(B) < 256).
23+
24+
%% We keep the following atom annotation keys short as they are stored per message on disk.
25+
-define(EXCHANGE, x).
26+
-define(ROUTING_KEYS, rk).
27+
-define(TIMESTAMP, ts).
28+
-define(RECEIVED_AT_TIMESTAMP, rts).
29+
-define(DURABLE, d).
30+
-define(PRIORITY, p).

deps/rabbit/src/mc.erl

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
set_ttl/2,
1919
x_header/2,
2020
routing_headers/2,
21+
exchange/1,
22+
routing_keys/1,
23+
exchange_and_routing_keys/1,
2124
%%
2225
convert/2,
2326
convert/3,
@@ -223,9 +226,29 @@ routing_headers(#?MODULE{protocol = Proto,
223226
routing_headers(BasicMsg, Opts) ->
224227
mc_compat:routing_headers(BasicMsg, Opts).
225228

229+
-spec exchange(state()) -> rabbit_misc:resource_name().
230+
exchange(#?MODULE{annotations = #{?EXCHANGE := Exchange}}) ->
231+
Exchange;
232+
exchange(BasicMessage) ->
233+
mc_compat:get_annotation(?EXCHANGE, BasicMessage).
234+
235+
-spec routing_keys(state()) -> [rabbit_types:routing_key(),...].
236+
routing_keys(#?MODULE{annotations = #{?ROUTING_KEYS := RoutingKeys}}) ->
237+
RoutingKeys;
238+
routing_keys(BasicMessage) ->
239+
mc_compat:get_annotation(?ROUTING_KEYS, BasicMessage).
240+
241+
-spec exchange_and_routing_keys(state()) ->
242+
{rabbit_misc:resource_name(), [rabbit_types:routing_key(),...]}.
243+
exchange_and_routing_keys(#?MODULE{annotations = #{?EXCHANGE := Exchange,
244+
?ROUTING_KEYS := RoutingKeys}}) ->
245+
{Exchange, RoutingKeys};
246+
exchange_and_routing_keys(BasicMessage) ->
247+
{exchange(BasicMessage), routing_keys(BasicMessage)}.
248+
226249
-spec is_persistent(state()) -> boolean().
227250
is_persistent(#?MODULE{annotations = Anns}) ->
228-
maps:get(durable, Anns, true);
251+
maps:get(?DURABLE, Anns, true);
229252
is_persistent(BasicMsg) ->
230253
mc_compat:is_persistent(BasicMsg).
231254

@@ -235,16 +258,15 @@ ttl(#?MODULE{annotations = Anns}) ->
235258
ttl(BasicMsg) ->
236259
mc_compat:ttl(BasicMsg).
237260

238-
239261
-spec timestamp(state()) -> undefined | non_neg_integer().
240262
timestamp(#?MODULE{annotations = Anns}) ->
241-
maps:get(timestamp, Anns, undefined);
263+
maps:get(?TIMESTAMP, Anns, undefined);
242264
timestamp(BasicMsg) ->
243265
mc_compat:timestamp(BasicMsg).
244266

245267
-spec priority(state()) -> undefined | non_neg_integer().
246268
priority(#?MODULE{annotations = Anns}) ->
247-
maps:get(priority, Anns, undefined);
269+
maps:get(?PRIORITY, Anns, undefined);
248270
priority(BasicMsg) ->
249271
mc_compat:priority(BasicMsg).
250272

@@ -327,8 +349,8 @@ record_death(Reason, SourceQueue,
327349
annotations = Anns0} = State)
328350
when is_atom(Reason) andalso is_binary(SourceQueue) ->
329351
Key = {SourceQueue, Reason},
330-
Exchange = maps:get(exchange, Anns0),
331-
RoutingKeys = maps:get(routing_keys, Anns0),
352+
#{?EXCHANGE := Exchange,
353+
?ROUTING_KEYS := RoutingKeys} = Anns0,
332354
Timestamp = os:system_time(millisecond),
333355
Ttl = maps:get(ttl, Anns0, undefined),
334356

@@ -427,7 +449,7 @@ is_cycle(Queue, [_ | Rem]) ->
427449

428450
set_received_at_timestamp(Anns) ->
429451
Millis = os:system_time(millisecond),
430-
maps:put(rts, Millis, Anns).
452+
Anns#{?RECEIVED_AT_TIMESTAMP => Millis}.
431453

432454
-ifdef(TEST).
433455
-include_lib("eunit/include/eunit.hrl").

deps/rabbit/src/mc_amqp.erl

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
x_header/2,
1111
property/2,
1212
routing_headers/2,
13-
get_property/2,
1413
convert_to/3,
1514
convert_from/3,
1615
protocol_state/2,
@@ -133,20 +132,6 @@ get_property(timestamp, Msg) ->
133132
_ ->
134133
undefined
135134
end;
136-
get_property(correlation_id, Msg) ->
137-
case Msg of
138-
#msg{properties = #'v1_0.properties'{correlation_id = {_Type, CorrId}}} ->
139-
CorrId;
140-
_ ->
141-
undefined
142-
end;
143-
get_property(message_id, Msg) ->
144-
case Msg of
145-
#msg{properties = #'v1_0.properties'{message_id = {_Type, CorrId}}} ->
146-
CorrId;
147-
_ ->
148-
undefined
149-
end;
150135
get_property(ttl, Msg) ->
151136
case Msg of
152137
#msg{header = #'v1_0.header'{ttl = {_, Ttl}}} ->
@@ -173,9 +158,7 @@ get_property(priority, Msg) ->
173158
_ ->
174159
undefined
175160
end
176-
end;
177-
get_property(_P, _Msg) ->
178-
undefined.
161+
end.
179162

180163
convert_to(?MODULE, Msg, _Env) ->
181164
Msg;
@@ -188,8 +171,8 @@ serialize(Sections) ->
188171
encode_bin(Sections).
189172

190173
protocol_state(Msg, Anns) ->
191-
Exchange = maps:get(exchange, Anns),
192-
[RKey | _] = maps:get(routing_keys, Anns),
174+
#{?EXCHANGE := Exchange,
175+
?ROUTING_KEYS := [RKey | _]} = Anns,
193176

194177
%% any x-* annotations get added as message annotations
195178
AnnsToAdd = maps:filter(fun (Key, _) -> mc_util:is_x_header(Key) end, Anns),
@@ -426,11 +409,11 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
426409
undefined
427410
end,
428411
Anns = maps_put_falsy(
429-
durable, Durable,
412+
?DURABLE, Durable,
430413
maps_put_truthy(
431-
priority, Priority,
414+
?PRIORITY, Priority,
432415
maps_put_truthy(
433-
timestamp, Timestamp,
416+
?TIMESTAMP, Timestamp,
434417
maps_put_truthy(
435418
ttl, Ttl,
436419
maps_put_truthy(
@@ -443,20 +426,20 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
443426
lists:foldl(
444427
fun ({{symbol, <<"x-routing-key">>},
445428
{utf8, Key}}, Acc) ->
446-
maps:update_with(routing_keys,
429+
maps:update_with(?ROUTING_KEYS,
447430
fun(L) -> [Key | L] end,
448431
[Key],
449432
Acc);
450433
({{symbol, <<"x-cc">>},
451434
{list, CCs0}}, Acc) ->
452435
CCs = [CC || {_T, CC} <- CCs0],
453-
maps:update_with(routing_keys,
436+
maps:update_with(?ROUTING_KEYS,
454437
fun(L) -> L ++ CCs end,
455438
CCs,
456439
Acc);
457440
({{symbol, <<"x-exchange">>},
458441
{utf8, Exchange}}, Acc) ->
459-
Acc#{exchange => Exchange};
442+
Acc#{?EXCHANGE => Exchange};
460443
(_, Acc) ->
461444
Acc
462445
end, Anns, MA)

deps/rabbit/src/mc_amqpl.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C,
420420
end, Headers1)
421421
end,
422422
Timestamp = case Anns of
423-
#{timestamp := Ts} ->
423+
#{?TIMESTAMP := Ts} ->
424424
Ts div 1000;
425425
_ ->
426426
undefined
@@ -473,8 +473,8 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
473473
HeaderRoutes ->
474474
{ok, mc:init(?MODULE,
475475
rabbit_basic:strip_bcc_header(Content),
476-
Anns#{routing_keys => [RoutingKey | HeaderRoutes],
477-
exchange => ExchangeNameBin})}
476+
Anns#{?ROUTING_KEYS => [RoutingKey | HeaderRoutes],
477+
?EXCHANGE => ExchangeNameBin})}
478478
end;
479479
message(#resource{} = XName, RoutingKey,
480480
#content{} = Content, Anns, false) ->
@@ -707,13 +707,13 @@ essential_properties(#content{} = C) ->
707707
end,
708708
Durable = Mode == 2,
709709
maps_put_truthy(
710-
priority, Priority,
710+
?PRIORITY, Priority,
711711
maps_put_truthy(
712712
ttl, MsgTTL,
713713
maps_put_truthy(
714-
timestamp, Timestamp,
714+
?TIMESTAMP, Timestamp,
715715
maps_put_falsy(
716-
durable, Durable,
716+
?DURABLE, Durable,
717717
#{})))).
718718

719719
%% headers that are added as annotations during conversions

deps/rabbit/src/mc_compat.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,18 @@ is(_) ->
4949
false.
5050

5151
-spec get_annotation(mc:ann_key(), state()) -> mc:ann_value() | undefined.
52-
get_annotation(routing_keys, #basic_message{routing_keys = RKeys}) ->
52+
get_annotation(?ROUTING_KEYS, #basic_message{routing_keys = RKeys}) ->
5353
RKeys;
54-
get_annotation(exchange, #basic_message{exchange_name = Ex}) ->
54+
get_annotation(?EXCHANGE, #basic_message{exchange_name = Ex}) ->
5555
Ex#resource.name;
5656
get_annotation(id, #basic_message{id = Id}) ->
5757
Id.
5858

5959
set_annotation(id, Value, #basic_message{} = Msg) ->
6060
Msg#basic_message{id = Value};
61-
set_annotation(routing_keys, Value, #basic_message{} = Msg) ->
61+
set_annotation(?ROUTING_KEYS, Value, #basic_message{} = Msg) ->
6262
Msg#basic_message{routing_keys = Value};
63-
set_annotation(exchange, Value, #basic_message{exchange_name = Ex} = Msg) ->
63+
set_annotation(?EXCHANGE, Value, #basic_message{exchange_name = Ex} = Msg) ->
6464
Msg#basic_message{exchange_name = Ex#resource{name = Value}};
6565
set_annotation(<<"x-", _/binary>> = Key, Value,
6666
#basic_message{content = Content0} = Msg) ->
@@ -88,7 +88,7 @@ set_annotation(<<"x-", _/binary>> = Key, Value,
8888
Msg#basic_message{content = C};
8989
set_annotation(<<"timestamp_in_ms">> = Name, Value, #basic_message{} = Msg) ->
9090
rabbit_basic:add_header(Name, long, Value, Msg);
91-
set_annotation(timestamp, Millis,
91+
set_annotation(?TIMESTAMP, Millis,
9292
#basic_message{content = #content{properties = B} = C0} = Msg) ->
9393
C = C0#content{properties = B#'P_basic'{timestamp = Millis div 1000},
9494
properties_bin = none},

deps/rabbit/src/rabbit_channel.erl

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -688,8 +688,7 @@ handle_cast({deliver_reply, Key, Msg},
688688
next_tag = DeliveryTag,
689689
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
690690
Content = mc:protocol_state(mc:convert(mc_amqpl, Msg)),
691-
ExchName = mc:get_annotation(exchange, Msg),
692-
[RoutingKey | _] = mc:get_annotation(routing_keys, Msg),
691+
{ExchName, [RoutingKey | _]} = mc:exchange_and_routing_keys(Msg),
693692
ok = rabbit_writer:send_command(
694693
WriterPid,
695694
#'basic.deliver'{consumer_tag = ConsumerTag,
@@ -2125,8 +2124,7 @@ notify_limiter(Limiter, Acked) ->
21252124

21262125
deliver_to_queues({Message, _Options, _RoutedToQueues} = Delivery,
21272126
#ch{cfg = #conf{virtual_host = VHost}} = State) ->
2128-
XNameBin = mc:get_annotation(exchange, Message),
2129-
XName = rabbit_misc:r(VHost, exchange, XNameBin),
2127+
XName = rabbit_misc:r(VHost, exchange, mc:exchange(Message)),
21302128
deliver_to_queues(XName, Delivery, State).
21312129

21322130
deliver_to_queues(XName,
@@ -2192,7 +2190,7 @@ process_routing_mandatory(_Mandatory = true,
21922190
false ->
21932191
Content0
21942192
end,
2195-
[RoutingKey | _] = mc:get_annotation(routing_keys, Msg),
2193+
[RoutingKey | _] = mc:routing_keys(Msg),
21962194
ok = basic_return(Content, RoutingKey, XName#resource.name, State, no_route);
21972195
process_routing_mandatory(_Mandatory = false,
21982196
_RoutedToQs = [],
@@ -2673,14 +2671,13 @@ handle_deliver0(ConsumerTag, AckRequired,
26732671
writer_gc_threshold = GCThreshold},
26742672
next_tag = DeliveryTag,
26752673
queue_states = Qs}) ->
2676-
[RoutingKey | _] = mc:get_annotation(routing_keys, MsgCont0),
2677-
ExchangeNameBin = mc:get_annotation(exchange, MsgCont0),
2674+
{Exchange, [RoutingKey | _]} = mc:exchange_and_routing_keys(MsgCont0),
26782675
MsgCont = mc:convert(mc_amqpl, MsgCont0),
26792676
Content = mc:protocol_state(MsgCont),
26802677
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
26812678
delivery_tag = DeliveryTag,
26822679
redelivered = Redelivered,
2683-
exchange = ExchangeNameBin,
2680+
exchange = Exchange,
26842681
routing_key = RoutingKey},
26852682
{ok, QueueType} = rabbit_queue_type:module(QName, Qs),
26862683
case QueueType of
@@ -2699,15 +2696,14 @@ handle_deliver0(ConsumerTag, AckRequired,
26992696
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
27002697
Msg0 = {_QName, _QPid, _MsgId, Redelivered, MsgCont0},
27012698
QueueType, State) ->
2702-
[RoutingKey | _] = mc:get_annotation(routing_keys, MsgCont0),
2703-
ExchangeName = mc:get_annotation(exchange, MsgCont0),
2699+
{Exchange, [RoutingKey | _]} = mc:exchange_and_routing_keys(MsgCont0),
27042700
MsgCont = mc:convert(mc_amqpl, MsgCont0),
27052701
Content = mc:protocol_state(MsgCont),
27062702
ok = rabbit_writer:send_command(
27072703
WriterPid,
27082704
#'basic.get_ok'{delivery_tag = DeliveryTag,
27092705
redelivered = Redelivered,
2710-
exchange = ExchangeName,
2706+
exchange = Exchange,
27112707
routing_key = RoutingKey,
27122708
message_count = MessageCount},
27132709
Content),

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
%%
77

88
-module(rabbit_dead_letter).
9+
-include("mc.hrl").
910

1011
-export([publish/5,
1112
detect_cycles/3]).
@@ -26,15 +27,15 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
2627
#resource{name = SourceQName}) ->
2728
DLRKeys = case RK of
2829
undefined ->
29-
mc:get_annotation(routing_keys, Msg0);
30+
mc:routing_keys(Msg0);
3031
_ ->
3132
[RK]
3233
end,
3334
Msg1 = mc:record_death(Reason, SourceQName, Msg0),
3435
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
3536
Msg3 = mc:set_ttl(Ttl, Msg2),
36-
Msg4 = mc:set_annotation(routing_keys, DLRKeys, Msg3),
37-
DLMsg = mc:set_annotation(exchange, XName#resource.name, Msg4),
37+
Msg4 = mc:set_annotation(?ROUTING_KEYS, DLRKeys, Msg3),
38+
DLMsg = mc:set_annotation(?EXCHANGE, XName#resource.name, Msg4),
3839
Routed = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}),
3940
{QNames, Cycles} = detect_cycles(Reason, DLMsg, Routed),
4041
lists:foreach(fun log_cycle_once/1, Cycles),

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ route(Exchange, Message) ->
351351
route(#exchange{name = #resource{name = ?DEFAULT_EXCHANGE_NAME,
352352
virtual_host = VHost}},
353353
Message, _Opts) ->
354-
RKs0 = mc:get_annotation(routing_keys, Message),
354+
RKs0 = mc:routing_keys(Message),
355355
RKs = lists:usort(RKs0),
356356
[begin
357357
case virtual_reply_queue(RK) of

deps/rabbit/src/rabbit_exchange_type_direct.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ route(#exchange{name = Name, type = Type}, Msg) ->
3535
route(#exchange{name = Name, type = Type}, Msg, #{}).
3636

3737
route(#exchange{name = Name, type = Type}, Msg, _Opts) ->
38-
Routes = mc:get_annotation(routing_keys, Msg),
38+
Routes = mc:routing_keys(Msg),
3939
rabbit_db_binding:match_routing_key(Name, Routes, Type =:= direct).
4040

4141
validate(_X) -> ok.

deps/rabbit/src/rabbit_exchange_type_topic.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ route(Exchange, Msg) ->
4040
route(Exchange, Msg, #{}).
4141

4242
route(#exchange{name = XName}, Msg, Opts) ->
43-
RKeys = mc:get_annotation(routing_keys, Msg),
43+
RKeys = mc:routing_keys(Msg),
4444
lists:append([rabbit_db_topic_exchange:match(XName, RKey, Opts) || RKey <- RKeys]).
4545

4646
validate(_X) -> ok.

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -317,14 +317,14 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
317317
#resource{name = DLXName} = DLXRef,
318318
DLRKeys = case RKey of
319319
undefined ->
320-
mc:get_annotation(routing_keys, ConsumedMsg);
320+
mc:routing_keys(ConsumedMsg);
321321
_ ->
322322
[RKey]
323323
end,
324324
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg),
325325
Msg1 = mc:set_ttl(undefined, Msg0),
326-
Msg2 = mc:set_annotation(routing_keys, DLRKeys, Msg1),
327-
Msg = mc:set_annotation(exchange, DLXName, Msg2),
326+
Msg2 = mc:set_annotation(?ROUTING_KEYS, DLRKeys, Msg1),
327+
Msg = mc:set_annotation(?EXCHANGE, DLXName, Msg2),
328328
{TargetQs, State3} =
329329
case DLX of
330330
not_found ->
@@ -478,8 +478,8 @@ redeliver0(#pending{delivery = Msg0,
478478
when is_list(DLRKeys) ->
479479
#resource{name = DLXName} = DLXRef,
480480
Msg1 = mc:set_ttl(undefined, Msg0),
481-
Msg2 = mc:set_annotation(routing_keys, DLRKeys, Msg1),
482-
Msg = mc:set_annotation(exchange, DLXName, Msg2),
481+
Msg2 = mc:set_annotation(?ROUTING_KEYS, DLRKeys, Msg1),
482+
Msg = mc:set_annotation(?EXCHANGE, DLXName, Msg2),
483483
%% Because of implicit default bindings rabbit_exchange:route/2 can route to target
484484
%% queues that do not exist. Therefore, filter out non-existent target queues.
485485
RouteToQs0 = queue_names(

0 commit comments

Comments
 (0)