19
19
-define (INIT_TXFR_COUNT , 0 ).
20
20
-define (DEFAULT_SEND_SETTLED , false ).
21
21
22
+ % % [3.4]
23
+ -define (OUTCOMES , [? V_1_0_SYMBOL_ACCEPTED ,
24
+ ? V_1_0_SYMBOL_REJECTED ,
25
+ ? V_1_0_SYMBOL_RELEASED ,
26
+ ? V_1_0_SYMBOL_MODIFIED ]).
27
+
22
28
% % Just make these constant for the time being.
23
29
-define (INCOMING_CREDIT , 65536 ).
24
30
41
47
42
48
-import (rabbit_amqp1_0_util , [protocol_error /3 ,
43
49
serial_add /2 , serial_diff /2 , serial_compare /2 ]).
44
- -import (rabbit_amqp1_0_link_util , [handle_to_ctag /1 ,
45
- ctag_to_handle /1 ]).
46
50
47
51
-record (incoming_link , {
48
52
name ,
58
62
-record (outgoing_link , {
59
63
queue :: undefined | rabbit_misc :resource_name (),
60
64
delivery_count = 0 ,
61
- % % TODO below 2 fields are not needed?
62
- send_settled ,
63
- default_outcome }).
65
+ % % TODO below field is not needed?
66
+ send_settled }).
64
67
65
68
-record (outgoing_unsettled , {
66
69
% % The queue sent us this consumer scoped sequence number.
@@ -316,7 +319,7 @@ handle_control(#'v1_0.attach'{role = ?SEND_ROLE,
316
319
routing_key = RoutingKey ,
317
320
delivery_count = InitTransfer ,
318
321
recv_settle_mode = RcvSettleMode },
319
- { _DefaultOutcome , _Outcomes } = rabbit_amqp1_0_link_util : outcomes (Source ),
322
+ _Outcomes = outcomes (Source ),
320
323
Confirm = case SndSettleMode of
321
324
? V_1_0_SENDER_SETTLE_MODE_UNSETTLED -> true ;
322
325
? V_1_0_SENDER_SETTLE_MODE_SETTLED -> false ;
@@ -359,13 +362,11 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
359
362
queue_states = QStates0 } = State0 ) ->
360
363
361
364
ok = validate_attach (Attach ),
362
- {DefaultOutcome , Outcomes } = rabbit_amqp1_0_link_util :outcomes (Source ),
363
365
SndSettled = case SndSettleMode of
364
366
? V_1_0_SENDER_SETTLE_MODE_SETTLED -> true ;
365
367
? V_1_0_SENDER_SETTLE_MODE_UNSETTLED -> false ;
366
368
_ -> ? DEFAULT_SEND_SETTLED
367
369
end ,
368
- DOSym = amqp10_framing :symbol_for (DefaultOutcome ),
369
370
case ensure_source (Source , Vhost ) of
370
371
{ok , QNameBin } ->
371
372
CTag = handle_to_ctag (Handle ),
@@ -400,13 +401,16 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
400
401
false -> ? V_1_0_SENDER_SETTLE_MODE_UNSETTLED
401
402
end ,
402
403
rcv_settle_mode = RcvSettleMode ,
403
- source = Source # 'v1_0.source' {default_outcome = DefaultOutcome ,
404
- outcomes = Outcomes },
404
+ % % The queue process monitors our session process. When our session process terminates
405
+ % % (abnormally) any messages checked out to our session process will be requeued.
406
+ % % That's why the we only support RELEASED as the default outcome.
407
+ source = Source # 'v1_0.source' {
408
+ default_outcome = # 'v1_0.released' {},
409
+ outcomes = outcomes (Source )},
405
410
role = ? SEND_ROLE },
406
411
OutLink = # outgoing_link {delivery_count = ? INIT_TXFR_COUNT ,
407
412
queue = QNameBin ,
408
- send_settled = SndSettled ,
409
- default_outcome = DOSym },
413
+ send_settled = SndSettled },
410
414
{ok , [AttachReply ], OutLink , State1 };
411
415
{error , Reason } ->
412
416
protocol_error (
@@ -1394,7 +1398,7 @@ declare_queue(QNameBin, Vhost, TerminusDurability) ->
1394
1398
rabbit_core_metrics :queue_declared (QName ),
1395
1399
Q0 = amqqueue :new (QName ,
1396
1400
_Pid = none ,
1397
- rabbit_amqp1_0_link_util : durable (TerminusDurability ),
1401
+ queue_is_durable (TerminusDurability ),
1398
1402
_AutoDelete = false ,
1399
1403
_QOwner = none ,
1400
1404
_QArgs = [],
@@ -1411,6 +1415,47 @@ declare_queue(QNameBin, Vhost, TerminusDurability) ->
1411
1415
protocol_error (? V_1_0_AMQP_ERROR_INTERNAL_ERROR , " Failed to declare ~s : ~p " , [rabbit_misc :rs (QName ), Other ])
1412
1416
end .
1413
1417
1418
+ outcomes (# 'v1_0.source' {outcomes = undefined }) ->
1419
+ {array , symbol , ? OUTCOMES };
1420
+ outcomes (# 'v1_0.source' {outcomes = {array , symbol , Syms } = Outcomes }) ->
1421
+ case lists :filter (fun (O ) -> not lists :member (O , ? OUTCOMES ) end , Syms ) of
1422
+ [] ->
1423
+ Outcomes ;
1424
+ Unsupported ->
1425
+ rabbit_amqp1_0_util :protocol_error (
1426
+ ? V_1_0_AMQP_ERROR_NOT_IMPLEMENTED ,
1427
+ " Outcomes not supported: ~tp " ,
1428
+ [Unsupported ])
1429
+ end ;
1430
+ outcomes (# 'v1_0.source' {outcomes = Unsupported }) ->
1431
+ rabbit_amqp1_0_util :protocol_error (
1432
+ ? V_1_0_AMQP_ERROR_NOT_IMPLEMENTED ,
1433
+ " Outcomes not supported: ~tp " ,
1434
+ [Unsupported ]);
1435
+ outcomes (_ ) ->
1436
+ {array , symbol , ? OUTCOMES }.
1437
+
1438
+ -spec handle_to_ctag ({uint , non_neg_integer ()}) ->
1439
+ binary ().
1440
+ handle_to_ctag ({uint , H }) ->
1441
+ <<" ctag-" , H :32 /integer >>.
1442
+
1443
+ -spec ctag_to_handle (binary ()) ->
1444
+ {uint , non_neg_integer ()}.
1445
+ ctag_to_handle (<<" ctag-" , H :32 /integer >>) ->
1446
+ {uint , H }.
1447
+
1448
+ queue_is_durable (? V_1_0_TERMINUS_DURABILITY_NONE ) ->
1449
+ false ;
1450
+ queue_is_durable (? V_1_0_TERMINUS_DURABILITY_CONFIGURATION ) ->
1451
+ true ;
1452
+ queue_is_durable (? V_1_0_TERMINUS_DURABILITY_UNSETTLED_STATE ) ->
1453
+ true ;
1454
+ queue_is_durable (undefined ) ->
1455
+ % % <field name="durable" type="terminus-durability" default="none"/>
1456
+ % % [3.5.3]
1457
+ queue_is_durable (? V_1_0_TERMINUS_DURABILITY_NONE ).
1458
+
1414
1459
% %% TODO move copied code to some common module
1415
1460
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1416
1461
% % BEGIN copy from rabbit_channel %%%
0 commit comments