7
7
8
8
-module (rabbit_amqp1_0_session ).
9
9
10
- % %TODO change to gen_server
11
- -behaviour (gen_server2 ).
10
+ -behaviour (gen_server ).
12
11
13
12
-include_lib (" amqp_client/include/amqp_client.hrl" ).
14
13
-include (" rabbit_amqp1_0.hrl" ).
117
116
}).
118
117
119
118
start_link (Args ) ->
120
- gen_server2 :start_link (? MODULE , Args , []).
119
+ gen_server :start_link (? MODULE , Args , []).
121
120
122
121
get_info (Pid ) ->
123
- gen_server2 :call (Pid , info , ? CALL_TIMEOUT ).
122
+ gen_server :call (Pid , info , ? CALL_TIMEOUT ).
124
123
125
124
process_frame (Pid , Frame ) ->
126
125
credit_flow :send (Pid ),
127
- gen_server2 :cast (Pid , {frame , Frame , self ()}).
126
+ gen_server :cast (Pid , {frame , Frame , self ()}).
128
127
129
128
init ({Channel , ReaderPid , WriterPid , User , Vhost , FrameMax }) ->
130
129
process_flag (trap_exit , true ),
@@ -367,12 +366,8 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
367
366
_ -> ? DEFAULT_SEND_SETTLED
368
367
end ,
369
368
DOSym = amqp10_framing :symbol_for (DefaultOutcome ),
370
- case ensure_source (Source ,
371
- # outgoing_link {delivery_count = ? INIT_TXFR_COUNT ,
372
- send_settled = SndSettled ,
373
- default_outcome = DOSym },
374
- Vhost ) of
375
- {ok , OutgoingLink = # outgoing_link {queue = QNameBin }} ->
369
+ case ensure_source (Source , Vhost ) of
370
+ {ok , QNameBin } ->
376
371
CTag = handle_to_ctag (Handle ),
377
372
Args = source_filters_to_consumer_args (Source ) ++
378
373
[{<<" x-credit" >>, table , [{<<" credit" >>, long , 0 },
@@ -408,7 +403,11 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
408
403
source = Source # 'v1_0.source' {default_outcome = DefaultOutcome ,
409
404
outcomes = Outcomes },
410
405
role = ? SEND_ROLE },
411
- {ok , [AttachReply ], OutgoingLink , State1 };
406
+ OutLink = # outgoing_link {delivery_count = ? INIT_TXFR_COUNT ,
407
+ queue = QNameBin ,
408
+ send_settled = SndSettled ,
409
+ default_outcome = DOSym },
410
+ {ok , [AttachReply ], OutLink , State1 };
412
411
{error , Reason } ->
413
412
protocol_error (
414
413
? V_1_0_AMQP_ERROR_INTERNAL_ERROR ,
@@ -1250,10 +1249,10 @@ outgoing_link_flow(#outgoing_link{delivery_count = DeliveryCountSnd,
1250
1249
default (undefined , Default ) -> Default ;
1251
1250
default (Thing , _Default ) -> Thing .
1252
1251
1253
- ensure_source (# 'v1_0.source' {dynamic = true }, _Link , _Vhost ) ->
1252
+ ensure_source (# 'v1_0.source' {dynamic = true }, _Vhost ) ->
1254
1253
protocol_error (? V_1_0_AMQP_ERROR_NOT_IMPLEMENTED , " Dynamic sources not supported" , []);
1255
1254
ensure_source (# 'v1_0.source' {address = Address ,
1256
- durable = Durable }, Link , Vhost ) ->
1255
+ durable = Durable }, Vhost ) ->
1257
1256
case Address of
1258
1257
{utf8 , SourceAddr } ->
1259
1258
case rabbit_routing_util :parse_endpoint (SourceAddr , false ) of
@@ -1264,7 +1263,7 @@ ensure_source(#'v1_0.source'{address = Address,
1264
1263
case rabbit_routing_util :parse_routing (Src ) of
1265
1264
{" " , QNameList } ->
1266
1265
true = string :equal (QNameList , QNameBin ),
1267
- {ok , Link # outgoing_link { queue = QNameBin } };
1266
+ {ok , QNameBin };
1268
1267
{XNameList , RoutingKeyList } ->
1269
1268
RoutingKey = list_to_binary (RoutingKeyList ),
1270
1269
XNameBin = list_to_binary (XNameList ),
@@ -1276,7 +1275,7 @@ ensure_source(#'v1_0.source'{address = Address,
1276
1275
% % TODO authorisation checks
1277
1276
case rabbit_binding :add (Binding , <<" todo" >>) of
1278
1277
ok ->
1279
- {ok , Link # outgoing_link { queue = QNameBin } };
1278
+ {ok , QNameBin };
1280
1279
{error , _ } = Err ->
1281
1280
Err
1282
1281
end
0 commit comments