@@ -24,7 +24,6 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
24
24
} = decode_req (ReqSections , {undefined , undefined }),
25
25
26
26
{StatusCode ,
27
- RespAppProps0 ,
28
27
RespBody } = try {PathSegments , QueryMap } = parse_uri (HttpRequestTarget ),
29
28
handle_http_req (HttpMethod ,
30
29
PathSegments ,
@@ -34,7 +33,9 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
34
33
User ,
35
34
ConnectionPid )
36
35
catch throw :{? MODULE , StatusCode0 , Explanation } ->
37
- {StatusCode0 , [], {utf8 , unicode :characters_to_binary (Explanation )}}
36
+ rabbit_log :warning (" request ~ts ~ts failed: ~ts " ,
37
+ [HttpMethod , HttpRequestTarget , Explanation ]),
38
+ {StatusCode0 , {utf8 , Explanation }}
38
39
end ,
39
40
40
41
RespProps = # 'v1_0.properties' {
@@ -44,7 +45,9 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
44
45
% % [HTTP over AMQP WD 06 §5.1]
45
46
correlation_id = MessageId },
46
47
RespAppProps = # 'v1_0.application_properties' {
47
- content = [{{utf8 , <<" http:response" >>}, {utf8 , <<" 1.1" >>}} | RespAppProps0 ]},
48
+ content = [
49
+ {{utf8 , <<" http:response" >>}, {utf8 , <<" 1.1" >>}}
50
+ ]},
48
51
RespDataSect = # 'v1_0.amqp_value' {content = RespBody },
49
52
RespSections = [RespProps , RespAppProps , RespDataSect ],
50
53
[amqp10_framing :encode_bin (Sect ) || Sect <- RespSections ].
@@ -75,7 +78,7 @@ handle_http_req(<<"PUT">>,
75
78
Q0 = amqqueue :new (QName , none , Durable , AutoDelete , Owner ,
76
79
QArgs , Vhost , #{user => Username }, QType ),
77
80
{new , _Q } = rabbit_queue_type :declare (Q0 , node ()),
78
- {<<" 201" >>, [], null };
81
+ {<<" 201" >>, null };
79
82
80
83
handle_http_req (<<" PUT" >>,
81
84
[<<" exchanges" >>, XNameBinQuoted ],
@@ -95,8 +98,8 @@ handle_http_req(<<"PUT">>,
95
98
catch exit :# amqp_error {explanation = Explanation } ->
96
99
throw (<<" 400" >>, Explanation , [])
97
100
end ,
98
- ok = prohibit_default_exchange (XNameBin ),
99
101
XName = rabbit_misc :r (Vhost , exchange , XNameBin ),
102
+ ok = prohibit_default_exchange (XName ),
100
103
ok = check_resource_access (XName , configure , User ),
101
104
X = case rabbit_exchange :lookup (XName ) of
102
105
{ok , FoundX } ->
@@ -111,7 +114,7 @@ handle_http_req(<<"PUT">>,
111
114
try rabbit_exchange :assert_equivalence (
112
115
X , XTypeAtom , Durable , AutoDelete , Internal , XArgs ) of
113
116
ok ->
114
- {<<" 204" >>, [], null }
117
+ {<<" 204" >>, null }
115
118
catch exit :# amqp_error {name = precondition_failed ,
116
119
explanation = Expl } ->
117
120
throw (<<" 409" >>, Expl , [])
@@ -132,7 +135,7 @@ handle_http_req(<<"DELETE">>,
132
135
rabbit_queue_type :purge (Q )
133
136
end ),
134
137
RespPayload = {map , [{{utf8 , <<" message_count" >>}, {ulong , NumMsgs }}]},
135
- {<<" 200" >>, [], RespPayload };
138
+ {<<" 200" >>, RespPayload };
136
139
137
140
handle_http_req (<<" DELETE" >>,
138
141
[<<" queues" >>, QNameBinQuoted ],
@@ -145,7 +148,7 @@ handle_http_req(<<"DELETE">>,
145
148
QName = rabbit_misc :r (Vhost , queue , QNameBin ),
146
149
{ok , NumMsgs } = rabbit_amqqueue :delete_with (QName , ConnPid , false , false , Username , true ),
147
150
RespPayload = {map , [{{utf8 , <<" message_count" >>}, {ulong , NumMsgs }}]},
148
- {<<" 200" >>, [], RespPayload };
151
+ {<<" 200" >>, RespPayload };
149
152
150
153
handle_http_req (<<" DELETE" >>,
151
154
[<<" exchanges" >>, XNameBinQuoted ],
@@ -157,19 +160,19 @@ handle_http_req(<<"DELETE">>,
157
160
XNameBin = uri_string :unquote (XNameBinQuoted ),
158
161
XName = rabbit_misc :r (Vhost , exchange , XNameBin ),
159
162
ok = prohibit_cr_lf (XNameBin ),
160
- ok = prohibit_default_exchange (XNameBin ),
163
+ ok = prohibit_default_exchange (XName ),
161
164
ok = prohibit_reserved_amq (XName ),
162
165
ok = check_resource_access (XName , configure , User ),
163
166
_ = rabbit_exchange :delete (XName , false , Username ),
164
- {<<" 204" >>, [], null };
167
+ {<<" 204" >>, null };
165
168
166
169
handle_http_req (<<" POST" >>,
167
170
[<<" bindings" >>],
168
171
_Query ,
169
172
ReqPayload ,
170
173
Vhost ,
171
- # user {username = Username },
172
- _ConnPid ) ->
174
+ User = # user {username = Username },
175
+ ConnPid ) ->
173
176
#{source := SrcXNameBin ,
174
177
binding_key := BindingKey ,
175
178
arguments := Args } = BindingMap = decode_binding (ReqPayload ),
@@ -181,34 +184,37 @@ handle_http_req(<<"POST">>,
181
184
end ,
182
185
SrcXName = rabbit_misc :r (Vhost , exchange , SrcXNameBin ),
183
186
DstName = rabbit_misc :r (Vhost , DstKind , DstNameBin ),
187
+ ok = binding_checks (SrcXName , DstName , BindingKey , User ),
184
188
Binding = # binding {source = SrcXName ,
185
189
destination = DstName ,
186
190
key = BindingKey ,
187
191
args = Args },
188
- % %TODO If the binding already exists, return 303 with location.
189
- ok = rabbit_binding :add (Binding , Username ),
190
- Location = compose_binding_uri (SrcXNameBin , DstKind , DstNameBin , BindingKey , Args ),
191
- AppProps = [{{utf8 , <<" location" >>}, {utf8 , Location }}],
192
- {<<" 201" >>, AppProps , null };
192
+ ok = binding_action (add , Binding , Username , ConnPid ),
193
+ {<<" 204" >>, null };
193
194
194
195
handle_http_req (<<" DELETE" >>,
195
196
[<<" bindings" >>, BindingSegment ],
196
197
_Query ,
197
198
null ,
198
199
Vhost ,
199
- # user {username = Username },
200
- _ConnPid ) ->
201
- {SrcXNameBin , DstKind , DstNameBin , BindingKey , ArgsHash } = decode_binding_path_segment (BindingSegment ),
200
+ User = # user {username = Username },
201
+ ConnPid ) ->
202
+ {SrcXNameBin ,
203
+ DstKind ,
204
+ DstNameBin ,
205
+ BindingKey ,
206
+ ArgsHash } = decode_binding_path_segment (BindingSegment ),
202
207
SrcXName = rabbit_misc :r (Vhost , exchange , SrcXNameBin ),
203
208
DstName = rabbit_misc :r (Vhost , DstKind , DstNameBin ),
209
+ ok = binding_checks (SrcXName , DstName , BindingKey , User ),
204
210
Bindings = rabbit_binding :list_for_source_and_destination (SrcXName , DstName ),
205
211
case search_binding (BindingKey , ArgsHash , Bindings ) of
206
212
{value , Binding } ->
207
- ok = rabbit_binding : remove ( Binding , Username );
213
+ ok = binding_action ( remove , Binding , Username , ConnPid );
208
214
false ->
209
215
ok
210
216
end ,
211
- {<<" 204" >>, [], null };
217
+ {<<" 204" >>, null };
212
218
213
219
handle_http_req (<<" GET" >>,
214
220
[<<" bindings" >>],
@@ -218,20 +224,23 @@ handle_http_req(<<"GET">>,
218
224
Vhost ,
219
225
_User ,
220
226
_ConnPid ) ->
221
- {DstKind , DstNameBin } = case QueryMap of
222
- #{<<" dste" >> := DstX } ->
223
- {exchange , DstX };
224
- #{<<" dstq" >> := DstQ } ->
225
- {queue , DstQ };
226
- _ ->
227
- throw (<<" 400" >>, " missing 'dste' or 'dstq' in query: ~tp " , QueryMap )
228
- end ,
227
+ {DstKind ,
228
+ DstNameBin } = case QueryMap of
229
+ #{<<" dste" >> := DstX } ->
230
+ {exchange , DstX };
231
+ #{<<" dstq" >> := DstQ } ->
232
+ {queue , DstQ };
233
+ _ ->
234
+ throw (<<" 400" >>,
235
+ " missing 'dste' or 'dstq' in query: ~tp " ,
236
+ QueryMap )
237
+ end ,
229
238
SrcXName = rabbit_misc :r (Vhost , exchange , SrcXNameBin ),
230
239
DstName = rabbit_misc :r (Vhost , DstKind , DstNameBin ),
231
240
Bindings0 = rabbit_binding :list_for_source_and_destination (SrcXName , DstName ),
232
241
Bindings = [B || B = # binding {key = K } <- Bindings0 , K =:= Key ],
233
242
RespPayload = encode_bindings (Bindings ),
234
- {<<" 200" >>, [], RespPayload }.
243
+ {<<" 200" >>, RespPayload }.
235
244
236
245
decode_queue ({map , KVList }) ->
237
246
M = lists :foldl (
@@ -424,19 +433,47 @@ args_hash([]) ->
424
433
<<>>;
425
434
args_hash (Args )
426
435
when is_list (Args ) ->
436
+ % % Args is already sorted.
427
437
Bin = <<(erlang :phash2 (Args , 1 bsl 32 )):32 >>,
428
438
base64 :encode (Bin , #{mode => urlsafe ,
429
439
padding => false }).
430
440
441
+ -spec binding_checks (rabbit_types :exchange_name (),
442
+ resource_name (),
443
+ rabbit_types :binding_key (),
444
+ rabbit_types :user ()) -> ok .
445
+ binding_checks (SrcXName , DstName , BindingKey , User ) ->
446
+ lists :foreach (fun (# resource {name = NameBin } = Name ) ->
447
+ ok = prohibit_default_exchange (Name ),
448
+ ok = prohibit_cr_lf (NameBin )
449
+ end , [SrcXName , DstName ]),
450
+ ok = check_resource_access (DstName , write , User ),
451
+ ok = check_resource_access (SrcXName , read , User ),
452
+ case rabbit_exchange :lookup (SrcXName ) of
453
+ {ok , SrcX } ->
454
+ rabbit_amqp_session :check_read_permitted_on_topic (SrcX , User , BindingKey );
455
+ {error , not_found } ->
456
+ ok
457
+ end .
458
+
459
+ binding_action (Action , Binding , Username , ConnPid ) ->
460
+ try rabbit_channel :binding_action (Action , Binding , Username , ConnPid )
461
+ catch exit :# amqp_error {explanation = Explanation } ->
462
+ throw (<<" 400" >>, Explanation , [])
463
+ end .
464
+
431
465
prohibit_cr_lf (NameBin ) ->
432
466
case binary :match (NameBin , [<<" \n " >>, <<" \r " >>]) of
433
467
nomatch ->
434
468
ok ;
435
469
_Found ->
436
- throw (<<" 400" >>, <<" Bad name '~ts ': \n and \r not allowed" >>, [NameBin ])
470
+ throw (<<" 400" >>,
471
+ <<" Bad name '~ts ': line feed and carriage return characters not allowed" >>,
472
+ [NameBin ])
437
473
end .
438
474
439
- prohibit_default_exchange (<<>>) ->
475
+ prohibit_default_exchange (# resource {kind = exchange ,
476
+ name = <<" " >>}) ->
440
477
throw (<<" 403" >>, <<" operation not permitted on the default exchange" >>, []);
441
478
prohibit_default_exchange (_ ) ->
442
479
ok .
@@ -464,6 +501,6 @@ check_resource_access(Resource, Perm, User) ->
464
501
465
502
-spec throw (binary (), io :format (), [term ()]) -> no_return ().
466
503
throw (StatusCode , Format , Data ) ->
467
- Explanation = lists :flatten (io_lib :format (Format , Data )),
468
- rabbit_log : warning ( Explanation ),
469
- throw ({? MODULE , StatusCode , Explanation }).
504
+ Reason0 = lists :flatten (io_lib :format (Format , Data )),
505
+ Reason = unicode : characters_to_binary ( Reason0 ),
506
+ throw ({? MODULE , StatusCode , Reason }).
0 commit comments