19
19
-rabbit_deprecated_feature (
20
20
{amqp_address_v1 ,
21
21
#{deprecation_phase => permitted_by_default ,
22
+ doc_url => " https://www.rabbitmq.com/docs/next/amqp#address" ,
22
23
messages =>
23
24
#{when_permitted =>
24
25
" RabbitMQ AMQP address version 1 is deprecated. "
25
- " Clients should use RabbitMQ AMQP address version 2." }}
26
+ " Clients should use RabbitMQ AMQP address version 2." ,
27
+ when_denied =>
28
+ " RabbitMQ AMQP address version 1 is unsupported. "
29
+ " Clients must use RabbitMQ AMQP address version 2."
30
+ }}
26
31
}).
27
32
28
33
-define (PROTOCOL , amqp10 ).
@@ -2422,12 +2427,20 @@ ensure_source(#'v1_0.source'{address = Address,
2422
2427
durable = Durable },
2423
2428
Vhost , User , PermCache , TopicPermCache ) ->
2424
2429
case Address of
2430
+ {utf8 , <<" /q/" , QNameBinQuoted /binary >>} ->
2431
+ % % The only possible v2 source address format is:
2432
+ % % /q/:queue
2433
+ QNameBin = unquote (QNameBinQuoted ),
2434
+ QName = queue_resource (Vhost , QNameBin ),
2435
+ ok = exit_if_absent (QName ),
2436
+ {ok , QName , PermCache , TopicPermCache };
2425
2437
{utf8 , SourceAddr } ->
2426
2438
case address_v1_permitted () of
2427
- true -> ensure_source_v1 (
2428
- SourceAddr , Vhost , User , Durable , PermCache , TopicPermCache );
2429
- false -> ensure_source_v2 (
2430
- SourceAddr , Vhost , PermCache , TopicPermCache )
2439
+ true ->
2440
+ ensure_source_v1 (SourceAddr , Vhost , User , Durable ,
2441
+ PermCache , TopicPermCache );
2442
+ false ->
2443
+ {error , {amqp_address_v1_not_permitted , Address }}
2431
2444
end ;
2432
2445
_ ->
2433
2446
{error , {bad_address , Address }}
@@ -2467,19 +2480,10 @@ ensure_source_v1(Address,
2467
2480
Err
2468
2481
end
2469
2482
end ;
2470
- {error , _ } ->
2471
- ensure_source_v2 ( Address , Vhost , PermCache0 , TopicPermCache0 )
2483
+ {error , _ } = Err ->
2484
+ Err
2472
2485
end .
2473
2486
2474
- % % The only possible v2 source address format is:
2475
- % % /queue/:queue
2476
- ensure_source_v2 (<<" /queue/" , QNameBin /binary >>, Vhost , PermCache , TopicPermCache ) ->
2477
- QName = queue_resource (Vhost , QNameBin ),
2478
- ok = exit_if_absent (QName ),
2479
- {ok , QName , PermCache , TopicPermCache };
2480
- ensure_source_v2 (Address , _ , _ , _ ) ->
2481
- {error , {bad_address , Address }}.
2482
-
2483
2487
-spec ensure_target (# 'v1_0.target' {},
2484
2488
rabbit_types :vhost (),
2485
2489
rabbit_types :user (),
@@ -2495,29 +2499,28 @@ ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
2495
2499
ensure_target (# 'v1_0.target' {address = Address ,
2496
2500
durable = Durable },
2497
2501
Vhost , User , PermCache ) ->
2498
- case address_v1_permitted () of
2499
- true ->
2500
- try_target_v1 (Address , Vhost , User , Durable , PermCache );
2501
- false ->
2502
- try_target_v2 (Address , Vhost , User , PermCache )
2503
- end .
2504
-
2505
- try_target_v1 (Address , Vhost , User , Durable , PermCache0 ) ->
2506
- case ensure_target_v1 (Address , Vhost , User , Durable , PermCache0 ) of
2507
- {ok , XNameBin , RKey , QNameBin , PermCache } ->
2508
- check_exchange (XNameBin , RKey , QNameBin , User , Vhost , PermCache );
2509
- {error , _ } ->
2510
- try_target_v2 (Address , Vhost , User , PermCache0 )
2511
- end .
2512
-
2513
- try_target_v2 (Address , Vhost , User , PermCache ) ->
2514
- case ensure_target_v2 (Address , Vhost ) of
2515
- {ok , to , RKey , QNameBin } ->
2516
- {ok , to , RKey , QNameBin , PermCache };
2517
- {ok , XNameBin , RKey , QNameBin } ->
2518
- check_exchange (XNameBin , RKey , QNameBin , User , Vhost , PermCache );
2519
- {error , _ } = Err ->
2520
- Err
2502
+ case target_address_version (Address ) of
2503
+ 2 ->
2504
+ case ensure_target_v2 (Address , Vhost ) of
2505
+ {ok , to , RKey , QNameBin } ->
2506
+ {ok , to , RKey , QNameBin , PermCache };
2507
+ {ok , XNameBin , RKey , QNameBin } ->
2508
+ check_exchange (XNameBin , RKey , QNameBin , User , Vhost , PermCache );
2509
+ {error , _ } = Err ->
2510
+ Err
2511
+ end ;
2512
+ 1 ->
2513
+ case address_v1_permitted () of
2514
+ true ->
2515
+ case ensure_target_v1 (Address , Vhost , User , Durable , PermCache ) of
2516
+ {ok , XNameBin , RKey , QNameBin , PermCache1 } ->
2517
+ check_exchange (XNameBin , RKey , QNameBin , User , Vhost , PermCache1 );
2518
+ {error , _ } = Err ->
2519
+ Err
2520
+ end ;
2521
+ false ->
2522
+ {error , {amqp_address_v1_not_permitted , Address }}
2523
+ end
2521
2524
end .
2522
2525
2523
2526
check_exchange (XNameBin , RKey , QNameBin , User , Vhost , PermCache0 ) ->
@@ -2539,29 +2542,24 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
2539
2542
exit_not_found (XName )
2540
2543
end .
2541
2544
2542
- ensure_target_v1 ({utf8 , Address }, Vhost , User , Durable , PermCache0 ) ->
2543
- case rabbit_routing_parser :parse_endpoint (Address , true ) of
2544
- {ok , Dest } ->
2545
- {QNameBin , PermCache } = ensure_terminus (
2546
- target , Dest , Vhost , User , Durable , PermCache0 ),
2547
- {XNameList1 , RK } = rabbit_routing_parser :parse_routing (Dest ),
2548
- XNameBin = unicode :characters_to_binary (XNameList1 ),
2549
- RoutingKey = case RK of
2550
- undefined -> subject ;
2551
- [] -> subject ;
2552
- _ -> unicode :characters_to_binary (RK )
2553
- end ,
2554
- {ok , XNameBin , RoutingKey , QNameBin , PermCache };
2555
- {error , _ } = Err ->
2556
- Err
2557
- end ;
2558
- ensure_target_v1 (Address , _ , _ , _ , _ ) ->
2559
- {error , {bad_address , Address }}.
2545
+ address_v1_permitted () ->
2546
+ rabbit_deprecated_features :is_permitted (amqp_address_v1 ).
2547
+
2548
+ target_address_version ({utf8 , <<" /e/" , _ /binary >>}) ->
2549
+ 2 ;
2550
+ target_address_version ({utf8 , <<" /q/" , _ /binary >>}) ->
2551
+ 2 ;
2552
+ target_address_version (undefined ) ->
2553
+ % % anonymous terminus
2554
+ % % https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2555
+ 2 ;
2556
+ target_address_version (_Address ) ->
2557
+ 1 .
2560
2558
2561
2559
% % The possible v2 target address formats are:
2562
- % % /exchange /:exchange/key /:routing-key
2563
- % % /exchange /:exchange
2564
- % % /queue /:queue
2560
+ % % /e /:exchange/:routing-key
2561
+ % % /e /:exchange
2562
+ % % /q /:queue
2565
2563
% % <null>
2566
2564
ensure_target_v2 ({utf8 , String }, Vhost ) ->
2567
2565
case parse_target_v2_string (String ) of
@@ -2576,43 +2574,77 @@ ensure_target_v2({utf8, String}, Vhost) ->
2576
2574
ensure_target_v2 (undefined , _ ) ->
2577
2575
% % anonymous terminus
2578
2576
% % https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2579
- {ok , to , to , undefined };
2580
- ensure_target_v2 (Address , _ ) ->
2581
- {error , {bad_address , Address }}.
2577
+ {ok , to , to , undefined }.
2582
2578
2583
- parse_target_v2_string (<<" /exchange/" , Rest /binary >>) ->
2584
- case split_exchange_target (Rest ) of
2585
- {? DEFAULT_EXCHANGE_NAME , _ } ->
2579
+ parse_target_v2_string (<<" /e/" , Rest /binary >>) ->
2580
+ Key = cp_slash ,
2581
+ Pattern = try persistent_term :get (Key )
2582
+ catch error :badarg ->
2583
+ Cp = binary :compile_pattern (<<" /" >>),
2584
+ ok = persistent_term :put (Key , Cp ),
2585
+ Cp
2586
+ end ,
2587
+ case binary :split (Rest , Pattern , [global ]) of
2588
+ [? DEFAULT_EXCHANGE_NAME | _ ] ->
2586
2589
{error , bad_address };
2587
- { <<" amq.default" >>, _ } ->
2590
+ [ <<" amq.default" >> | _ ] ->
2588
2591
{error , bad_address };
2589
- {XNameBin , RKey } ->
2590
- {ok , XNameBin , RKey , undefined }
2592
+ [XNameBinQuoted ] ->
2593
+ XNameBin = unquote (XNameBinQuoted ),
2594
+ {ok , XNameBin , <<>>, undefined };
2595
+ [XNameBinQuoted , RKeyQuoted ] ->
2596
+ XNameBin = unquote (XNameBinQuoted ),
2597
+ RKey = unquote (RKeyQuoted ),
2598
+ {ok , XNameBin , RKey , undefined };
2599
+ _ ->
2600
+ {error , bad_address }
2591
2601
end ;
2592
- parse_target_v2_string (<<" /queue /" >>) ->
2602
+ parse_target_v2_string (<<" /q /" >>) ->
2593
2603
% % empty queue name is invalid
2594
2604
{error , bad_address };
2595
- parse_target_v2_string (<<" /queue/" , QNameBin /binary >>) ->
2605
+ parse_target_v2_string (<<" /q/" , QNameBinQuoted /binary >>) ->
2606
+ QNameBin = unquote (QNameBinQuoted ),
2596
2607
{ok , ? DEFAULT_EXCHANGE_NAME , QNameBin , QNameBin };
2597
2608
parse_target_v2_string (_ ) ->
2598
2609
{error , bad_address }.
2599
2610
2600
- % % Empty exchange name (default exchange) is valid.
2601
- split_exchange_target (Target ) ->
2602
- Key = cp_amqp_target_address ,
2603
- Pattern = try persistent_term :get (Key )
2604
- catch error :badarg ->
2605
- Cp = binary :compile_pattern (<<" /key/" >>),
2606
- ok = persistent_term :put (Key , Cp ),
2607
- Cp
2608
- end ,
2609
- case binary :split (Target , Pattern ) of
2610
- [XNameBin ] ->
2611
- {XNameBin , <<>>};
2612
- [XNameBin , RoutingKey ] ->
2613
- {XNameBin , RoutingKey }
2611
+ ensure_target_v1 ({utf8 , Address }, Vhost , User , Durable , PermCache0 ) ->
2612
+ case rabbit_routing_parser :parse_endpoint (Address , true ) of
2613
+ {ok , Dest } ->
2614
+ {QNameBin , PermCache } = ensure_terminus (
2615
+ target , Dest , Vhost , User , Durable , PermCache0 ),
2616
+ {XNameList1 , RK } = rabbit_routing_parser :parse_routing (Dest ),
2617
+ XNameBin = unicode :characters_to_binary (XNameList1 ),
2618
+ RoutingKey = case RK of
2619
+ undefined -> subject ;
2620
+ [] -> subject ;
2621
+ _ -> unicode :characters_to_binary (RK )
2622
+ end ,
2623
+ {ok , XNameBin , RoutingKey , QNameBin , PermCache };
2624
+ {error , _ } = Err ->
2625
+ Err
2626
+ end ;
2627
+ ensure_target_v1 (Address , _ , _ , _ , _ ) ->
2628
+ {error , {bad_address , Address }}.
2629
+
2630
+ % % uri_string:unquote/1 is implemented inefficiently because it always creates
2631
+ % % a new binary. We optimise for the common case: When no character is percent
2632
+ % % encoded, we avoid a new binary being created.
2633
+ unquote (Bin ) ->
2634
+ case is_quoted (Bin ) of
2635
+ true ->
2636
+ uri_string :unquote (Bin );
2637
+ false ->
2638
+ Bin
2614
2639
end .
2615
2640
2641
+ is_quoted (<<>>) ->
2642
+ false ;
2643
+ is_quoted (<<$% , _ /binary >>) ->
2644
+ true ;
2645
+ is_quoted (<<_ , Rest /binary >>) ->
2646
+ is_quoted (Rest ).
2647
+
2616
2648
handle_outgoing_mgmt_link_flow_control (
2617
2649
# management_link {delivery_count = DeliveryCountSnd } = Link0 ,
2618
2650
# 'v1_0.flow' {handle = Handle = ? UINT (HandleInt ),
@@ -3355,14 +3387,24 @@ error_not_found(Resource) ->
3355
3387
condition = ? V_1_0_AMQP_ERROR_NOT_FOUND ,
3356
3388
description = {utf8 , Description }}.
3357
3389
3358
- address_v1_permitted () ->
3359
- rabbit_deprecated_features :is_permitted (amqp_address_v1 ).
3360
-
3361
3390
-spec cap_credit (rabbit_queue_type :credit ()) ->
3362
3391
0 ..? LINK_CREDIT_RCV_FROM_QUEUE_MAX .
3363
3392
cap_credit (DesiredCredit ) ->
3364
3393
min (DesiredCredit , ? LINK_CREDIT_RCV_FROM_QUEUE_MAX ).
3365
3394
3395
+ ensure_mc_cluster_compat (Mc ) ->
3396
+ IsEnabled = rabbit_feature_flags :is_enabled (message_containers_store_amqp_v1 ),
3397
+ case IsEnabled of
3398
+ true ->
3399
+ Mc ;
3400
+ false ->
3401
+ McEnv = #{message_containers_store_amqp_v1 => IsEnabled },
3402
+ % % other nodes in the cluster may not understand the new internal
3403
+ % % amqp mc format - in this case we convert to AMQP legacy format
3404
+ % % for compatibility
3405
+ mc :convert (mc_amqpl , Mc , McEnv )
3406
+ end .
3407
+
3366
3408
format_status (
3367
3409
#{state := # state {cfg = Cfg ,
3368
3410
outgoing_pending = OutgoingPending ,
@@ -3407,16 +3449,3 @@ format_status(
3407
3449
permission_cache => PermissionCache ,
3408
3450
topic_permission_cache => TopicPermissionCache },
3409
3451
maps :update (state , State , Status ).
3410
-
3411
- ensure_mc_cluster_compat (Mc ) ->
3412
- IsEnabled = rabbit_feature_flags :is_enabled (message_containers_store_amqp_v1 ),
3413
- case IsEnabled of
3414
- true ->
3415
- Mc ;
3416
- false ->
3417
- McEnv = #{message_containers_store_amqp_v1 => IsEnabled },
3418
- % % other nodes in the cluster may not understand the new internal
3419
- % % amqp mc format - in this case we convert to AMQP legacy format
3420
- % % for compatibility
3421
- mc :convert (mc_amqpl , Mc , McEnv )
3422
- end .
0 commit comments