Skip to content

Commit 48f096e

Browse files
committed
WIP AMQP Management
1 parent c7cb726 commit 48f096e

27 files changed

+1592
-174
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
!/deps/amqp10_common/
1515
!/deps/oauth2_client/
1616
!/deps/rabbitmq_amqp1_0/
17+
!/deps/rabbitmq_amqp_client/
1718
!/deps/rabbitmq_auth_backend_cache/
1819
!/deps/rabbitmq_auth_backend_http/
1920
!/deps/rabbitmq_auth_backend_ldap/

deps/amqp10_client/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ load(
2020

2121
APP_NAME = "amqp10_client"
2222

23-
APP_DESCRIPTION = "AMQP 1.0 client from the RabbitMQ Project"
23+
APP_DESCRIPTION = "AMQP 1.0 client"
2424

2525
APP_MODULE = "amqp10_client_app"
2626

deps/amqp10_client/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
PROJECT = amqp10_client
2-
PROJECT_DESCRIPTION = AMQP 1.0 client from the RabbitMQ Project
2+
PROJECT_DESCRIPTION = AMQP 1.0 client
33
PROJECT_MOD = amqp10_client_app
44

55
define PROJECT_APP_EXTRA_KEYS

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
parse_uri/1
4343
]).
4444

45-
-define(DEFAULT_TIMEOUT, 5000).
46-
4745
-type snd_settle_mode() :: amqp10_client_session:snd_settle_mode().
4846
-type rcv_settle_mode() :: amqp10_client_session:rcv_settle_mode().
4947

@@ -134,7 +132,7 @@ begin_session(Connection) when is_pid(Connection) ->
134132
-spec begin_session_sync(pid()) ->
135133
supervisor:startchild_ret() | session_timeout.
136134
begin_session_sync(Connection) when is_pid(Connection) ->
137-
begin_session_sync(Connection, ?DEFAULT_TIMEOUT).
135+
begin_session_sync(Connection, ?TIMEOUT).
138136

139137
%% @doc Synchronously begins an amqp10 session using 'Connection'.
140138
%% This is a convenience function that awaits the 'begun' event
@@ -191,7 +189,7 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
191189
{ok, Ref};
192190
{amqp10_event, {link, Ref, {detached, Err}}} ->
193191
{error, Err}
194-
after ?DEFAULT_TIMEOUT -> link_timeout
192+
after ?TIMEOUT -> link_timeout
195193
end.
196194

197195
%% @doc Attaches a sender link to a target.
@@ -348,7 +346,7 @@ stop_receiver_link(#link_ref{role = receiver,
348346
send_msg(#link_ref{role = sender, session = Session,
349347
link_handle = Handle}, Msg0) ->
350348
Msg = amqp10_msg:set_handle(Handle, Msg0),
351-
amqp10_client_session:transfer(Session, Msg, ?DEFAULT_TIMEOUT).
349+
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).
352350

353351
%% @doc Accept a message on a the link referred to be the 'LinkRef'.
354352
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
@@ -368,7 +366,7 @@ settle_msg(#link_ref{role = receiver,
368366
%% Flows a single link credit then awaits delivery or timeout.
369367
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
370368
get_msg(LinkRef) ->
371-
get_msg(LinkRef, ?DEFAULT_TIMEOUT).
369+
get_msg(LinkRef, ?TIMEOUT).
372370

373371
%% @doc Get a single message from a link.
374372
%% Flows a single link credit then awaits delivery or timeout.

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
diff/2]).
5353

5454
-define(MAX_SESSION_WINDOW_SIZE, 65535).
55-
-define(DEFAULT_TIMEOUT, 5000).
5655
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
5756
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
5857
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
@@ -148,7 +147,7 @@
148147
reader :: pid(),
149148
socket :: amqp10_client_connection:amqp10_socket() | undefined,
150149
links = #{} :: #{output_handle() => #link{}},
151-
link_index = #{} :: #{link_name() => output_handle()},
150+
link_index = #{} :: #{{link_role(), link_name()} => output_handle()},
152151
link_handle_index = #{} :: #{input_handle() => output_handle()},
153152
next_link_handle = 0 :: output_handle(),
154153
early_attach_requests :: [term()],
@@ -172,7 +171,7 @@
172171

173172
-spec begin_sync(pid()) -> supervisor:startchild_ret().
174173
begin_sync(Connection) ->
175-
begin_sync(Connection, ?DEFAULT_TIMEOUT).
174+
begin_sync(Connection, ?TIMEOUT).
176175

177176
-spec begin_sync(pid(), non_neg_integer()) ->
178177
supervisor:startchild_ret() | session_timeout.
@@ -298,33 +297,37 @@ mapped(cast, #'v1_0.end'{error = Err}, State) ->
298297
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
299298
initial_delivery_count = IDC,
300299
handle = {uint, InHandle},
300+
role = PeerRoleBool,
301301
max_message_size = MaybeMaxMessageSize},
302302
#state{links = Links, link_index = LinkIndex,
303303
link_handle_index = LHI} = State0) ->
304304

305-
#{Name := OutHandle} = LinkIndex,
305+
OurRoleBool = not PeerRoleBool,
306+
OurRole = boolean_to_role(OurRoleBool),
307+
LinkIndexKey = {OurRole, Name},
308+
#{LinkIndexKey := OutHandle} = LinkIndex,
306309
#{OutHandle := Link0} = Links,
307310
ok = notify_link_attached(Link0),
308311

309312
{DeliveryCount, MaxMessageSize} =
310313
case Link0 of
311-
#link{role = sender,
314+
#link{role = sender = OurRole,
312315
delivery_count = DC} ->
313316
MSS = case MaybeMaxMessageSize of
314317
{ulong, S} when S > 0 -> S;
315318
_ -> undefined
316319
end,
317320
{DC, MSS};
318-
#link{role = receiver,
321+
#link{role = receiver = OurRole,
319322
max_message_size = MSS} ->
320323
{unpack(IDC), MSS}
321324
end,
322325
Link = Link0#link{state = attached,
323326
input_handle = InHandle,
324327
delivery_count = DeliveryCount,
325328
max_message_size = MaxMessageSize},
326-
State = State0#state{links = Links#{OutHandle => Link},
327-
link_index = maps:remove(Name, LinkIndex),
329+
State = State0#state{links = Links#{OutHandle := Link},
330+
link_index = maps:remove(LinkIndexKey, LinkIndex),
328331
link_handle_index = LHI#{InHandle => OutHandle}},
329332
{keep_state, State};
330333
mapped(cast, #'v1_0.detach'{handle = {uint, InHandle},
@@ -643,8 +646,8 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
643646

644647
make_source(#{role := {sender, _}}) ->
645648
#'v1_0.source'{};
646-
make_source(#{role := {receiver, #{address := Address} = Target, _Pid}, filter := Filter}) ->
647-
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
649+
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
650+
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
648651
TranslatedFilter = translate_filters(Filter),
649652
#'v1_0.source'{address = {utf8, Address},
650653
durable = {uint, Durable},
@@ -738,35 +741,34 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) ->
738741
ok = send(Detach, State),
739742
Link#link{state = detach_sent}.
740743

741-
send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
742-
#state{next_link_handle = OutHandle0, links = Links,
744+
send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
745+
#state{next_link_handle = OutHandle0, links = Links,
743746
link_index = LinkIndex} = State) ->
744747

745748
Source = make_source(Args),
746749
Target = make_target(Args),
747750
Properties = amqp10_client_types:make_properties(Args),
748751

749-
{LinkTarget, RoleAsBool, InitialDeliveryCount, MaxMessageSize} =
750-
case Role of
752+
{LinkTarget, InitialDeliveryCount, MaxMessageSize} =
753+
case RoleTuple of
751754
{receiver, _, Pid} ->
752-
{{pid, Pid}, true, undefined, max_message_size(Args)};
755+
{{pid, Pid}, undefined, max_message_size(Args)};
753756
{sender, #{address := TargetAddr}} ->
754-
{TargetAddr, false, uint(?INITIAL_DELIVERY_COUNT), undefined}
755-
end,
756-
757-
{OutHandle, NextLinkHandle} =
758-
case Args of
759-
#{handle := Handle} ->
760-
%% Client app provided link handle.
761-
%% Really only meant for integration tests.
762-
{Handle, OutHandle0};
763-
_ ->
764-
{OutHandle0, OutHandle0 + 1}
757+
{TargetAddr, uint(?INITIAL_DELIVERY_COUNT), undefined}
765758
end,
766759

760+
{OutHandle, NextLinkHandle} = case Args of
761+
#{handle := Handle} ->
762+
%% Client app provided link handle.
763+
%% Really only meant for integration tests.
764+
{Handle, OutHandle0};
765+
_ ->
766+
{OutHandle0, OutHandle0 + 1}
767+
end,
768+
Role = element(1, RoleTuple),
767769
% create attach performative
768770
Attach = #'v1_0.attach'{name = {utf8, Name},
769-
role = RoleAsBool,
771+
role = role_to_boolean(Role),
770772
handle = {uint, OutHandle},
771773
source = Source,
772774
properties = Properties,
@@ -777,11 +779,12 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
777779
max_message_size = MaxMessageSize},
778780
ok = Send(Attach, State),
779781

782+
Ref = make_link_ref(Role, self(), OutHandle),
780783
Link = #link{name = Name,
781-
ref = make_link_ref(element(1, Role), self(), OutHandle),
784+
ref = Ref,
782785
output_handle = OutHandle,
783786
state = attach_sent,
784-
role = element(1, Role),
787+
role = Role,
785788
notify = FromPid,
786789
auto_flow = never,
787790
target = LinkTarget,
@@ -790,7 +793,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
790793

791794
{State#state{links = Links#{OutHandle => Link},
792795
next_link_handle = NextLinkHandle,
793-
link_index = LinkIndex#{Name => OutHandle}}, Link#link.ref}.
796+
link_index = LinkIndex#{{Role, Name} => OutHandle}}, Ref}.
794797

795798
-spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}.
796799
handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII,
@@ -1082,6 +1085,16 @@ sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
10821085
reason(undefined) -> normal;
10831086
reason(Other) -> Other.
10841087

1088+
role_to_boolean(sender) ->
1089+
?AMQP_ROLE_SENDER;
1090+
role_to_boolean(receiver) ->
1091+
?AMQP_ROLE_RECEIVER.
1092+
1093+
boolean_to_role(?AMQP_ROLE_SENDER) ->
1094+
sender;
1095+
boolean_to_role(?AMQP_ROLE_RECEIVER) ->
1096+
receiver.
1097+
10851098
format_status(Status = #{data := Data0}) ->
10861099
#state{channel = Channel,
10871100
remote_channel = RemoteChannel,

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
%%
77
-module(amqp10_msg).
88

9+
-include_lib("amqp10_common/include/amqp10_types.hrl").
10+
911
-export([from_amqp_records/1,
1012
to_amqp_records/1,
1113
% "read" api
@@ -256,12 +258,12 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
256258
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
257259
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
258260
settled = Settled,
259-
message_format = {uint, 0}},
261+
message_format = {uint, ?MESSAGE_FORMAT}},
260262
body = [#'v1_0.data'{content = Body}]};
261263
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
262264
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
263265
settled = Settled,
264-
message_format = {uint, 0}},
266+
message_format = {uint, ?MESSAGE_FORMAT}},
265267
body = Body}.
266268

267269
%% @doc Create a new settled amqp10 message using the specified delivery tag
@@ -322,8 +324,13 @@ set_properties(Props, #amqp10_msg{properties = undefined} = Msg) ->
322324
set_properties(Props, Msg#amqp10_msg{properties = #'v1_0.properties'{}});
323325
set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
324326
% TODO many fields are `any` types and we need to try to type tag them
325-
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
326-
% message_id can be any type but we restrict it here
327+
P = maps:fold(fun(message_id, {T, _V} = TypeVal, Acc) when T =:= ulong orelse
328+
T =:= uuid orelse
329+
T =:= binary orelse
330+
T =:= uf8 ->
331+
Acc#'v1_0.properties'{message_id = TypeVal};
332+
(message_id, V, Acc) when is_binary(V) ->
333+
%% backward compat clause
327334
Acc#'v1_0.properties'{message_id = utf8(V)};
328335
(user_id, V, Acc) when is_binary(V) ->
329336
Acc#'v1_0.properties'{user_id = {binary, V}};

deps/amqp10_common/include/amqp10_types.hrl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,10 @@
1010
-type transfer_number() :: sequence_no().
1111
% [2.8.10]
1212
-type sequence_no() :: uint().
13+
14+
% [2.8.1]
15+
-define(AMQP_ROLE_SENDER, false).
16+
-define(AMQP_ROLE_RECEIVER, true).
17+
18+
% [3.2.16]
19+
-define(MESSAGE_FORMAT, 0).

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def all_beam_files(name = "all_beam_files"):
4747
"src/rabbit_access_control.erl",
4848
"src/rabbit_alarm.erl",
4949
"src/rabbit_amqp1_0.erl",
50+
"src/rabbit_amqp_management.erl",
5051
"src/rabbit_amqp_reader.erl",
5152
"src/rabbit_amqp_session.erl",
5253
"src/rabbit_amqp_session_sup.erl",
@@ -316,6 +317,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
316317
"src/rabbit_access_control.erl",
317318
"src/rabbit_alarm.erl",
318319
"src/rabbit_amqp1_0.erl",
320+
"src/rabbit_amqp_management.erl",
319321
"src/rabbit_amqp_reader.erl",
320322
"src/rabbit_amqp_session.erl",
321323
"src/rabbit_amqp_session_sup.erl",
@@ -600,6 +602,7 @@ def all_srcs(name = "all_srcs"):
600602
"src/rabbit_access_control.erl",
601603
"src/rabbit_alarm.erl",
602604
"src/rabbit_amqp1_0.erl",
605+
"src/rabbit_amqp_management.erl",
603606
"src/rabbit_amqp_reader.erl",
604607
"src/rabbit_amqp_session.erl",
605608
"src/rabbit_amqp_session_sup.erl",

deps/rabbit/include/rabbit_amqp.hrl

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@
2525
%% [2.8.19]
2626
-define(MIN_MAX_FRAME_1_0_SIZE, 512).
2727

28-
-define(SEND_ROLE, false).
29-
-define(RECV_ROLE, true).
30-
3128
%% for rabbit_event user_authentication_success and user_authentication_failure
3229
-define(AUTH_EVENT_KEYS,
3330
[name,

0 commit comments

Comments
 (0)