52
52
diff /2 ]).
53
53
54
54
-define (MAX_SESSION_WINDOW_SIZE , 65535 ).
55
- -define (DEFAULT_TIMEOUT , 5000 ).
56
55
-define (UINT_OUTGOING_WINDOW , {uint , ? UINT_MAX }).
57
56
-define (INITIAL_OUTGOING_DELIVERY_ID , ? UINT_MAX ).
58
57
% % "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
149
148
reader :: pid (),
150
149
socket :: amqp10_client_connection :amqp10_socket () | undefined ,
151
150
links = #{} :: #{output_handle () => # link {}},
152
- link_index = #{} :: #{link_name () => output_handle ()},
151
+ link_index = #{} :: #{{ link_role (), link_name ()} => output_handle ()},
153
152
link_handle_index = #{} :: #{input_handle () => output_handle ()},
154
153
next_link_handle = 0 :: output_handle (),
155
154
early_attach_requests :: [term ()],
172
171
173
172
-spec begin_sync (pid ()) -> supervisor :startchild_ret ().
174
173
begin_sync (Connection ) ->
175
- begin_sync (Connection , ? DEFAULT_TIMEOUT ).
174
+ begin_sync (Connection , ? TIMEOUT ).
176
175
177
176
-spec begin_sync (pid (), non_neg_integer ()) ->
178
177
supervisor :startchild_ret () | session_timeout .
@@ -302,33 +301,37 @@ mapped(cast, #'v1_0.end'{error = Err}, State) ->
302
301
mapped (cast , # 'v1_0.attach' {name = {utf8 , Name },
303
302
initial_delivery_count = IDC ,
304
303
handle = {uint , InHandle },
304
+ role = PeerRoleBool ,
305
305
max_message_size = MaybeMaxMessageSize },
306
306
# state {links = Links , link_index = LinkIndex ,
307
307
link_handle_index = LHI } = State0 ) ->
308
308
309
- #{Name := OutHandle } = LinkIndex ,
309
+ OurRoleBool = not PeerRoleBool ,
310
+ OurRole = boolean_to_role (OurRoleBool ),
311
+ LinkIndexKey = {OurRole , Name },
312
+ #{LinkIndexKey := OutHandle } = LinkIndex ,
310
313
#{OutHandle := Link0 } = Links ,
311
314
ok = notify_link_attached (Link0 ),
312
315
313
316
{DeliveryCount , MaxMessageSize } =
314
317
case Link0 of
315
- # link {role = sender ,
318
+ # link {role = sender = OurRole ,
316
319
delivery_count = DC } ->
317
320
MSS = case MaybeMaxMessageSize of
318
321
{ulong , S } when S > 0 -> S ;
319
322
_ -> undefined
320
323
end ,
321
324
{DC , MSS };
322
- # link {role = receiver ,
325
+ # link {role = receiver = OurRole ,
323
326
max_message_size = MSS } ->
324
327
{unpack (IDC ), MSS }
325
328
end ,
326
329
Link = Link0 # link {state = attached ,
327
330
input_handle = InHandle ,
328
331
delivery_count = DeliveryCount ,
329
332
max_message_size = MaxMessageSize },
330
- State = State0 # state {links = Links #{OutHandle => Link },
331
- link_index = maps :remove (Name , LinkIndex ),
333
+ State = State0 # state {links = Links #{OutHandle : = Link },
334
+ link_index = maps :remove (LinkIndexKey , LinkIndex ),
332
335
link_handle_index = LHI #{InHandle => OutHandle }},
333
336
{keep_state , State };
334
337
mapped (cast , # 'v1_0.detach' {handle = {uint , InHandle },
@@ -648,8 +651,8 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
648
651
649
652
make_source (#{role := {sender , _ }}) ->
650
653
# 'v1_0.source' {};
651
- make_source (#{role := {receiver , #{address := Address } = Target , _Pid }, filter := Filter }) ->
652
- Durable = translate_terminus_durability (maps :get (durable , Target , none )),
654
+ make_source (#{role := {receiver , #{address := Address } = Source , _Pid }, filter := Filter }) ->
655
+ Durable = translate_terminus_durability (maps :get (durable , Source , none )),
653
656
TranslatedFilter = translate_filters (Filter ),
654
657
# 'v1_0.source' {address = {utf8 , Address },
655
658
durable = {uint , Durable },
@@ -743,35 +746,34 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) ->
743
746
ok = send (Detach , State ),
744
747
Link # link {state = detach_sent }.
745
748
746
- send_attach (Send , #{name := Name , role := Role } = Args , {FromPid , _ },
747
- # state {next_link_handle = OutHandle0 , links = Links ,
749
+ send_attach (Send , #{name := Name , role := RoleTuple } = Args , {FromPid , _ },
750
+ # state {next_link_handle = OutHandle0 , links = Links ,
748
751
link_index = LinkIndex } = State ) ->
749
752
750
753
Source = make_source (Args ),
751
754
Target = make_target (Args ),
752
755
Properties = amqp10_client_types :make_properties (Args ),
753
756
754
- {LinkTarget , RoleAsBool , InitialDeliveryCount , MaxMessageSize } =
755
- case Role of
757
+ {LinkTarget , InitialDeliveryCount , MaxMessageSize } =
758
+ case RoleTuple of
756
759
{receiver , _ , Pid } ->
757
- {{pid , Pid }, true , undefined , max_message_size (Args )};
760
+ {{pid , Pid }, undefined , max_message_size (Args )};
758
761
{sender , #{address := TargetAddr }} ->
759
- {TargetAddr , false , uint (? INITIAL_DELIVERY_COUNT ), undefined }
760
- end ,
761
-
762
- {OutHandle , NextLinkHandle } =
763
- case Args of
764
- #{handle := Handle } ->
765
- % % Client app provided link handle.
766
- % % Really only meant for integration tests.
767
- {Handle , OutHandle0 };
768
- _ ->
769
- {OutHandle0 , OutHandle0 + 1 }
762
+ {TargetAddr , uint (? INITIAL_DELIVERY_COUNT ), undefined }
770
763
end ,
771
764
765
+ {OutHandle , NextLinkHandle } = case Args of
766
+ #{handle := Handle } ->
767
+ % % Client app provided link handle.
768
+ % % Really only meant for integration tests.
769
+ {Handle , OutHandle0 };
770
+ _ ->
771
+ {OutHandle0 , OutHandle0 + 1 }
772
+ end ,
773
+ Role = element (1 , RoleTuple ),
772
774
% create attach performative
773
775
Attach = # 'v1_0.attach' {name = {utf8 , Name },
774
- role = RoleAsBool ,
776
+ role = role_to_boolean ( Role ) ,
775
777
handle = {uint , OutHandle },
776
778
source = Source ,
777
779
properties = Properties ,
@@ -782,12 +784,12 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
782
784
max_message_size = MaxMessageSize },
783
785
ok = Send (Attach , State ),
784
786
785
- LinkRef = make_link_ref (element ( 1 , Role ) , self (), OutHandle ),
787
+ Ref = make_link_ref (Role , self (), OutHandle ),
786
788
Link = # link {name = Name ,
787
- ref = LinkRef ,
789
+ ref = Ref ,
788
790
output_handle = OutHandle ,
789
791
state = attach_sent ,
790
- role = element ( 1 , Role ) ,
792
+ role = Role ,
791
793
notify = FromPid ,
792
794
auto_flow = never ,
793
795
target = LinkTarget ,
@@ -796,7 +798,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
796
798
797
799
{State # state {links = Links #{OutHandle => Link },
798
800
next_link_handle = NextLinkHandle ,
799
- link_index = LinkIndex #{Name => OutHandle }}, LinkRef }.
801
+ link_index = LinkIndex #{{ Role , Name } => OutHandle }}, Ref }.
800
802
801
803
- spec handle_session_flow (# 'v1_0.flow' {}, # state {}) -> # state {}.
802
804
handle_session_flow (# 'v1_0.flow' {next_incoming_id = MaybeNII ,
@@ -1090,6 +1092,16 @@ sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
1090
1092
reason (undefined ) -> normal ;
1091
1093
reason (Other ) -> Other .
1092
1094
1095
+ role_to_boolean (sender ) ->
1096
+ ? AMQP_ROLE_SENDER ;
1097
+ role_to_boolean (receiver ) ->
1098
+ ? AMQP_ROLE_RECEIVER .
1099
+
1100
+ boolean_to_role (? AMQP_ROLE_SENDER ) ->
1101
+ sender ;
1102
+ boolean_to_role (? AMQP_ROLE_RECEIVER ) ->
1103
+ receiver .
1104
+
1093
1105
format_status (Status = #{data := Data0 }) ->
1094
1106
# state {channel = Channel ,
1095
1107
remote_channel = RemoteChannel ,
0 commit comments