50
50
incoming_max_frame_size :: pos_integer (),
51
51
outgoing_max_frame_size :: unlimited | pos_integer (),
52
52
channel_max :: non_neg_integer (),
53
- auth_mechanism :: none | {binary (), module ()},
53
+ auth_mechanism :: sasl_init_unprocessed | {binary (), module ()},
54
54
auth_state :: term (),
55
55
properties :: undefined | {map , list (tuple ())}
56
56
}).
85
85
% %--------------------------------------------------------------------------
86
86
87
87
unpack_from_0_9_1 (
88
- {Sock ,RecvLen , PendingRecv , SupPid , Buf , BufLen , ProxySocket ,
88
+ {Sock , PendingRecv , SupPid , Buf , BufLen , ProxySocket ,
89
89
ConnectionName , Host , PeerHost , Port , PeerPort , ConnectedAt },
90
- Parent , HandshakeTimeout ) ->
90
+ Parent ) ->
91
91
logger :update_process_metadata (#{connection => ConnectionName }),
92
- # v1 {parent = Parent ,
93
- sock = Sock ,
94
- callback = handshake ,
95
- recv_len = RecvLen ,
96
- pending_recv = PendingRecv ,
97
- connection_state = received_amqp3100 ,
98
- heartbeater = none ,
99
- helper_sup = SupPid ,
100
- buf = Buf ,
101
- buf_len = BufLen ,
102
- proxy_socket = ProxySocket ,
103
- tracked_channels = maps : new () ,
104
- writer = none ,
92
+ # v1 {parent = Parent ,
93
+ sock = Sock ,
94
+ callback = { frame_header , sasl } ,
95
+ recv_len = 8 ,
96
+ pending_recv = PendingRecv ,
97
+ heartbeater = none ,
98
+ helper_sup = SupPid ,
99
+ buf = Buf ,
100
+ buf_len = BufLen ,
101
+ proxy_socket = ProxySocket ,
102
+ tracked_channels = maps : new () ,
103
+ writer = none ,
104
+ connection_state = received_amqp3100 ,
105
105
connection = # v1_connection {
106
106
name = ConnectionName ,
107
107
vhost = none ,
@@ -111,12 +111,12 @@ unpack_from_0_9_1(
111
111
peer_port = PeerPort ,
112
112
connected_at = ConnectedAt ,
113
113
user = unauthenticated ,
114
- timeout = HandshakeTimeout ,
114
+ timeout = ? NORMAL_TIMEOUT ,
115
115
incoming_max_frame_size = ? INITIAL_MAX_FRAME_SIZE ,
116
116
outgoing_max_frame_size = ? INITIAL_MAX_FRAME_SIZE ,
117
117
channel_max = 0 ,
118
- auth_mechanism = none ,
119
- auth_state = none }}.
118
+ auth_mechanism = sasl_init_unprocessed ,
119
+ auth_state = unauthenticated }}.
120
120
121
121
-spec system_continue (pid (), [sys :dbg_opt ()], state ()) -> no_return () | ok .
122
122
system_continue (Parent , Deb , State ) ->
@@ -142,7 +142,9 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
142
142
143
143
recvloop (Deb , State = # v1 {pending_recv = true }) ->
144
144
mainloop (Deb , State );
145
- recvloop (Deb , State = # v1 {sock = Sock , recv_len = RecvLen , buf_len = BufLen })
145
+ recvloop (Deb , State = # v1 {sock = Sock ,
146
+ recv_len = RecvLen ,
147
+ buf_len = BufLen })
146
148
when BufLen < RecvLen ->
147
149
case rabbit_net :setopts (Sock , [{active , once }]) of
148
150
ok ->
@@ -203,10 +205,10 @@ handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
203
205
exit (Reason );
204
206
handle_other ({{'DOWN' , ChannelNum }, _MRef , process , SessionPid , Reason }, State ) ->
205
207
handle_session_exit (ChannelNum , SessionPid , Reason , State );
206
- handle_other (handshake_timeout , State )
207
- when ? IS_RUNNING ( State ) orelse
208
- State # v1 . connection_state =:= closing orelse
209
- State # v1 . connection_state =:= closed ->
208
+ handle_other (handshake_timeout , State = # v1 { connection_state = ConnState } )
209
+ when ConnState =:= running orelse
210
+ ConnState =:= closing orelse
211
+ ConnState =:= closed ->
210
212
State ;
211
213
handle_other (handshake_timeout , State ) ->
212
214
throw ({handshake_timeout , State # v1 .callback });
@@ -573,13 +575,14 @@ handle_sasl_frame(#'v1_0.sasl_response'{response = {binary, Response}},
573
575
handle_sasl_frame (Performative , State ) ->
574
576
throw ({unexpected_1_0_sasl_frame , Performative , State }).
575
577
576
- handle_input (handshake , <<" AMQP" ,0 ,1 ,0 ,0 >>,
578
+ handle_input (handshake ,
579
+ <<" AMQP" ,0 ,1 ,0 ,0 >>,
577
580
# v1 {connection_state = waiting_amqp0100 ,
578
581
sock = Sock ,
579
- connection = Connection = # v1_connection {user = # user {}},
582
+ connection = # v1_connection {user = # user {}},
580
583
helper_sup = HelperSup
581
584
} = State0 ) ->
582
- % % Client already got successfully authenticated by SASL.
585
+ % % At this point, client already got successfully authenticated by SASL.
583
586
send_handshake (Sock , <<" AMQP" ,0 ,1 ,0 ,0 >>),
584
587
ChildSpec = #{id => session_sup ,
585
588
start => {rabbit_amqp_session_sup , start_link , [self ()]},
@@ -593,8 +596,7 @@ handle_input(handshake, <<"AMQP",0,1,0,0>>,
593
596
% % "After establishing or accepting a TCP connection and sending
594
597
% % the protocol header, each peer MUST send an open frame before
595
598
% % sending any other frames." [2.4.1]
596
- connection_state = waiting_open ,
597
- connection = Connection # v1_connection {timeout = ? NORMAL_TIMEOUT }},
599
+ connection_state = waiting_open },
598
600
switch_callback (State , {frame_header , amqp }, 8 );
599
601
handle_input ({frame_header , Mode },
600
602
Header = <<Size :32 , DOff :8 , Type :8 , Channel :16 >>,
@@ -620,7 +622,8 @@ handle_input({frame_header, Mode},
620
622
handle_input ({frame_header , _Mode }, Malformed , _State ) ->
621
623
throw ({bad_1_0_header , Malformed });
622
624
handle_input ({frame_body , Mode , DOff , Channel },
623
- FrameBin , State ) ->
625
+ FrameBin ,
626
+ State ) ->
624
627
% % Figure 2.16
625
628
% % DOff = 4-byte words minus 8 bytes we've already read
626
629
ExtendedHeaderSize = (DOff * 32 - 64 ),
@@ -633,24 +636,21 @@ handle_input(Callback, Data, _State) ->
633
636
634
637
-spec init (tuple ()) -> no_return ().
635
638
init (PackedState ) ->
636
- {ok , HandshakeTimeout } = application :get_env (rabbit , handshake_timeout ),
637
639
{parent , Parent } = erlang :process_info (self (), parent ),
638
640
ok = rabbit_connection_sup :remove_connection_helper_sup (Parent , helper_sup_amqp_091 ),
639
- State0 = unpack_from_0_9_1 (PackedState , Parent , HandshakeTimeout ),
641
+ State0 = unpack_from_0_9_1 (PackedState , Parent ),
640
642
State = advertise_sasl_mechanism (State0 ),
641
643
% % By invoking recvloop here we become 1.0.
642
644
recvloop (sys :debug_options ([]), State ).
643
645
644
646
advertise_sasl_mechanism (State0 = # v1 {connection_state = received_amqp3100 ,
645
- connection = Connection ,
646
647
sock = Sock }) ->
647
648
send_handshake (Sock , <<" AMQP" ,3 ,1 ,0 ,0 >>),
648
649
Ms0 = [{symbol , atom_to_binary (M )} || M <- auth_mechanisms (Sock )],
649
650
Ms1 = {array , symbol , Ms0 },
650
651
Ms = # 'v1_0.sasl_mechanisms' {sasl_server_mechanisms = Ms1 },
651
652
ok = send_on_channel0 (Sock , Ms , rabbit_amqp_sasl ),
652
- State = State0 # v1 {connection_state = waiting_sasl_init ,
653
- connection = Connection # v1_connection {timeout = ? NORMAL_TIMEOUT }},
653
+ State = State0 # v1 {connection_state = waiting_sasl_init },
654
654
switch_callback (State , {frame_header , sasl }, 8 ).
655
655
656
656
send_handshake (Sock , Handshake ) ->
@@ -715,15 +715,16 @@ auth_phase(
715
715
auth_fail (none , State ),
716
716
protocol_error (? V_1_0_AMQP_ERROR_DECODE_ERROR , Msg , Args );
717
717
{challenge , Challenge , AuthState1 } ->
718
- Secure = # 'v1_0.sasl_challenge' {challenge = {binary , Challenge }},
719
- ok = send_on_channel0 (Sock , Secure , rabbit_amqp_sasl ),
720
- State # v1 {connection = Conn # v1_connection {auth_state = AuthState1 }};
718
+ Challenge = # 'v1_0.sasl_challenge' {challenge = {binary , Challenge }},
719
+ ok = send_on_channel0 (Sock , Challenge , rabbit_amqp_sasl ),
720
+ State1 = State # v1 {connection = Conn # v1_connection {auth_state = AuthState1 }},
721
+ switch_callback (State1 , {frame_header , sasl }, 8 );
721
722
{ok , User } ->
722
723
Outcome = # 'v1_0.sasl_outcome' {code = ? V_1_0_SASL_CODE_OK },
723
724
ok = send_on_channel0 (Sock , Outcome , rabbit_amqp_sasl ),
724
725
State1 = State # v1 {connection_state = waiting_amqp0100 ,
725
726
connection = Conn # v1_connection {user = User ,
726
- auth_state = none }},
727
+ auth_state = authenticated }},
727
728
switch_callback (State1 , handshake , 8 )
728
729
end .
729
730
0 commit comments