Skip to content

Commit de261a9

Browse files
committed
Cache exchange record
for default and pre-declared exchanges to save copying the #exchange{} record (i.e. save an ETS lookup call) on every received message. The default and pre-declared exchanges are protected from deletion and modification. Exchange routing decorators are not used in tier 1 plugins and in no open source tier 2 plugin.
1 parent 6910c6c commit de261a9

File tree

2 files changed

+50
-26
lines changed

2 files changed

+50
-26
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
}).
7777

7878
-record(incoming_link, {
79-
exchange :: rabbit_exchange:name(),
79+
exchange :: rabbit_types:exchange() | rabbit_exchange:name(),
8080
routing_key :: undefined | rabbit_types:routing_key(),
8181
%% queue_name_bin is only set if the link target address refers to a queue.
8282
queue_name_bin :: undefined | rabbit_misc:resource_name(),
@@ -713,9 +713,9 @@ handle_control(#'v1_0.attach'{role = ?SEND_ROLE,
713713
user = User}}) ->
714714
ok = validate_attach(Attach),
715715
case ensure_target(Target, Vhost, User) of
716-
{ok, XName, RoutingKey, QNameBin} ->
716+
{ok, Exchange, RoutingKey, QNameBin} ->
717717
IncomingLink = #incoming_link{
718-
exchange = XName,
718+
exchange = Exchange,
719719
routing_key = RoutingKey,
720720
queue_name_bin = QNameBin,
721721
delivery_count = DeliveryCountInt,
@@ -1529,7 +1529,7 @@ incoming_link_transfer(
15291529
rcv_settle_mode = RcvSettleMode,
15301530
handle = Handle = ?UINT(HandleInt)},
15311531
MsgPart,
1532-
#incoming_link{exchange = XName = #resource{name = XNameBin},
1532+
#incoming_link{exchange = Exchange,
15331533
routing_key = LinkRKey,
15341534
delivery_count = DeliveryCount0,
15351535
incoming_unconfirmed_map = U0,
@@ -1560,20 +1560,20 @@ incoming_link_transfer(
15601560
Sections = amqp10_framing:decode_bin(MsgBin),
15611561
?DEBUG("~s Inbound content:~n ~tp",
15621562
[?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),
1563-
Anns0 = #{?ANN_EXCHANGE => XNameBin},
1564-
Anns = case LinkRKey of
1565-
undefined -> Anns0;
1566-
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
1567-
end,
1568-
Mc0 = mc:init(mc_amqp, Sections, Anns),
1569-
Mc1 = rabbit_message_interceptor:intercept(Mc0),
1570-
{Mc, RoutingKey} = ensure_routing_key(Mc1),
1571-
check_user_id(Mc, User),
1572-
messages_received(Settled),
1573-
case rabbit_exchange:lookup(XName) of
1574-
{ok, Exchange} ->
1575-
check_write_permitted_on_topic(Exchange, User, RoutingKey),
1576-
QNames = rabbit_exchange:route(Exchange, Mc, #{return_binding_keys => true}),
1563+
case rabbit_exchange_lookup(Exchange) of
1564+
{ok, X = #exchange{name = #resource{name = XNameBin}}} ->
1565+
Anns0 = #{?ANN_EXCHANGE => XNameBin},
1566+
Anns = case LinkRKey of
1567+
undefined -> Anns0;
1568+
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
1569+
end,
1570+
Mc0 = mc:init(mc_amqp, Sections, Anns),
1571+
Mc1 = rabbit_message_interceptor:intercept(Mc0),
1572+
{Mc, RoutingKey} = ensure_routing_key(Mc1),
1573+
check_user_id(Mc, User),
1574+
messages_received(Settled),
1575+
check_write_permitted_on_topic(X, User, RoutingKey),
1576+
QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}),
15771577
rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace),
15781578
case not Settled andalso
15791579
RcvSettleMode =:= ?V_1_0_RECEIVER_SETTLE_MODE_SECOND of
@@ -1615,6 +1615,11 @@ incoming_link_transfer(
16151615
{error, [Disposition, Detach]}
16161616
end.
16171617

1618+
rabbit_exchange_lookup(X = #exchange{}) ->
1619+
{ok, X};
1620+
rabbit_exchange_lookup(XName = #resource{}) ->
1621+
rabbit_exchange:lookup(XName).
1622+
16181623
ensure_routing_key(Mc) ->
16191624
case mc:routing_keys(Mc) of
16201625
[RoutingKey] ->
@@ -1688,16 +1693,25 @@ ensure_target(#'v1_0.target'{address = Address,
16881693
{ok, Dest} ->
16891694
QNameBin = ensure_terminus(target, Dest, Vhost, User, Durable),
16901695
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
1691-
XName = rabbit_misc:r(Vhost, exchange, list_to_binary(XNameList1)),
1696+
XNameBin = list_to_binary(XNameList1),
1697+
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
16921698
{ok, X} = rabbit_exchange:lookup(XName),
16931699
check_internal_exchange(X),
16941700
check_write_permitted(XName, User),
1701+
%% Pre-declared exchanges are protected against deletion and modification.
1702+
%% Let's cache the whole #exchange{} record to save a
1703+
%% rabbit_exchange:lookup(XName) call each time we receive a message.
1704+
Exchange = case XNameBin of
1705+
<<>> -> X;
1706+
<<"amq.", _/binary>> -> X;
1707+
_ -> XName
1708+
end,
16951709
RoutingKey = case RK of
16961710
undefined -> undefined;
16971711
[] -> undefined;
16981712
_ -> list_to_binary(RK)
16991713
end,
1700-
{ok, XName, RoutingKey, QNameBin};
1714+
{ok, Exchange, RoutingKey, QNameBin};
17011715
{error, _} = E ->
17021716
E
17031717
end;

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -925,35 +925,45 @@ server_closes_link(QType, Config) ->
925925

926926
server_closes_link_exchange(Config) ->
927927
XName = atom_to_binary(?FUNCTION_NAME),
928+
QName = <<"my queue">>,
929+
RoutingKey = <<"my routing key">>,
928930
Ch = rabbit_ct_client_helpers:open_channel(Config),
929931
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}),
930-
932+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
933+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
934+
exchange = XName,
935+
routing_key = RoutingKey}),
931936
OpnConf = connection_config(Config),
932937
{ok, Connection} = amqp10_client:open_connection(OpnConf),
933938
{ok, Session} = amqp10_client:begin_session_sync(Connection),
934-
Address = <<"/exchange/", XName/binary, "/some-routing-key">>,
939+
Address = <<"/exchange/", XName/binary, "/", RoutingKey/binary>>,
935940
{ok, Sender} = amqp10_client:attach_sender_link(
936941
Session, <<"test-sender">>, Address),
937942
ok = wait_for_credit(Sender),
938943
?assertMatch(#{publishers := 1}, get_global_counters(Config)),
939944

945+
DTag1 = <<1>>,
946+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)),
947+
ok = wait_for_settlement(DTag1),
948+
940949
%% Server closes the link endpoint due to some AMQP 1.0 external condition:
941950
%% In this test, the external condition is that an AMQP 0.9.1 client deletes the exchange.
942951
#'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = XName}),
943-
ok = rabbit_ct_client_helpers:close_channel(Ch),
944952

945953
%% When we publish the next message, we expect:
946954
%% 1. that the message is released because the exchange doesn't exist anymore, and
947-
DTag = <<255>>,
948-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<"body">>, false)),
949-
ok = wait_for_settlement(DTag, released),
955+
DTag2 = <<255>>,
956+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
957+
ok = wait_for_settlement(DTag2, released),
950958
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
951959
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
952960
receive {amqp10_event, {link, Sender, {detached, ExpectedError}}} -> ok
953961
after 5000 -> ct:fail("server did not close our outgoing link")
954962
end,
955963
?assertMatch(#{publishers := 0}, get_global_counters(Config)),
956964

965+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
966+
ok = rabbit_ct_client_helpers:close_channel(Ch),
957967
ok = end_session_sync(Session),
958968
ok = amqp10_client:close_connection(Connection).
959969

0 commit comments

Comments
 (0)