Skip to content

Commit 7af5dc2

Browse files
committed
Apply feedback
1 parent 8a49ce5 commit 7af5dc2

15 files changed

+79
-83
lines changed

deps/rabbit/include/mc.hrl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
-define(IS_SHORTSTR_LEN(B), byte_size(B) < 256).
2323

2424
%% We keep the following atom annotation keys short as they are stored per message on disk.
25-
-define(EXCHANGE, x).
26-
-define(ROUTING_KEYS, rk).
27-
-define(TIMESTAMP, ts).
28-
-define(RECEIVED_AT_TIMESTAMP, rts).
29-
-define(DURABLE, d).
30-
-define(PRIORITY, p).
25+
-define(ANN_EXCHANGE, x).
26+
-define(ANN_ROUTING_KEYS, rk).
27+
-define(ANN_TIMESTAMP, ts).
28+
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
29+
-define(ANN_DURABLE, d).
30+
-define(ANN_PRIORITY, p).

deps/rabbit/src/mc.erl

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
routing_headers/2,
2121
exchange/1,
2222
routing_keys/1,
23-
exchange_and_routing_keys/1,
2423
%%
2524
convert/2,
2625
convert/3,
@@ -226,29 +225,21 @@ routing_headers(#?MODULE{protocol = Proto,
226225
routing_headers(BasicMsg, Opts) ->
227226
mc_compat:routing_headers(BasicMsg, Opts).
228227

229-
-spec exchange(state()) -> rabbit_misc:resource_name().
230-
exchange(#?MODULE{annotations = #{?EXCHANGE := Exchange}}) ->
231-
Exchange;
228+
-spec exchange(state()) -> undefined | rabbit_misc:resource_name().
229+
exchange(#?MODULE{annotations = Anns}) ->
230+
maps:get(?ANN_EXCHANGE, Anns, undefined);
232231
exchange(BasicMessage) ->
233-
mc_compat:get_annotation(?EXCHANGE, BasicMessage).
232+
mc_compat:get_annotation(?ANN_EXCHANGE, BasicMessage).
234233

235-
-spec routing_keys(state()) -> [rabbit_types:routing_key(),...].
236-
routing_keys(#?MODULE{annotations = #{?ROUTING_KEYS := RoutingKeys}}) ->
237-
RoutingKeys;
234+
-spec routing_keys(state()) -> [rabbit_types:routing_key()].
235+
routing_keys(#?MODULE{annotations = Anns}) ->
236+
maps:get(?ANN_ROUTING_KEYS, Anns, []);
238237
routing_keys(BasicMessage) ->
239-
mc_compat:get_annotation(?ROUTING_KEYS, BasicMessage).
240-
241-
-spec exchange_and_routing_keys(state()) ->
242-
{rabbit_misc:resource_name(), [rabbit_types:routing_key(),...]}.
243-
exchange_and_routing_keys(#?MODULE{annotations = #{?EXCHANGE := Exchange,
244-
?ROUTING_KEYS := RoutingKeys}}) ->
245-
{Exchange, RoutingKeys};
246-
exchange_and_routing_keys(BasicMessage) ->
247-
{exchange(BasicMessage), routing_keys(BasicMessage)}.
238+
mc_compat:get_annotation(?ANN_ROUTING_KEYS, BasicMessage).
248239

249240
-spec is_persistent(state()) -> boolean().
250241
is_persistent(#?MODULE{annotations = Anns}) ->
251-
maps:get(?DURABLE, Anns, true);
242+
maps:get(?ANN_DURABLE, Anns, true);
252243
is_persistent(BasicMsg) ->
253244
mc_compat:is_persistent(BasicMsg).
254245

@@ -260,13 +251,13 @@ ttl(BasicMsg) ->
260251

261252
-spec timestamp(state()) -> undefined | non_neg_integer().
262253
timestamp(#?MODULE{annotations = Anns}) ->
263-
maps:get(?TIMESTAMP, Anns, undefined);
254+
maps:get(?ANN_TIMESTAMP, Anns, undefined);
264255
timestamp(BasicMsg) ->
265256
mc_compat:timestamp(BasicMsg).
266257

267258
-spec priority(state()) -> undefined | non_neg_integer().
268259
priority(#?MODULE{annotations = Anns}) ->
269-
maps:get(?PRIORITY, Anns, undefined);
260+
maps:get(?ANN_PRIORITY, Anns, undefined);
270261
priority(BasicMsg) ->
271262
mc_compat:priority(BasicMsg).
272263

@@ -349,8 +340,8 @@ record_death(Reason, SourceQueue,
349340
annotations = Anns0} = State)
350341
when is_atom(Reason) andalso is_binary(SourceQueue) ->
351342
Key = {SourceQueue, Reason},
352-
#{?EXCHANGE := Exchange,
353-
?ROUTING_KEYS := RoutingKeys} = Anns0,
343+
#{?ANN_EXCHANGE := Exchange,
344+
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
354345
Timestamp = os:system_time(millisecond),
355346
Ttl = maps:get(ttl, Anns0, undefined),
356347

@@ -449,7 +440,7 @@ is_cycle(Queue, [_ | Rem]) ->
449440

450441
set_received_at_timestamp(Anns) ->
451442
Millis = os:system_time(millisecond),
452-
Anns#{?RECEIVED_AT_TIMESTAMP => Millis}.
443+
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.
453444

454445
-ifdef(TEST).
455446
-include_lib("eunit/include/eunit.hrl").

deps/rabbit/src/mc_amqp.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ serialize(Sections) ->
171171
encode_bin(Sections).
172172

173173
protocol_state(Msg, Anns) ->
174-
#{?EXCHANGE := Exchange,
175-
?ROUTING_KEYS := [RKey | _]} = Anns,
174+
#{?ANN_EXCHANGE := Exchange,
175+
?ANN_ROUTING_KEYS := [RKey | _]} = Anns,
176176

177177
%% any x-* annotations get added as message annotations
178178
AnnsToAdd = maps:filter(fun (Key, _) -> mc_util:is_x_header(Key) end, Anns),
@@ -409,11 +409,11 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
409409
undefined
410410
end,
411411
Anns = maps_put_falsy(
412-
?DURABLE, Durable,
412+
?ANN_DURABLE, Durable,
413413
maps_put_truthy(
414-
?PRIORITY, Priority,
414+
?ANN_PRIORITY, Priority,
415415
maps_put_truthy(
416-
?TIMESTAMP, Timestamp,
416+
?ANN_TIMESTAMP, Timestamp,
417417
maps_put_truthy(
418418
ttl, Ttl,
419419
maps_put_truthy(
@@ -426,20 +426,20 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
426426
lists:foldl(
427427
fun ({{symbol, <<"x-routing-key">>},
428428
{utf8, Key}}, Acc) ->
429-
maps:update_with(?ROUTING_KEYS,
429+
maps:update_with(?ANN_ROUTING_KEYS,
430430
fun(L) -> [Key | L] end,
431431
[Key],
432432
Acc);
433433
({{symbol, <<"x-cc">>},
434434
{list, CCs0}}, Acc) ->
435435
CCs = [CC || {_T, CC} <- CCs0],
436-
maps:update_with(?ROUTING_KEYS,
436+
maps:update_with(?ANN_ROUTING_KEYS,
437437
fun(L) -> L ++ CCs end,
438438
CCs,
439439
Acc);
440440
({{symbol, <<"x-exchange">>},
441441
{utf8, Exchange}}, Acc) ->
442-
Acc#{?EXCHANGE => Exchange};
442+
Acc#{?ANN_EXCHANGE => Exchange};
443443
(_, Acc) ->
444444
Acc
445445
end, Anns, MA)

deps/rabbit/src/mc_amqpl.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C,
420420
end, Headers1)
421421
end,
422422
Timestamp = case Anns of
423-
#{?TIMESTAMP := Ts} ->
423+
#{?ANN_TIMESTAMP := Ts} ->
424424
Ts div 1000;
425425
_ ->
426426
undefined
@@ -473,8 +473,8 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
473473
HeaderRoutes ->
474474
{ok, mc:init(?MODULE,
475475
rabbit_basic:strip_bcc_header(Content),
476-
Anns#{?ROUTING_KEYS => [RoutingKey | HeaderRoutes],
477-
?EXCHANGE => ExchangeNameBin})}
476+
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
477+
?ANN_EXCHANGE => ExchangeNameBin})}
478478
end;
479479
message(#resource{} = XName, RoutingKey,
480480
#content{} = Content, Anns, false) ->
@@ -707,13 +707,13 @@ essential_properties(#content{} = C) ->
707707
end,
708708
Durable = Mode == 2,
709709
maps_put_truthy(
710-
?PRIORITY, Priority,
710+
?ANN_PRIORITY, Priority,
711711
maps_put_truthy(
712712
ttl, MsgTTL,
713713
maps_put_truthy(
714-
?TIMESTAMP, Timestamp,
714+
?ANN_TIMESTAMP, Timestamp,
715715
maps_put_falsy(
716-
?DURABLE, Durable,
716+
?ANN_DURABLE, Durable,
717717
#{})))).
718718

719719
%% headers that are added as annotations during conversions

deps/rabbit/src/mc_compat.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,18 @@ is(_) ->
4949
false.
5050

5151
-spec get_annotation(mc:ann_key(), state()) -> mc:ann_value() | undefined.
52-
get_annotation(?ROUTING_KEYS, #basic_message{routing_keys = RKeys}) ->
52+
get_annotation(?ANN_ROUTING_KEYS, #basic_message{routing_keys = RKeys}) ->
5353
RKeys;
54-
get_annotation(?EXCHANGE, #basic_message{exchange_name = Ex}) ->
54+
get_annotation(?ANN_EXCHANGE, #basic_message{exchange_name = Ex}) ->
5555
Ex#resource.name;
5656
get_annotation(id, #basic_message{id = Id}) ->
5757
Id.
5858

5959
set_annotation(id, Value, #basic_message{} = Msg) ->
6060
Msg#basic_message{id = Value};
61-
set_annotation(?ROUTING_KEYS, Value, #basic_message{} = Msg) ->
61+
set_annotation(?ANN_ROUTING_KEYS, Value, #basic_message{} = Msg) ->
6262
Msg#basic_message{routing_keys = Value};
63-
set_annotation(?EXCHANGE, Value, #basic_message{exchange_name = Ex} = Msg) ->
63+
set_annotation(?ANN_EXCHANGE, Value, #basic_message{exchange_name = Ex} = Msg) ->
6464
Msg#basic_message{exchange_name = Ex#resource{name = Value}};
6565
set_annotation(<<"x-", _/binary>> = Key, Value,
6666
#basic_message{content = Content0} = Msg) ->
@@ -88,7 +88,7 @@ set_annotation(<<"x-", _/binary>> = Key, Value,
8888
Msg#basic_message{content = C};
8989
set_annotation(<<"timestamp_in_ms">> = Name, Value, #basic_message{} = Msg) ->
9090
rabbit_basic:add_header(Name, long, Value, Msg);
91-
set_annotation(?TIMESTAMP, Millis,
91+
set_annotation(?ANN_TIMESTAMP, Millis,
9292
#basic_message{content = #content{properties = B} = C0} = Msg) ->
9393
C = C0#content{properties = B#'P_basic'{timestamp = Millis div 1000},
9494
properties_bin = none},

deps/rabbit/src/rabbit_channel.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,8 @@ handle_cast({deliver_reply, Key, Msg},
688688
next_tag = DeliveryTag,
689689
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
690690
Content = mc:protocol_state(mc:convert(mc_amqpl, Msg)),
691-
{ExchName, [RoutingKey | _]} = mc:exchange_and_routing_keys(Msg),
691+
ExchName = mc:exchange(Msg),
692+
[RoutingKey | _] = mc:routing_keys(Msg),
692693
ok = rabbit_writer:send_command(
693694
WriterPid,
694695
#'basic.deliver'{consumer_tag = ConsumerTag,
@@ -2671,7 +2672,8 @@ handle_deliver0(ConsumerTag, AckRequired,
26712672
writer_gc_threshold = GCThreshold},
26722673
next_tag = DeliveryTag,
26732674
queue_states = Qs}) ->
2674-
{Exchange, [RoutingKey | _]} = mc:exchange_and_routing_keys(MsgCont0),
2675+
Exchange = mc:exchange(MsgCont0),
2676+
[RoutingKey | _] = mc:routing_keys(MsgCont0),
26752677
MsgCont = mc:convert(mc_amqpl, MsgCont0),
26762678
Content = mc:protocol_state(MsgCont),
26772679
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
@@ -2696,7 +2698,8 @@ handle_deliver0(ConsumerTag, AckRequired,
26962698
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
26972699
Msg0 = {_QName, _QPid, _MsgId, Redelivered, MsgCont0},
26982700
QueueType, State) ->
2699-
{Exchange, [RoutingKey | _]} = mc:exchange_and_routing_keys(MsgCont0),
2701+
Exchange = mc:exchange(MsgCont0),
2702+
[RoutingKey | _] = mc:routing_keys(MsgCont0),
27002703
MsgCont = mc:convert(mc_amqpl, MsgCont0),
27012704
Content = mc:protocol_state(MsgCont),
27022705
ok = rabbit_writer:send_command(

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
3434
Msg1 = mc:record_death(Reason, SourceQName, Msg0),
3535
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
3636
Msg3 = mc:set_ttl(Ttl, Msg2),
37-
Msg4 = mc:set_annotation(?ROUTING_KEYS, DLRKeys, Msg3),
38-
DLMsg = mc:set_annotation(?EXCHANGE, XName#resource.name, Msg4),
37+
Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3),
38+
DLMsg = mc:set_annotation(?ANN_EXCHANGE, XName#resource.name, Msg4),
3939
Routed = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}),
4040
{QNames, Cycles} = detect_cycles(Reason, DLMsg, Routed),
4141
lists:foreach(fun log_cycle_once/1, Cycles),

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,8 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
323323
end,
324324
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg),
325325
Msg1 = mc:set_ttl(undefined, Msg0),
326-
Msg2 = mc:set_annotation(?ROUTING_KEYS, DLRKeys, Msg1),
327-
Msg = mc:set_annotation(?EXCHANGE, DLXName, Msg2),
326+
Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1),
327+
Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2),
328328
{TargetQs, State3} =
329329
case DLX of
330330
not_found ->
@@ -478,8 +478,8 @@ redeliver0(#pending{delivery = Msg0,
478478
when is_list(DLRKeys) ->
479479
#resource{name = DLXName} = DLXRef,
480480
Msg1 = mc:set_ttl(undefined, Msg0),
481-
Msg2 = mc:set_annotation(?ROUTING_KEYS, DLRKeys, Msg1),
482-
Msg = mc:set_annotation(?EXCHANGE, DLXName, Msg2),
481+
Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1),
482+
Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2),
483483
%% Because of implicit default bindings rabbit_exchange:route/2 can route to target
484484
%% queues that do not exist. Therefore, filter out non-existent target queues.
485485
RouteToQs0 = queue_names(

deps/rabbit/src/rabbit_message_interceptor.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,5 @@ set_timestamp(Msg, Timestamp, Overwrite) ->
4646
{Ts, false} when is_integer(Ts) ->
4747
Msg;
4848
_ ->
49-
mc:set_annotation(?TIMESTAMP, Timestamp, Msg)
49+
mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg)
5050
end.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1647,7 +1647,8 @@ peek(Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_quorum(Q) ->
16471647
_ -> 0
16481648
end,
16491649
Msg = mc:set_annotation(<<"x-delivery-count">>, Count, Msg0),
1650-
{XName, RoutingKeys} = mc:exchange_and_routing_keys(Msg),
1650+
XName = mc:exchange(Msg),
1651+
RoutingKeys = mc:routing_keys(Msg),
16511652
AmqpLegacyMsg = mc:prepare(read, mc:convert(mc_amqpl, Msg)),
16521653
Content = mc:protocol_state(AmqpLegacyMsg),
16531654
{ok, rabbit_basic:peek_fmt_message(XName, RoutingKeys, Content)};

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,12 +1112,12 @@ binary_to_msg(#resource{kind = queue,
11121112
%% If exchange or routing_keys annotation isn't present the data most likely came
11131113
%% from the rabbitmq-stream plugin so we'll choose defaults that simulate use
11141114
%% of the direct exchange.
1115-
Mc = case mc:get_annotation(?EXCHANGE, Mc0) of
1116-
undefined -> mc:set_annotation(?EXCHANGE, <<>>, Mc0);
1115+
Mc = case mc:exchange(Mc0) of
1116+
undefined -> mc:set_annotation(?ANN_EXCHANGE, <<>>, Mc0);
11171117
_ -> Mc0
11181118
end,
1119-
case mc:get_annotation(?ROUTING_KEYS, Mc) of
1120-
undefined -> mc:set_annotation(?ROUTING_KEYS, [QName], Mc);
1119+
case mc:routing_keys(Mc) of
1120+
[] -> mc:set_annotation(?ANN_ROUTING_KEYS, [QName], Mc);
11211121
_ -> Mc
11221122
end.
11231123

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1228,7 +1228,7 @@ oldest_message_received_timestamp(Q3, RPA) ->
12281228
Timestamps =
12291229
[Timestamp
12301230
|| HeadMsg <- HeadMsgs,
1231-
Timestamp <- [mc:get_annotation(?RECEIVED_AT_TIMESTAMP, HeadMsg)],
1231+
Timestamp <- [mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, HeadMsg)],
12321232
Timestamp /= undefined
12331233
],
12341234

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,8 @@ amqpl_amqp_bin_amqpl(_Config) ->
294294
Sections = amqp10_framing:decode_bin(
295295
iolist_to_binary(amqp_serialize(Msg10Pre))),
296296
Msg10 = mc:init(mc_amqp, Sections, #{}),
297-
?assertEqual({<<"exch">>, [<<"apple">>]}, mc:exchange_and_routing_keys(Msg10)),
297+
?assertEqual(<<"exch">>, mc:exchange(Msg10)),
298+
?assertEqual([<<"apple">>], mc:routing_keys(Msg10)),
298299
?assertEqual(98, mc:priority(Msg10)),
299300
?assertEqual(true, mc:is_persistent(Msg10)),
300301
?assertEqual(99000, mc:timestamp(Msg10)),
@@ -708,5 +709,5 @@ amqp_map_get(K, Tuples) ->
708709
end.
709710

710711
annotations() ->
711-
#{?EXCHANGE => <<"exch">>,
712-
?ROUTING_KEYS => [<<"apple">>]}.
712+
#{?ANN_EXCHANGE => <<"exch">>,
713+
?ANN_ROUTING_KEYS => [<<"apple">>]}.

deps/rabbitmq_mqtt/src/mc_mqtt.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ init(Msg = #mqtt_msg{qos = Qos,
2727
when is_integer(Qos) ->
2828
Anns0 = case Qos > 0 of
2929
true ->
30-
#{?DURABLE => true};
30+
#{?ANN_DURABLE => true};
3131
false ->
3232
#{}
3333
end,
3434
Anns1 = case Props of
3535
#{'Message-Expiry-Interval' := Seconds} ->
3636
Anns0#{ttl => timer:seconds(Seconds),
37-
?TIMESTAMP => os:system_time(millisecond)};
37+
?ANN_TIMESTAMP => os:system_time(millisecond)};
3838
_ ->
3939
Anns0
4040
end,
@@ -431,7 +431,7 @@ protocol_state(Msg = #mqtt_msg{props = Props0,
431431
undefined ->
432432
Props2;
433433
Ttl ->
434-
case maps:get(?TIMESTAMP, Anns) of
434+
case maps:get(?ANN_TIMESTAMP, Anns) of
435435
undefined ->
436436
Props2;
437437
Timestamp ->
@@ -457,7 +457,7 @@ protocol_state(Msg = #mqtt_msg{props = Props0,
457457
end
458458
end
459459
end,
460-
[RoutingKey | _] = maps:get(?ROUTING_KEYS, Anns),
460+
[RoutingKey | _] = maps:get(?ANN_ROUTING_KEYS, Anns),
461461
Msg#mqtt_msg{topic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey),
462462
props = Props}.
463463

0 commit comments

Comments
 (0)