52
52
diff /2 ]).
53
53
54
54
-define (MAX_SESSION_WINDOW_SIZE , 65535 ).
55
- -define (DEFAULT_TIMEOUT , 5000 ).
56
55
-define (UINT_OUTGOING_WINDOW , {uint , ? UINT_MAX }).
57
56
-define (INITIAL_OUTGOING_DELIVERY_ID , ? UINT_MAX ).
58
57
% % "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
148
147
reader :: pid (),
149
148
socket :: amqp10_client_connection :amqp10_socket () | undefined ,
150
149
links = #{} :: #{output_handle () => # link {}},
151
- link_index = #{} :: #{link_name () => output_handle ()},
150
+ link_index = #{} :: #{{ link_role (), link_name ()} => output_handle ()},
152
151
link_handle_index = #{} :: #{input_handle () => output_handle ()},
153
152
next_link_handle = 0 :: output_handle (),
154
153
early_attach_requests :: [term ()],
172
171
173
172
-spec begin_sync (pid ()) -> supervisor :startchild_ret ().
174
173
begin_sync (Connection ) ->
175
- begin_sync (Connection , ? DEFAULT_TIMEOUT ).
174
+ begin_sync (Connection , ? TIMEOUT ).
176
175
177
176
-spec begin_sync (pid (), non_neg_integer ()) ->
178
177
supervisor :startchild_ret () | session_timeout .
@@ -298,33 +297,37 @@ mapped(cast, #'v1_0.end'{error = Err}, State) ->
298
297
mapped (cast , # 'v1_0.attach' {name = {utf8 , Name },
299
298
initial_delivery_count = IDC ,
300
299
handle = {uint , InHandle },
300
+ role = PeerRoleBool ,
301
301
max_message_size = MaybeMaxMessageSize },
302
302
# state {links = Links , link_index = LinkIndex ,
303
303
link_handle_index = LHI } = State0 ) ->
304
304
305
- #{Name := OutHandle } = LinkIndex ,
305
+ OurRoleBool = not unwrap (PeerRoleBool ),
306
+ OurRole = boolean_to_role (OurRoleBool ),
307
+ LinkIndexKey = {OurRole , Name },
308
+ #{LinkIndexKey := OutHandle } = LinkIndex ,
306
309
#{OutHandle := Link0 } = Links ,
307
310
ok = notify_link_attached (Link0 ),
308
311
309
312
{DeliveryCount , MaxMessageSize } =
310
313
case Link0 of
311
- # link {role = sender ,
314
+ # link {role = sender = OurRole ,
312
315
delivery_count = DC } ->
313
316
MSS = case MaybeMaxMessageSize of
314
317
{ulong , S } when S > 0 -> S ;
315
318
_ -> undefined
316
319
end ,
317
320
{DC , MSS };
318
- # link {role = receiver ,
321
+ # link {role = receiver = OurRole ,
319
322
max_message_size = MSS } ->
320
323
{unpack (IDC ), MSS }
321
324
end ,
322
325
Link = Link0 # link {state = attached ,
323
326
input_handle = InHandle ,
324
327
delivery_count = DeliveryCount ,
325
328
max_message_size = MaxMessageSize },
326
- State = State0 # state {links = Links #{OutHandle => Link },
327
- link_index = maps :remove (Name , LinkIndex ),
329
+ State = State0 # state {links = Links #{OutHandle : = Link },
330
+ link_index = maps :remove (LinkIndexKey , LinkIndex ),
328
331
link_handle_index = LHI #{InHandle => OutHandle }},
329
332
{keep_state , State };
330
333
mapped (cast , # 'v1_0.detach' {handle = {uint , InHandle },
@@ -643,8 +646,8 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
643
646
644
647
make_source (#{role := {sender , _ }}) ->
645
648
# 'v1_0.source' {};
646
- make_source (#{role := {receiver , #{address := Address } = Target , _Pid }, filter := Filter }) ->
647
- Durable = translate_terminus_durability (maps :get (durable , Target , none )),
649
+ make_source (#{role := {receiver , #{address := Address } = Source , _Pid }, filter := Filter }) ->
650
+ Durable = translate_terminus_durability (maps :get (durable , Source , none )),
648
651
TranslatedFilter = translate_filters (Filter ),
649
652
# 'v1_0.source' {address = {utf8 , Address },
650
653
durable = {uint , Durable },
@@ -738,35 +741,34 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) ->
738
741
ok = send (Detach , State ),
739
742
Link # link {state = detach_sent }.
740
743
741
- send_attach (Send , #{name := Name , role := Role } = Args , {FromPid , _ },
742
- # state {next_link_handle = OutHandle0 , links = Links ,
744
+ send_attach (Send , #{name := Name , role := RoleTuple } = Args , {FromPid , _ },
745
+ # state {next_link_handle = OutHandle0 , links = Links ,
743
746
link_index = LinkIndex } = State ) ->
744
747
745
748
Source = make_source (Args ),
746
749
Target = make_target (Args ),
747
750
Properties = amqp10_client_types :make_properties (Args ),
748
751
749
- {LinkTarget , RoleAsBool , InitialDeliveryCount , MaxMessageSize } =
750
- case Role of
752
+ {LinkTarget , InitialDeliveryCount , MaxMessageSize } =
753
+ case RoleTuple of
751
754
{receiver , _ , Pid } ->
752
- {{pid , Pid }, true , undefined , max_message_size (Args )};
755
+ {{pid , Pid }, undefined , max_message_size (Args )};
753
756
{sender , #{address := TargetAddr }} ->
754
- {TargetAddr , false , uint (? INITIAL_DELIVERY_COUNT ), undefined }
755
- end ,
756
-
757
- {OutHandle , NextLinkHandle } =
758
- case Args of
759
- #{handle := Handle } ->
760
- % % Client app provided link handle.
761
- % % Really only meant for integration tests.
762
- {Handle , OutHandle0 };
763
- _ ->
764
- {OutHandle0 , OutHandle0 + 1 }
757
+ {TargetAddr , uint (? INITIAL_DELIVERY_COUNT ), undefined }
765
758
end ,
766
759
760
+ {OutHandle , NextLinkHandle } = case Args of
761
+ #{handle := Handle } ->
762
+ % % Client app provided link handle.
763
+ % % Really only meant for integration tests.
764
+ {Handle , OutHandle0 };
765
+ _ ->
766
+ {OutHandle0 , OutHandle0 + 1 }
767
+ end ,
768
+ Role = element (1 , RoleTuple ),
767
769
% create attach performative
768
770
Attach = # 'v1_0.attach' {name = {utf8 , Name },
769
- role = RoleAsBool ,
771
+ role = role_to_boolean ( Role ) ,
770
772
handle = {uint , OutHandle },
771
773
source = Source ,
772
774
properties = Properties ,
@@ -777,11 +779,12 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
777
779
max_message_size = MaxMessageSize },
778
780
ok = Send (Attach , State ),
779
781
782
+ Ref = make_link_ref (Role , self (), OutHandle ),
780
783
Link = # link {name = Name ,
781
- ref = make_link_ref ( element ( 1 , Role ), self (), OutHandle ) ,
784
+ ref = Ref ,
782
785
output_handle = OutHandle ,
783
786
state = attach_sent ,
784
- role = element ( 1 , Role ) ,
787
+ role = Role ,
785
788
notify = FromPid ,
786
789
auto_flow = never ,
787
790
target = LinkTarget ,
@@ -790,7 +793,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
790
793
791
794
{State # state {links = Links #{OutHandle => Link },
792
795
next_link_handle = NextLinkHandle ,
793
- link_index = LinkIndex #{Name => OutHandle }}, Link # link . ref }.
796
+ link_index = LinkIndex #{{ Role , Name } => OutHandle }}, Ref }.
794
797
795
798
- spec handle_session_flow (# 'v1_0.flow' {}, # state {}) -> # state {}.
796
799
handle_session_flow (# 'v1_0.flow' {next_incoming_id = MaybeNII ,
@@ -1073,6 +1076,11 @@ wrap_map_value(V) when is_list(V) ->
1073
1076
wrap_map_value (V ) when is_atom (V ) ->
1074
1077
utf8 (atom_to_list (V )).
1075
1078
1079
+ unwrap ({_Type , Val }) ->
1080
+ Val ;
1081
+ unwrap (Val ) ->
1082
+ Val .
1083
+
1076
1084
utf8 (V ) -> amqp10_client_types :utf8 (V ).
1077
1085
1078
1086
sym (B ) when is_binary (B ) -> {symbol , B };
@@ -1082,6 +1090,18 @@ sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
1082
1090
reason (undefined ) -> normal ;
1083
1091
reason (Other ) -> Other .
1084
1092
1093
+ role_to_boolean (sender ) ->
1094
+ ? V_1_0_ROLE_SENDER ;
1095
+ role_to_boolean (receiver ) ->
1096
+ ? V_1_0_ROLE_RECEIVER .
1097
+
1098
+ boolean_to_role (? V_1_0_ROLE_SENDER ) ->
1099
+ sender ;
1100
+ boolean_to_role (? V_1_0_ROLE_RECEIVER ) ->
1101
+ receiver ;
1102
+ boolean_to_role (Bool ) when is_boolean (Bool ) ->
1103
+ boolean_to_role ({boolean , Bool }).
1104
+
1085
1105
format_status (Status = #{data := Data0 }) ->
1086
1106
# state {channel = Channel ,
1087
1107
remote_channel = RemoteChannel ,
0 commit comments