Skip to content

Commit c3676d4

Browse files
committed
Support AMQP 1.0 natively
## What Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0. By "native", we mean do not proxy via AMQP 0.9.1 anymore. ## Why Native AMQP 1.0 comes with the following major benefits: 1. Similar to Native MQTT, this commit provides better throughput, latency, scalability, and resource usage for AMQP 1.0. See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements. See further below for some benchmarks. 2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol, this commit allows implementing more AMQP 1.0 features in the future. Some features are already implemented in this commit (see next section). 3. Simpler, better understandable, and more maintainable code. Native AMQP 1.0 as implemented in this commit has the following major benefits compared to AMQP 0.9.1: 4. Memory and disk alarms will only stop accepting incoming TRANSFER frames. New connections can still be created to consume from RabbitMQ to empty queues. 5. Due to 4. no need anymore for separate connections for publishers and consumers as we currently recommended for AMQP 0.9.1. which potentially halves the number of physical TCP connections. 6. When a single connection sends to multiple target queues, a single slow target queue won't block the entire connection. Publisher can still send data quickly to all other target queues. 7. A publisher can request whether it wants publisher confirmation on a per-message basis. In AMQP 0.9.1 publisher confirms are configured per channel only. 8. Consumers can change their "prefetch count" dynamically which isn't possible in our AMQP 0.9.1 implementation. See #10174 9. AMQP 1.0 is an extensible protocol This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in RabbitMQ 3.x - most of which cannot be backported due to the complexity and limitations of the old 3.x implementation. This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0. ## Implementation details 1. Breaking change: With Native AMQP, the behaviour of ``` Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false) Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false) ``` will break because we always convert according to the message container conversions. For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties. Also, `false` won’t be respected since we always convert the headers with message containers. 2. Remove rabbit_queue_collector rabbit_queue_collector is responsible for synchronously deleting exclusive queues. Since the AMQP 1.0 plugin never creates exclusive queues, rabbit_queue_collector doesn't need to be started in the first place. This will save 1 Erlang process per AMQP 1.0 connection. 3. 7 processes per connection + 1 process per session in this commit instead of 7 processes per connection + 15 processes per session in 3.x Supervision hierarchy got re-designed. 4. Use 1 writer process per AMQP 1.0 connection AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel. Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session. Advantage of single writer proc per session (prior to this commit): * High parallelism for serialising packets if multiple sessions within a connection write heavily at the same time. This commit uses a single writer process per AMQP 1.0 connection that is shared across all AMQP 1.0 sessions. Advantages of single writer proc per connection (this commit): * Lower memory usage with hundreds of thousands of AMQP 1.0 sessions * Less TCP and IP header overhead given that the single writer process can accumulate across all sessions bytes before flushing the socket. In other words, this commit decides that a reader / writer process pair per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows. Having a writer per session is too heavy. We still ensure high throughput by having separate reader, writer, and session processes. 5. Transform rabbit_amqp1_0_writer into gen_server Why: Prior to this commit, when clicking on the AMQP 1.0 writer process in observer, the process crashed. Instead of handling all these debug messages of the sys module, it's better to implement a gen_server. There is no advantage of using a special OTP process over gen_server for the AMQP 1.0 writer. gen_server also provides cleaner format status output. How: Message callbacks return a timeout of 0. After all messages in the inbox are processed, the timeout message is handled by flushing any pending bytes. 6. Remove stats timer from writer AMQP 1.0 connections haven't emitted any stats previously. 7. When there are contiguous queue confirmations in the session process mailbox, batch them. When the confirmations are sent to the publisher, a single DISPOSITION frame is sent for contiguously confirmed delivery IDs. This approach should be good enough. However it's sub optimal in scenarios where contiguous delivery IDs that need confirmations are rare, for example: * There are multiple links in the session with different sender settlement modes and sender publishes across these links interleaved. * sender settlement mode is mixed and sender publishes interleaved settled and unsettled TRANSFERs. 8. Introduce credit API v2 Why: The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly designed since basic.credit is a synchronous call into the queue process blocking the entire AMQP 1.0 session process. How: Change the interactions between queue clients and queue server implementations: * Clients only request a credit reply if the FLOW's `echo` field is set * Include all link flow control state held by the queue process into a new credit_reply queue event: * `available` after the queue sends any deliveries * `link-credit` after the queue sends any deliveries * `drain` which allows us to combine the old queue events send_credit_reply and send_drained into a single new queue event credit_reply. * Include the consumer tag into the credit_reply queue event such that the AMQP 1.0 session process can process any credit replies asynchronously. Link flow control state `delivery-count` also moves to the queue processes. The new interactions are hidden behind feature flag credit_api_v2 to allow for rolling upgrades from 3.13 to 4.0. 9. Use serial number arithmetic in quorum queues and session process. 10. Completely bypass the rabbit_limiter module for AMQP 1.0 flow control. The goal is to eventually remove the rabbit_limiter module in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter into rabbit_queue_consumers. 11. Fix credit bug for streams: AMQP 1.0 settlements shouldn't top up link credit, only FLOW frames should top up link credit. 12. Allow sender settle mode unsettled for streams since AMQP 1.0 acknowledgements to streams are no-ops (currently). 13. Fix AMQP 1.0 client bugs Auto renewing credits should not be related to settling TRANSFERs. Remove field link_credit_unsettled as it was wrong and confusing. Prior to this commit auto renewal did not work when the sender uses sender settlement mode settled. 14. Fix AMQP 1.0 client bugs The wrong outdated Link was passed to function auto_flow/2 15. Use osiris chunk iterator Only hold messages of uncompressed sub batches in memory if consumer doesn't have sufficient credits. Compressed sub batches are skipped for non Stream protocol consumers. 16. Fix incoming link flow control Always use confirms between AMQP 1.0 queue clients and queue servers. As already done internally by rabbit_fifo_client and rabbit_stream_queue, use confirms for classic queues as well. 17. Include link handle into correlation when publishing messages to target queues such that session process can correlate confirms from target queues to incoming links. 18. Only grant more credits to publishers if publisher hasn't sufficient credits anymore and there are not too many unconfirmed messages on the link. 19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow between classic queue process and session process. 20. Link flow control is independent between links. A client can refer to a queue or to an exchange with multiple dynamically added target queues. Multiple incoming links can also fan in to the same queue. However the link topology looks like, this commit ensures that each link is only granted more credits if that link isn't overloaded. 21. A connection or a session can send to many different queues. In AMQP 0.9.1, a single slow queue will lead to the entire channel, and then entire connection being blocked. This commit makes sure that a single slow queue from one link won't slow down sending on other links. For example, having link A sending to a local classic queue and link B sending to 5 replica quorum queue, link B will naturally grant credits slower than link A. So, despite the quorum queue being slower in confirming messages, the same AMQP 1.0 connection and session can still pump data very fast into the classic queue. 22. If cluster wide memory or disk alarm occurs. Each session sends a FLOW with incoming-window to 0 to sending client. If sending clients don’t obey, force disconnect the client. If cluster wide memory alarm clears: Each session resumes with a FLOW defaulting to initial incoming-window. 23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms, specifically, attaching consumers and consuming, i.e. emptying queues. There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation. 24. Flow control summary: * If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control). * If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control. * If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied. Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path. 25. Register AMQP sessions Prefer local-only pg over our custom pg_local implementation as pg is a better process group implementation than pg_local. pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once. 26. Start a local-only pg when Rabbit boots: > A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name: > pg:start_link(node()). Register AMQP 1.0 connections and sessions with pg. In future we should remove pg_local and instead use the new local-only pg for all registered processes such as AMQP 0.9.1 connections and channels. 27. Requeue messages if link detached Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed' field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed, we expect every outstanding delivery to be requeued. In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued. Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1: "After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them." [https://www.rabbitmq.com/consumers.html#unsubscribing] An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries 28. Init AMQP session with BEGIN frame Similar to how there can't be an MQTT processor without a CONNECT frame, there can't be an AMQP session without a BEGIN frame. This allows having strict dialyzer types for session flow control fields (i.e. not allowing 'undefined'). 29. Move serial_number to AMQP 1.0 common lib such that it can be used by both AMQP 1.0 server and client 30. Fix AMQP client to do serial number arithmetic. 31. AMQP client: Differentiate between delivery-id and transfer-id for better understandability. 32. Fix link flow control in classic queues This commit fixes ``` java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0 ``` followed by ``` ./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2 ``` Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around 8 - 10,000 messages. The bug was that in flight messages from classic queue process to session process were not taken into account when topping up credit to the classic queue process. Fixes #2597 The solution to this bug (and a much cleaner design anyway independent of this bug) is that queues should hold all link flow control state including the delivery-count. Hence, when credit API v2 is used the delivery-count will be held by the classic queue process, quorum queue process, and stream queue client instead of managing the delivery-count in the session. 33. The double level crediting between (a) session process and rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was removed. Therefore, instead of managing 3 separate delivery-counts (i. session, ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used in rabbit_fifo. This is a big simplification. 34. This commit fixes quorum queues without bumping the machine version nor introducing new rabbit_fifo commands. Whether credit API v2 is used is solely determined at link attachment time depending on whether feature flag credit_api_v2 is enabled. Even when that feature flag will be enabled later on, this link will keep using credit API v1 until detached (or the node is shut down). Eventually, after feature flag credit_api_v2 has been enabled and a subsequent rolling upgrade, all links will use credit API v2. This approach is safe and simple. The 2 alternatives to move delivery-count from the session process to the queue processes would have been: i. Explicit feature flag credit_api_v2 migration function * Can use a gen_server:call and only finish migration once all delivery-counts were migrated. Cons: * Extra new message format just for migration is required. * Risky as migration will fail if a target queue doesn’t reply. ii. Session always includes DeliveryCountSnd when crediting to the queue: Cons: * 2 delivery counts will be hold simultaneously in session proc and queue proc; could be solved by deleting the session proc’s delivery-count for credit-reply * What happens if the receiver doesn’t provide credit for a very long time? Is that a problem? 35. Support stream filtering in AMQP 1.0 (by @acogoluegnes) Use the x-stream-filter-value message annotation to carry the filter value in a published message. Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered filters when creating a receiver that wants to filter out messages from a stream. 36. Remove credit extension from AMQP 0.9.1 client 37. Support maintenance mode closing AMQP 1.0 connections. 38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation. 39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default. The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment tools from failing that execute: ``` rabbitmq-plugins enable rabbitmq_amqp1_0 rabbitmq-plugins disable rabbitmq_amqp1_0 ``` 40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`. Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`: ``` rabbitmqctl list_connections protocol Listing connections ... protocol {1, 0} {0,9,1} ``` ## Benchmarks ### Throughput & Latency Setup: * Single node Ubuntu 22.04 * Erlang 26.1.1 Start RabbitMQ: ``` make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3" ``` Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1. Start client: https://github.com/ssorj/quiver https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64) ``` docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest bash-5.1# quiver --version quiver 0.4.0-SNAPSHOT ``` 1. Classic queue ``` quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000 ``` This commit: ``` Count ............................................. 1,000,000 messages Duration ............................................... 73.8 seconds Sender rate .......................................... 13,548 messages/s Receiver rate ........................................ 13,547 messages/s End-to-end rate ...................................... 13,547 messages/s Latencies by percentile: 0% ........ 0 ms 90.00% ........ 9 ms 25% ........ 2 ms 99.00% ....... 14 ms 50% ........ 4 ms 99.90% ....... 17 ms 100% ....... 26 ms 99.99% ....... 24 ms ``` RabbitMQ 3.x (main branch as of 30 January 2024): ``` ---------------------- Sender ----------------------- --------------------- Receiver ---------------------- -------- Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms] ----------------------------------------------------- ----------------------------------------------------- -------- 2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511 4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0 6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0 8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662 10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0 12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0 14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147 16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0 18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0 20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0 22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0 receiver timed out 24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0 quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1. Traceback (most recent call last): File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run _plano.wait(receiver, check=True) File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait raise PlanoProcessError(proc) plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1. ``` 2. Quorum queue: ``` quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000 ``` This commit: ``` Count ............................................. 1,000,000 messages Duration .............................................. 101.4 seconds Sender rate ........................................... 9,867 messages/s Receiver rate ......................................... 9,868 messages/s End-to-end rate ....................................... 9,865 messages/s Latencies by percentile: 0% ....... 11 ms 90.00% ....... 23 ms 25% ....... 15 ms 99.00% ....... 28 ms 50% ....... 18 ms 99.90% ....... 33 ms 100% ....... 49 ms 99.99% ....... 47 ms ``` RabbitMQ 3.x: ``` ---------------------- Sender ----------------------- --------------------- Receiver ---------------------- -------- Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms] ----------------------------------------------------- ----------------------------------------------------- -------- 2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221 4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168 6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0 8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0 10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0 12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0 receiver timed out 14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0 quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1. Traceback (most recent call last): File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run _plano.wait(receiver, check=True) File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait raise PlanoProcessError(proc) plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1. ``` 3. Stream: ``` quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose ``` This commit: ``` Count ............................................. 1,000,000 messages Duration ................................................ 8.7 seconds Message rate ........................................ 115,154 messages/s ``` RabbitMQ 3.x: ``` Count ............................................. 1,000,000 messages Duration ............................................... 21.2 seconds Message rate ......................................... 47,232 messages/s ``` ### Memory usage Start RabbitMQ: ``` ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf" ``` ``` /bin/cat rabbitmq.conf tcp_listen_options.sndbuf = 2048 tcp_listen_options.recbuf = 2048 vm_memory_high_watermark.relative = 0.95 vm_memory_high_watermark_paging_ratio = 0.95 loopback_users = none ``` Create 50k connections with 2 sessions per connection, i.e. 100k session in total: ```go package main import ( "context" "log" "time" "github.com/Azure/go-amqp" ) func main() { for i := 0; i < 50000; i++ { conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()}) if err != nil { log.Fatal("dialing AMQP server:", err) } _, err = conn.NewSession(context.TODO(), nil) if err != nil { log.Fatal("creating AMQP session:", err) } _, err = conn.NewSession(context.TODO(), nil) if err != nil { log.Fatal("creating AMQP session:", err) } } log.Println("opened all connections") time.Sleep(5 * time.Hour) } ``` This commit: ``` erlang:memory(). [{total,4586376480}, {processes,4025898504}, {processes_used,4025871040}, {system,560477976}, {atom,1048841}, {atom_used,1042841}, {binary,233228608}, {code,21449982}, {ets,108560464}] erlang:system_info(process_count). 450289 ``` 7 procs per connection + 1 proc per session. (7 + 2*1) * 50,000 = 450,000 procs RabbitMQ 3.x: ``` erlang:memory(). [{total,15168232704}, {processes,14044779256}, {processes_used,14044755120}, {system,1123453448}, {atom,1057033}, {atom_used,1052587}, {binary,236381264}, {code,21790238}, {ets,391423744}] erlang:system_info(process_count). 1850309 ``` 7 procs per connection + 15 per session (7 + 2*15) * 50,000 = 1,850,000 procs 50k connections + 100k session require with this commit: 4.5 GB in RabbitMQ 3.x: 15 GB ## Future work 1. More efficient parser and serializer 2. TODO in mc_amqp: Do not store the parsed message on disk. 3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP clients to create RabbitMQ objects (queues, exchanges, ...).
1 parent 01e4583 commit c3676d4

File tree

171 files changed

+12073
-7352
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

171 files changed

+12073
-7352
lines changed

Makefile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,6 @@ RSYNC_FLAGS += -a $(RSYNC_V) \
176176
--exclude '/cowboy/doc/' \
177177
--exclude '/cowboy/examples/' \
178178
--exclude '/rabbit/escript/' \
179-
--exclude '/rabbitmq_amqp1_0/test/swiftmq/build/'\
180-
--exclude '/rabbitmq_amqp1_0/test/swiftmq/swiftmq*'\
181179
--exclude '/rabbitmq_cli/escript/' \
182180
--exclude '/rabbitmq_mqtt/test/build/' \
183181
--exclude '/rabbitmq_mqtt/test/test_client/'\

deps/amqp10_client/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ dialyze(
100100
)
101101

102102
broker_for_integration_suites(
103-
extra_plugins = ["//deps/rabbitmq_amqp1_0:erlang_app"],
104103
)
105104

106105
TEST_DEPS = [

deps/amqp10_client/Makefile

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ PACKAGES_DIR ?= $(abspath PACKAGES)
3030

3131
BUILD_DEPS = rabbit_common elvis_mk
3232
DEPS = amqp10_common credentials_obfuscation
33-
TEST_DEPS = rabbit rabbitmq_amqp1_0 rabbitmq_ct_helpers
33+
TEST_DEPS = rabbit rabbitmq_ct_helpers
3434
LOCAL_DEPS = ssl inets crypto public_key
3535

3636
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-test.mk
@@ -51,20 +51,6 @@ include erlang.mk
5151
HEX_TARBALL_FILES += rabbitmq-components.mk \
5252
git-revisions.txt
5353

54-
# --------------------------------------------------------------------
55-
# Compiler flags.
56-
# --------------------------------------------------------------------
57-
58-
# gen_fsm is deprecated starting from Erlang 20, but we want to support
59-
# Erlang 19 as well.
60-
61-
ERTS_VER := $(shell erl -version 2>&1 | sed -E 's/.* version //')
62-
ERLANG_20_ERTS_VER := 9.0
63-
64-
ifeq ($(call compare_version,$(ERTS_VER),$(ERLANG_20_ERTS_VER),>=),true)
65-
ERLC_OPTS += -Dnowarn_deprecated_gen_fsm
66-
endif
67-
6854
# Dialyze the tests.
6955
DIALYZER_OPTS += --src -r test
7056

deps/amqp10_client/README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22

33
This is an [Erlang client for the AMQP 1.0](https://www.amqp.org/resources/specifications) protocol.
44

5-
It's primary purpose is to be used in RabbitMQ related projects but it is a
6-
generic client that was tested with at least 4 implementations of AMQP 1.0.
5+
Its primary purpose is to be used in RabbitMQ related projects but it is a
6+
generic client that was tested with at least 3 implementations of AMQP 1.0.
77

88
If you are looking for an Erlang client for [AMQP 0-9-1](https://www.rabbitmq.com/tutorials/amqp-concepts.html) — a completely different
9-
protocol despite the name — [consider this one](https://github.com/rabbitmq/rabbitmq-erlang-client).
9+
protocol despite the name — [consider this one](../amqp_client).
1010

1111
## Project Maturity and Status
1212

1313
This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100%
14-
feature complete but moderately mature and was tested against at least three AMQP 1.0 servers:
14+
feature complete but moderately mature and was tested against at least 3 AMQP 1.0 servers:
1515
RabbitMQ, Azure ServiceBus, ActiveMQ.
1616

1717
This client library is not officially supported by VMware at this time.
@@ -80,8 +80,8 @@ after 2000 ->
8080
exit(credited_timeout)
8181
end.
8282
83-
%% create a new message using a delivery-tag, body and indicate
84-
%% it's settlement status (true meaning no disposition confirmation
83+
%% Create a new message using a delivery-tag, body and indicate
84+
%% its settlement status (true meaning no disposition confirmation
8585
%% will be sent by the receiver).
8686
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
8787
ok = amqp10_client:send_msg(Sender, OutMsg),
@@ -112,7 +112,7 @@ after the `Open` frame has been successfully written to the socket rather than
112112
waiting until the remote end returns with their `Open` frame. The client will
113113
notify the caller of various internal/async events using `amqp10_event`
114114
messages. In the example above when the remote replies with their `Open` frame
115-
a message is sent of the following forma:
115+
a message is sent of the following form:
116116

117117
```
118118
{amqp10_event, {connection, ConnectionPid, opened}}

deps/amqp10_client/app.bzl

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ def all_beam_files(name = "all_beam_files"):
1313
"src/amqp10_client_app.erl",
1414
"src/amqp10_client_connection.erl",
1515
"src/amqp10_client_connection_sup.erl",
16-
"src/amqp10_client_connections_sup.erl",
1716
"src/amqp10_client_frame_reader.erl",
1817
"src/amqp10_client_session.erl",
1918
"src/amqp10_client_sessions_sup.erl",
@@ -42,7 +41,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
4241
"src/amqp10_client_app.erl",
4342
"src/amqp10_client_connection.erl",
4443
"src/amqp10_client_connection_sup.erl",
45-
"src/amqp10_client_connections_sup.erl",
4644
"src/amqp10_client_frame_reader.erl",
4745
"src/amqp10_client_session.erl",
4846
"src/amqp10_client_sessions_sup.erl",
@@ -77,7 +75,6 @@ def all_srcs(name = "all_srcs"):
7775
"src/amqp10_client_app.erl",
7876
"src/amqp10_client_connection.erl",
7977
"src/amqp10_client_connection_sup.erl",
80-
"src/amqp10_client_connections_sup.erl",
8178
"src/amqp10_client_frame_reader.erl",
8279
"src/amqp10_client_session.erl",
8380
"src/amqp10_client_sessions_sup.erl",

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
settle_msg/3,
3636
flow_link_credit/3,
3737
flow_link_credit/4,
38-
echo/1,
38+
stop_receiver_link/1,
3939
link_handle/1,
4040
get_msg/1,
4141
get_msg/2,
@@ -55,7 +55,7 @@
5555
-type attach_role() :: amqp10_client_session:attach_role().
5656
-type attach_args() :: amqp10_client_session:attach_args().
5757
-type filter() :: amqp10_client_session:filter().
58-
-type properties() :: amqp10_client_session:properties().
58+
-type properties() :: amqp10_client_types:properties().
5959

6060
-type connection_config() :: amqp10_client_connection:connection_config().
6161

@@ -109,10 +109,10 @@ open_connection(ConnectionConfig0) ->
109109
notify_when_closed => NotifyWhenClosed
110110
},
111111
Sasl = maps:get(sasl, ConnectionConfig1),
112-
ConnectionConfig2 = ConnectionConfig1#{sasl => amqp10_client_connection:encrypt_sasl(Sasl)},
113-
amqp10_client_connection:open(ConnectionConfig2).
112+
ConnectionConfig = ConnectionConfig1#{sasl => amqp10_client_connection:encrypt_sasl(Sasl)},
113+
amqp10_client_connection:open(ConnectionConfig).
114114

115-
%% @doc Opens a connection using a connection_config map
115+
%% @doc Closes a connection.
116116
%% This is asynchronous and will notify completion to the caller using
117117
%% an amqp10_event of the following format:
118118
%% {amqp10_event, {connection, ConnectionPid, {closed, Why}}}
@@ -271,9 +271,8 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
271271
%% This is asynchronous and will notify completion of the attach request to the
272272
%% caller using an amqp10_event of the following format:
273273
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
274-
-spec attach_receiver_link(pid(), binary(), binary(),
275-
snd_settle_mode(), terminus_durability(), filter(),
276-
properties()) ->
274+
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
275+
terminus_durability(), filter(), properties()) ->
277276
{ok, link_ref()}.
278277
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
279278
when is_pid(Session) andalso
@@ -307,43 +306,45 @@ detach_link(#link_ref{link_handle = Handle, session = Session}) ->
307306
amqp10_client_session:detach(Session, Handle).
308307

309308
%% @doc Grant credit to a sender.
310-
%% The amqp10_client will automatically grant more credit to the sender when
309+
%% The amqp10_client will automatically grant Credit to the sender when
311310
%% the remaining link credit falls below the value of RenewWhenBelow.
312-
%% If RenewWhenBelow is 'never' the client will never grant new credit. Instead
311+
%% If RenewWhenBelow is 'never' the client will never grant more credit. Instead
313312
%% the caller will be notified when the link_credit reaches 0 with an
314313
%% amqp10_event of the following format:
315314
%% {amqp10_event, {link, LinkRef, credit_exhausted}}
316315
-spec flow_link_credit(link_ref(), Credit :: non_neg_integer(),
317-
RenewWhenBelow :: never | non_neg_integer()) -> ok.
316+
RenewWhenBelow :: never | pos_integer()) -> ok.
318317
flow_link_credit(Ref, Credit, RenewWhenBelow) ->
319318
flow_link_credit(Ref, Credit, RenewWhenBelow, false).
320319

321320
-spec flow_link_credit(link_ref(), Credit :: non_neg_integer(),
322-
RenewWhenBelow :: never | non_neg_integer(),
321+
RenewWhenBelow :: never | pos_integer(),
323322
Drain :: boolean()) -> ok.
324323
flow_link_credit(#link_ref{role = receiver, session = Session,
325324
link_handle = Handle},
326-
Credit, RenewWhenBelow, Drain) ->
325+
Credit, RenewWhenBelow, Drain)
326+
when RenewWhenBelow =:= never orelse
327+
is_integer(RenewWhenBelow) andalso
328+
RenewWhenBelow > 0 andalso
329+
RenewWhenBelow =< Credit ->
327330
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
328331
drain = Drain},
329332
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
330333

331-
%% @doc Request that the sender's flow state is echoed back
332-
%% This may be used to determine when the Link has finally quiesced.
333-
%% see §2.6.10 of the spec
334-
echo(#link_ref{role = receiver, session = Session,
335-
link_handle = Handle}) ->
334+
%% @doc Stop a receiving link.
335+
%% See AMQP 1.0 spec §2.6.10.
336+
stop_receiver_link(#link_ref{role = receiver,
337+
session = Session,
338+
link_handle = Handle}) ->
336339
Flow = #'v1_0.flow'{link_credit = {uint, 0},
337340
echo = true},
338-
ok = amqp10_client_session:flow(Session, Handle, Flow, 0).
341+
ok = amqp10_client_session:flow(Session, Handle, Flow, never).
339342

340343
%%% messages
341344

342345
%% @doc Send a message on a the link referred to be the 'LinkRef'.
343-
%% Returns ok for "async" transfers when messages are sent with settled=true
344-
%% else it returns the delivery state from the disposition
345346
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
346-
ok | {error, insufficient_credit | link_not_found | half_attached}.
347+
ok | amqp10_client_session:transfer_error().
347348
send_msg(#link_ref{role = sender, session = Session,
348349
link_handle = Handle}, Msg0) ->
349350
Msg = amqp10_msg:set_handle(Handle, Msg0),

deps/amqp10_client/src/amqp10_client.hrl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
-define(AMQP_PROTOCOL_HEADER, <<"AMQP", 0, 1, 0, 0>>).
99
-define(SASL_PROTOCOL_HEADER, <<"AMQP", 3, 1, 0, 0>>).
10-
-define(MIN_MAX_FRAME_SIZE, 512).
11-
-define(MAX_MAX_FRAME_SIZE, 1024 * 1024).
1210
-define(FRAME_HEADER_SIZE, 8).
1311

1412
-define(TIMEOUT, 5000).

deps/amqp10_client/src/amqp10_client_app.erl

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,12 @@
99

1010
-behaviour(application).
1111

12-
%% Application callbacks
12+
%% application callbacks
1313
-export([start/2,
1414
stop/1]).
1515

16-
-type start_type() :: (
17-
normal |
18-
{takeover, Node :: node()} |
19-
{failover, Node :: node()}
20-
).
21-
-type state() :: term().
22-
23-
%%====================================================================
24-
%% API
25-
%%====================================================================
26-
27-
-spec start(StartType :: start_type(), StartArgs :: term()) ->
28-
{ok, Pid :: pid()} | {ok, Pid :: pid(), State :: state()} | {error, Reason :: term()}.
2916
start(_Type, _Args) ->
3017
amqp10_client_sup:start_link().
3118

32-
-spec stop(State :: state()) -> ok.
3319
stop(_State) ->
3420
ok.
35-
36-
%%====================================================================
37-
%% Internal functions
38-
%%====================================================================

0 commit comments

Comments
 (0)