Skip to content

Commit 25e94d3

Browse files
committed
Support AMQP 1.0 natively
ATTACH target TRANSFER from client to server ATTACH source Merge session files Merge files rabbit_amqp1_0_session_process and rabbit_amqp1_0_session. Handle deliver action DETACH link and END session Fix credit reply from quorum queue Make following tests green: ``` make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:reliable_send_receive_with_outcomes_classic make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:reliable_send_receive_with_outcomes_quorum ``` Settle with state released if unroutable Fixes the following test: ``` make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:publishing_to_non_existing_queue_should_settle_with_released ``` Handle drain Make follwing tests green: ``` make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:roundtrip_classic_queue_with_drain make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:roundtrip_quorum_queue_with_drain ``` Handle send_credit_reply action Make the following test green: ``` make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:roundtrip_stream_queue_with_drain ``` Fix test expectation Make the following test green: ``` make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:message_headers_conversion ``` With Native AMQP, the behaviour of ``` Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false) Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false) ``` https://github.com/rabbitmq/rabbitmq-server/tree/main/deps/rabbitmq_amqp1_0#configuration will break because we always convert according to the message container conversions. For example, 091 x-headers will go into message-annotations instead of application properties. Also, false won’t be respected since we always convert the headers with message containers. Either we decide to have this breaking change with Native AMQP or we need to respect the old behaviour of application parameters amqp1_0.convert_amqp091_headers_to_app_props and amqp1_0.convert_app_props_to_amqp091_headers when doing the message container conversions. Register connection Make the following test green: ``` make -C deps/rabbitmq_amqp1_0/ ct-proxy_protocol ``` Delete unused code Fix message annotation test expectation With message containers, the broker will include x-exchange and x-routing-key message annotations. Serialize footer makes the following test green: ``` make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:footer ``` Set first-acquirer in header Makes the following test green: ``` make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:redelivery ``` Fix queue confirmation crash Makes the following test green: ``` make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:routing ``` Add some authz checks to make the following tests green: ``` make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:access_failure make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:access_failure_not_allowed make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:access_failure_send ``` Delete unused code Adapt test expectation Makes the following test green: ``` make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:streams ``` Fix dialyzer warnings bazel run gazelle excluding any changes for deps/proper Handle undefined msg ID from queue Makes the following tests green: ``` make -C deps/amqp10_client/ ct-system ``` Classic queues send an 'undefined' message ID to the channel when no ack is required. No ack is required when the send settle mode is settled. In this case it should be perfectly valid to always send the same (empty binary) delivery-tag from server to client. mix format deps/rabbitmq_cli/lib/rabbitmqctl.ex using Mix 1.15.4 (compiled with Erlang/OTP 26) Convert gen_server2 to gen_server Fix crash when basic message is received Makes the following test green: ``` bazel test //deps/rabbitmq_amqp1_0:amqp10_client_SUITE-mixed ``` Remove #outgoing_link.default_outcome Rename unconfirmed to incoming_unsettled_map to better match the AMQP spec terminology. Fix cherry-pick build failure Add MQTT 5.0 <-> AMQP 1.0 assertions Simplify rabbit_channel by removing extra AMQP 1.0 logic for settling unroutable messages with released state. This commit reverts the workaround introduced by PR 8015. Remove rabbit_queue_collector rabbit_queue_collector is responsible for synchronously deleting exclusive queues. Since the AMQP 1.0 plugin never creates exclusive queues, rabbit_queue_collector doesn't need to be started in the first place. This will save 1 Erlang process per AMQP 1.0 connection. Use 1 writer process per AMQP 1.0 connection AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel. Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer Erlang process per AMQP 1.0 session. Advantage of single writer proc per session (prior to this commit): * High parallelism for serialising packets if multiple sessions within a connection write heavily at the same time. This commit uses a single writer process per AMQP 1.0 connection that is shared across all AMQP 1.0 sessions. Advantages of single writer proc per connection (this commit): * Lower memory usage with hundreds of thousands of AMQP 1.0 sessions * Less TCP and IP header overhead given that the single writer process can accumulate across all sessions bytes worth a MSS before flushing the socket. In other words, this commit decides that a reader / writer process pair per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows. Having a writer per session is too heavy. The final goal by previous commits and follow-up commits is to reduce the total number of Erlang processes to allow hundreds of thousands of AMQP clients to connect while keeping resource usage in RabbitMQ at a low level. We still ensure high thoughput by having separate reader, writer, and session processes. Remove one supervisory level Given that we now have 1 writer per AMQP 1.0 connection, this commit changes the supervisor hierarchy such that only 1 additional process (rabbit_amqp1_0_session) is created per AMQP 1.0 session. Fix dialyze and xref `bazel run gazelle` wrongly removes the dependency on amqp_client due to a bug in the gazelle plugin. For now, we add a directive. Transform rabbit_amqp1_0_writer into gen_server Why: Prior to this commit, when clicking on the AMQP 1.0 writer process in observer, the process crashed. Instead of handling all these debug messages of the sys module, it's much better to implement a gen_server. There is no advantage of using a special OTP process over gen_server for the AMQP 1.0 writer. gen_server also provides cleaner format status output. How: Message callbacks return a timeout of 0. After all messages in the inbox are processed, the timeout message is handled by flushing any pending bytes. Add test for multiple sessions on same connection given that a single writer is used across multiple sessions. Remove stats timer from writer AMQP 1.0 connections haven't emitted any stats previously. Since Native AMQP 1.0 is targeted for 4.0 where metrics delivery via the Management API is removed anyway, we remove the stats timer from the 1.0 writer in this commit. Add better test for CLI connections listing Display connection properties in Management UI Make rabbit_confirms more efficient use lists:foldl/3 instead of lists:foldr/3. The returned order of confirmed sequence numers is not important since rabbit_channel will sort them anyway. Avoid lists:any/2 by checking within preceeding lists:foldl/3 Fix flawed serial number arithmetic Batch confirms and rejections When there are contiguous queue confirmations in the session process mailbox, batch them. When the confirmations are sent to the publisher, a single DISPOSITION frame is sent for contiguously confirmed delivery IDs and for the special case where no confirmations are outstanding anymore. This approach should be good enough. However it's sub optimal in scenarios where contiguous delivery IDs that need confirmations are rare, for example: * There are multiple links in the session with different sender settlement modes and sender publishes across these links interleaved. * sender settlement mode is mixed and sender publishes interleaved settled and unsettled TRANSFERs.
1 parent ee7c6d9 commit 25e94d3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3034
-3004
lines changed

deps/amqp10_client/README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22

33
This is an [Erlang client for the AMQP 1.0](https://www.amqp.org/resources/specifications) protocol.
44

5-
It's primary purpose is to be used in RabbitMQ related projects but it is a
6-
generic client that was tested with at least 4 implementations of AMQP 1.0.
5+
Its primary purpose is to be used in RabbitMQ related projects but it is a
6+
generic client that was tested with at least 3 implementations of AMQP 1.0.
77

88
If you are looking for an Erlang client for [AMQP 0-9-1](https://www.rabbitmq.com/tutorials/amqp-concepts.html) — a completely different
9-
protocol despite the name — [consider this one](https://github.com/rabbitmq/rabbitmq-erlang-client).
9+
protocol despite the name — [consider this one](../amqp_client).
1010

1111
## Project Maturity and Status
1212

1313
This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100%
14-
feature complete but moderately mature and was tested against at least three AMQP 1.0 servers:
14+
feature complete but moderately mature and was tested against at least 3 AMQP 1.0 servers:
1515
RabbitMQ, Azure ServiceBus, ActiveMQ.
1616

1717
This client library is not officially supported by VMware at this time.
@@ -80,8 +80,8 @@ after 2000 ->
8080
exit(credited_timeout)
8181
end.
8282
83-
%% create a new message using a delivery-tag, body and indicate
84-
%% it's settlement status (true meaning no disposition confirmation
83+
%% Create a new message using a delivery-tag, body and indicate
84+
%% its settlement status (true meaning no disposition confirmation
8585
%% will be sent by the receiver).
8686
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
8787
ok = amqp10_client:send_msg(Sender, OutMsg),
@@ -112,7 +112,7 @@ after the `Open` frame has been successfully written to the socket rather than
112112
waiting until the remote end returns with their `Open` frame. The client will
113113
notify the caller of various internal/async events using `amqp10_event`
114114
messages. In the example above when the remote replies with their `Open` frame
115-
a message is sent of the following forma:
115+
a message is sent of the following form:
116116

117117
```
118118
{amqp10_event, {connection, ConnectionPid, opened}}

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,6 @@ echo(#link_ref{role = receiver, session = Session,
334334
%%% messages
335335

336336
%% @doc Send a message on a the link referred to be the 'LinkRef'.
337-
%% Returns ok for "async" transfers when messages are sent with settled=true
338-
%% else it returns the delivery state from the disposition
339337
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
340338
ok | {error, insufficient_credit | link_not_found | half_attached}.
341339
send_msg(#link_ref{role = sender, session = Session,

deps/amqp10_client/src/amqp10_client_connection.erl

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,7 @@
7070
% set to a negative value to allow a sender to "overshoot" the flow
7171
% control by this margin
7272
transfer_limit_margin => 0 | neg_integer(),
73-
sasl => none | anon | {plain, User :: binary(), Pwd :: binary()},
74-
notify => pid(),
75-
notify_when_opened => pid() | none,
76-
notify_when_closed => pid() | none
73+
sasl => none | anon | {plain, User :: binary(), Pwd :: binary()}
7774
}.
7875

7976
-record(state,
@@ -144,7 +141,7 @@ protocol_header_received(Pid, Protocol, Maj, Min, Rev) ->
144141

145142
-spec begin_session(pid()) -> supervisor:startchild_ret().
146143
begin_session(Pid) ->
147-
gen_statem:call(Pid, begin_session, {dirty_timeout, ?TIMEOUT}).
144+
gen_statem:call(Pid, begin_session, ?TIMEOUT).
148145

149146
heartbeat(Pid) ->
150147
gen_statem:cast(Pid, heartbeat).

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@
142142
notify :: pid()
143143
}).
144144

145-
146145
%% -------------------------------------------------------------------
147146
%% Public API.
148147
%% -------------------------------------------------------------------
@@ -173,18 +172,17 @@ begin_sync(Connection, Timeout) ->
173172

174173
-spec attach(pid(), attach_args()) -> {ok, link_ref()}.
175174
attach(Session, Args) ->
176-
gen_statem:call(Session, {attach, Args}, {dirty_timeout, ?TIMEOUT}).
175+
gen_statem:call(Session, {attach, Args}, ?TIMEOUT).
177176

178177
-spec detach(pid(), link_handle()) -> ok | {error, link_not_found | half_attached}.
179178
detach(Session, Handle) ->
180-
gen_statem:call(Session, {detach, Handle}, {dirty_timeout, ?TIMEOUT}).
179+
gen_statem:call(Session, {detach, Handle}, ?TIMEOUT).
181180

182181
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
183182
ok | {error, insufficient_credit | link_not_found | half_attached}.
184183
transfer(Session, Amqp10Msg, Timeout) ->
185184
[Transfer | Records] = amqp10_msg:to_amqp_records(Amqp10Msg),
186-
gen_statem:call(Session, {transfer, Transfer, Records},
187-
{dirty_timeout, Timeout}).
185+
gen_statem:call(Session, {transfer, Transfer, Records}, Timeout).
188186

189187
flow(Session, Handle, Flow, RenewAfter) ->
190188
gen_statem:cast(Session, {flow, Handle, Flow, RenewAfter}).
@@ -193,7 +191,7 @@ flow(Session, Handle, Flow, RenewAfter) ->
193191
amqp10_client_types:delivery_state()) -> ok.
194192
disposition(Session, Role, First, Last, Settled, DeliveryState) ->
195193
gen_statem:call(Session, {disposition, Role, First, Last, Settled,
196-
DeliveryState}, {dirty_timeout, ?TIMEOUT}).
194+
DeliveryState}, ?TIMEOUT).
197195

198196

199197

deps/amqp10_client/test/system_SUITE.erl

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,10 @@
1414

1515
-include("src/amqp10_client.hrl").
1616

17-
-compile(export_all).
17+
-compile([export_all, nowarn_export_all]).
1818

19-
-define(UNAUTHORIZED_USER, <<"test_user_no_perm">>).
20-
21-
%% The latch constant defines how many processes are spawned in order
22-
%% to run certain functionality in parallel. It follows the standard
23-
%% countdown latch pattern.
24-
-define(LATCH, 100).
25-
26-
%% The wait constant defines how long a consumer waits before it
27-
%% unsubscribes
28-
-define(WAIT, 200).
29-
30-
%% How to long wait for a process to die after an expected failure
31-
-define(PROCESS_EXIT_TIMEOUT, 5000).
19+
suite() ->
20+
[{timetrap, {seconds, 120}}].
3221

3322
all() ->
3423
[
@@ -344,7 +333,7 @@ roundtrip(OpenConf, Body) ->
344333
<<"test1">>,
345334
settled,
346335
unsettled_state),
347-
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 60000 * 5),
336+
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 60_000 * 4),
348337
ok = amqp10_client:end_session(Session),
349338
ok = amqp10_client:close_connection(Connection),
350339
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
@@ -379,7 +368,7 @@ filtered_roundtrip(OpenConf, Body) ->
379368
settled,
380369
unsettled_state),
381370
ok = amqp10_client:send_msg(Sender, Msg1),
382-
{ok, OutMsg1} = amqp10_client:get_msg(DefaultReceiver, 60000 * 5),
371+
{ok, OutMsg1} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4),
383372
?assertEqual(<<"msg-1-tag">>, amqp10_msg:delivery_tag(OutMsg1)),
384373

385374
timer:sleep(5 * 1000),
@@ -398,10 +387,10 @@ filtered_roundtrip(OpenConf, Body) ->
398387
unsettled_state,
399388
#{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}),
400389

401-
{ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60000 * 5),
390+
{ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4),
402391
?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsg2)),
403392

404-
{ok, OutMsgFiltered} = amqp10_client:get_msg(FilteredReceiver, 60000 * 5),
393+
{ok, OutMsgFiltered} = amqp10_client:get_msg(FilteredReceiver, 60_000 * 4),
405394
?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsgFiltered)),
406395

407396
ok = amqp10_client:end_session(Session),
@@ -676,11 +665,13 @@ incoming_heartbeat(Config) ->
676665
idle_time_out => 1000, notify => self()},
677666
{ok, Connection} = amqp10_client:open_connection(CConf),
678667
receive
679-
{amqp10_event, {connection, Connection,
680-
{closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}} ->
668+
{amqp10_event,
669+
{connection, Connection0,
670+
{closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}}
671+
when Connection0 =:= Connection ->
681672
ok
682673
after 5000 ->
683-
exit(incoming_heartbeat_assert)
674+
exit(incoming_heartbeat_assert)
684675
end,
685676
demonitor(MockRef).
686677

@@ -704,25 +695,30 @@ publish_messages(Sender, Data, Num) ->
704695

705696
receive_one(Receiver) ->
706697
receive
707-
{amqp10_msg, Receiver, Msg} ->
698+
{amqp10_msg, Receiver0, Msg}
699+
when Receiver0 =:= Receiver ->
708700
amqp10_client:accept_msg(Receiver, Msg)
709701
after 2000 ->
710702
timeout
711703
end.
712704

713705
await_disposition(DeliveryTag) ->
714706
receive
715-
{amqp10_disposition, {accepted, DeliveryTag}} -> ok
707+
{amqp10_disposition, {accepted, DeliveryTag0}}
708+
when DeliveryTag0 =:= DeliveryTag -> ok
716709
after 3000 ->
717710
flush(),
718711
exit(dispostion_timeout)
719712
end.
720713

721714
await_link(Who, What, Err) ->
722715
receive
723-
{amqp10_event, {link, Who, What}} ->
716+
{amqp10_event, {link, Who0, What0}}
717+
when Who0 =:= Who andalso
718+
What0 =:= What ->
724719
ok;
725-
{amqp10_event, {link, Who, {detached, Why}}} ->
720+
{amqp10_event, {link, Who0, {detached, Why}}}
721+
when Who0 =:= Who ->
726722
exit(Why)
727723
after 5000 ->
728724
flush(),

deps/rabbit/src/mc_amqp.erl

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,13 @@ get_property(priority, Msg) ->
174174
undefined
175175
end
176176
end;
177+
get_property(subject, Msg) ->
178+
case Msg of
179+
#msg{properties = #'v1_0.properties'{subject = {utf8, Subject}}} ->
180+
Subject;
181+
_ ->
182+
undefined
183+
end;
177184
get_property(_P, _Msg) ->
178185
undefined.
179186

@@ -185,10 +192,19 @@ convert_to(TargetProto, Msg) ->
185192
serialize(Sections) ->
186193
encode_bin(Sections).
187194

188-
protocol_state(Msg, Anns) ->
195+
protocol_state(Msg0 = #msg{header = Header0}, Anns) ->
196+
Redelivered = maps:get(redelivered, Anns, false),
197+
FirstAcquirer = not Redelivered,
198+
Header = case Header0 of
199+
undefined ->
200+
#'v1_0.header'{first_acquirer = FirstAcquirer};
201+
#'v1_0.header'{} ->
202+
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
203+
end,
204+
Msg = Msg0#msg{header = Header},
205+
189206
Exchange = maps:get(exchange, Anns),
190207
[RKey | _] = maps:get(routing_keys, Anns),
191-
192208
%% any x-* annotations get added as message annotations
193209
AnnsToAdd = maps:filter(fun (Key, _) -> mc_util:is_x_header(Key) end, Anns),
194210

@@ -408,6 +424,10 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
408424
Priority = get_property(priority, Msg),
409425
Timestamp = get_property(timestamp, Msg),
410426
Ttl = get_property(ttl, Msg),
427+
RoutingKeys = case get_property(subject, Msg) of
428+
undefined -> undefined;
429+
Subject -> [Subject]
430+
end,
411431

412432
Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
413433
{list, DeathMaps} ->
@@ -432,8 +452,10 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
432452
maps_put_truthy(
433453
ttl, Ttl,
434454
maps_put_truthy(
435-
deaths, Deaths,
436-
#{}))))),
455+
routing_keys, RoutingKeys,
456+
maps_put_truthy(
457+
deaths, Deaths,
458+
#{})))))),
437459
case MA of
438460
[] ->
439461
Anns;

deps/rabbit/src/mc_amqpl.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
message/3,
2626
message/4,
2727
message/5,
28-
from_basic_message/1
28+
from_basic_message/1,
29+
to_091/2
2930
]).
3031

3132
-import(rabbit_misc,

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
-define(IS_QUORUM(QPid), is_tuple(QPid)).
9191
%%----------------------------------------------------------------------------
9292

93-
-export_type([name/0, qmsg/0, absent_reason/0]).
93+
-export_type([name/0, qmsg/0, msg_id/0, absent_reason/0]).
9494

9595
-type name() :: rabbit_types:r('queue').
9696

@@ -99,7 +99,7 @@
9999
-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
100100
-type qmsg() :: {name(), pid() | {atom(), pid()}, msg_id(),
101101
boolean(), mc:state()}.
102-
-type msg_id() :: non_neg_integer().
102+
-type msg_id() :: rabbit_types:option(non_neg_integer()).
103103
-type ok_or_errors() ::
104104
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}.
105105
-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout.

0 commit comments

Comments
 (0)