5
5
6
6
-export ([handle_request /4 ]).
7
7
8
+ -type resource_name () :: rabbit_types :exchange_name () | rabbit_types :rabbit_amqqueue_name ().
9
+
8
10
-spec handle_request (binary (), rabbit_types :vhost (), rabbit_types :user (), pid ()) -> iolist ().
9
11
handle_request (Request , Vhost , User , ConnectionPid ) ->
10
12
ReqSections = amqp10_framing :decode_bin (Request ),
@@ -31,7 +33,7 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
31
33
Vhost ,
32
34
User ,
33
35
ConnectionPid )
34
- catch throw :{StatusCode0 , Explanation } ->
36
+ catch throw :{? MODULE , StatusCode0 , Explanation } ->
35
37
{StatusCode0 , [], {utf8 , unicode :characters_to_binary (Explanation )}}
36
38
end ,
37
39
@@ -52,7 +54,7 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
52
54
% % If queue with same fields already exists, return 200 including the queue content.
53
55
% % If queue / exchange with other fields exists, return 409 with explanation about which fields diff.
54
56
handle_http_req (<<" PUT" >>,
55
- [<<" queues" >>, QNameBinQ ],
57
+ [<<" queues" >>, QNameBinQuoted ],
56
58
_Query ,
57
59
ReqPayload ,
58
60
Vhost ,
@@ -64,7 +66,7 @@ handle_http_req(<<"PUT">>,
64
66
arguments := QArgs
65
67
} = decode_queue (ReqPayload ),
66
68
QType = rabbit_amqqueue :get_queue_type (QArgs ),
67
- QNameBin = uri_string :unquote (QNameBinQ ),
69
+ QNameBin = uri_string :unquote (QNameBinQuoted ),
68
70
QName = rabbit_misc :r (Vhost , queue , QNameBin ),
69
71
Owner = case Exclusive of
70
72
true -> ConnPid ;
@@ -76,38 +78,55 @@ handle_http_req(<<"PUT">>,
76
78
{<<" 201" >>, [], null };
77
79
78
80
handle_http_req (<<" PUT" >>,
79
- [<<" exchanges" >>, XNameBinQ ],
81
+ [<<" exchanges" >>, XNameBinQuoted ],
80
82
_Query ,
81
83
ReqPayload ,
82
84
Vhost ,
83
- # user {username = Username },
85
+ User = # user {username = Username },
84
86
_ConnPid ) ->
85
- #{type := XType ,
87
+ XNameBin = uri_string :unquote (XNameBinQuoted ),
88
+ #{type := XTypeBin ,
86
89
durable := Durable ,
87
90
auto_delete := AutoDelete ,
88
91
internal := Internal ,
89
92
arguments := XArgs
90
93
} = decode_exchange (ReqPayload ),
91
- XNameBin = uri_string :unquote (XNameBinQ ),
94
+ XTypeAtom = try rabbit_exchange :check_type (XTypeBin )
95
+ catch exit :# amqp_error {explanation = Explanation } ->
96
+ throw (<<" 400" >>, Explanation , [])
97
+ end ,
98
+ ok = prohibit_default_exchange (XNameBin ),
92
99
XName = rabbit_misc :r (Vhost , exchange , XNameBin ),
93
- CheckedXType = rabbit_exchange :check_type (XType ),
94
- _X = rabbit_exchange :declare (XName ,
95
- CheckedXType ,
96
- Durable ,
97
- AutoDelete ,
98
- Internal ,
99
- XArgs ,
100
- Username ),
101
- {<<" 201" >>, [], null };
100
+ ok = check_resource_access (XName , configure , User ),
101
+ {StatCode , X } = case rabbit_exchange :lookup (XName ) of
102
+ {ok , FoundX } ->
103
+ {<<" 200" >>, FoundX };
104
+ {error , not_found } ->
105
+ ok = prohibit_cr_lf (XNameBin ),
106
+ ok = prohibit_reserved_amq (XName ),
107
+ X0 = rabbit_exchange :declare (
108
+ XName , XTypeAtom , Durable , AutoDelete ,
109
+ Internal , XArgs , Username ),
110
+ {<<" 201" >>, X0 }
111
+ end ,
112
+ try rabbit_exchange :assert_equivalence (
113
+ X , XTypeAtom , Durable , AutoDelete , Internal , XArgs ) of
114
+ ok ->
115
+ % %TODO Include the exchange in the response payload
116
+ {StatCode , [], null }
117
+ catch exit :# amqp_error {name = precondition_failed ,
118
+ explanation = Expl } ->
119
+ throw (<<" 409" >>, Expl , [])
120
+ end ;
102
121
103
122
handle_http_req (<<" DELETE" >>,
104
- [<<" queues" >>, QNameBinQ , <<" messages" >>],
123
+ [<<" queues" >>, QNameBinQuoted , <<" messages" >>],
105
124
_Query ,
106
125
null ,
107
126
Vhost ,
108
127
_User ,
109
128
ConnPid ) ->
110
- QNameBin = uri_string :unquote (QNameBinQ ),
129
+ QNameBin = uri_string :unquote (QNameBinQuoted ),
111
130
QName = rabbit_misc :r (Vhost , queue , QNameBin ),
112
131
{ok , NumMsgs } = rabbit_amqqueue :with_exclusive_access_or_die (
113
132
QName , ConnPid ,
@@ -118,13 +137,13 @@ handle_http_req(<<"DELETE">>,
118
137
{<<" 200" >>, [], RespPayload };
119
138
120
139
handle_http_req (<<" DELETE" >>,
121
- [<<" queues" >>, QNameBinQ ],
140
+ [<<" queues" >>, QNameBinQuoted ],
122
141
_Query ,
123
142
null ,
124
143
Vhost ,
125
144
# user {username = Username },
126
145
ConnPid ) ->
127
- QNameBin = uri_string :unquote (QNameBinQ ),
146
+ QNameBin = uri_string :unquote (QNameBinQuoted ),
128
147
QName = rabbit_misc :r (Vhost , queue , QNameBin ),
129
148
{ok , NumMsgs } = rabbit_amqqueue :delete_with (QName , ConnPid , false , false , Username , true ),
130
149
RespPayload = {map , [{{utf8 , <<" message_count" >>}, {ulong , NumMsgs }}]},
@@ -414,8 +433,43 @@ args_hash(Args)
414
433
base64 :encode (Bin , #{mode => urlsafe ,
415
434
padding => false }).
416
435
436
+ prohibit_cr_lf (NameBin ) ->
437
+ case binary :match (NameBin , [<<" \n " >>, <<" \r " >>]) of
438
+ nomatch ->
439
+ ok ;
440
+ _Found ->
441
+ throw (<<" 400" >>, <<" Bad name '~ts ': \n and \r not allowed" >>, [NameBin ])
442
+ end .
443
+
444
+ prohibit_default_exchange (<<>>) ->
445
+ throw (<<" 403" >>, <<" operation not permitted on the default exchange" >>, []);
446
+ prohibit_default_exchange (_ ) ->
447
+ ok .
448
+
449
+ -spec prohibit_reserved_amq (resource_name ()) -> ok .
450
+ prohibit_reserved_amq (Res = # resource {name = <<" amq." , _ /binary >>}) ->
451
+ throw (<<" 403" >>,
452
+ " ~ts starts with reserved prefix 'amq.'" ,
453
+ [rabbit_misc :rs (Res )]);
454
+ prohibit_reserved_amq (# resource {}) ->
455
+ ok .
456
+
457
+ -spec check_resource_access (resource_name (),
458
+ rabbit_types :permission_atom (),
459
+ rabbit_types :user ()) -> ok .
460
+ check_resource_access (Resource , Perm , User ) ->
461
+ try rabbit_access_control :check_resource_access (User , Resource , Perm , #{})
462
+ catch exit :# amqp_error {name = not_allowed } ->
463
+ % % For authorization failures, let's be more strict: Close the entire
464
+ % % AMQP session instead of only returning a HTTP Status Code 403.
465
+ rabbit_amqp_util :protocol_error (
466
+ ? V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS ,
467
+ " ~s access refused for user '~ts ' to ~ts " ,
468
+ [Perm , User , rabbit_misc :rs (Resource )])
469
+ end .
470
+
417
471
-spec throw (binary (), io :format (), [term ()]) -> no_return ().
418
472
throw (StatusCode , Format , Data ) ->
419
473
Explanation = lists :flatten (io_lib :format (Format , Data )),
420
474
rabbit_log :warning (Explanation ),
421
- throw ({StatusCode , Explanation }).
475
+ throw ({? MODULE , StatusCode , Explanation }).
0 commit comments