@@ -10,32 +10,39 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
10
10
ReqSections = amqp10_framing :decode_bin (Request ),
11
11
? DEBUG (" ~s Inbound request:~n ~tp " ,
12
12
[? MODULE , [amqp10_framing :pprint (Section ) || Section <- ReqSections ]]),
13
+
13
14
{# 'v1_0.properties' {
14
15
message_id = MessageId ,
15
16
to = {utf8 , HttpRequestTarget },
16
17
subject = {utf8 , HttpMethod },
17
18
% % see Link Pair CS 01 §2.1
18
19
% % https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331305
19
20
reply_to = {utf8 , <<" $me" >>}},
20
- ReqPayload
21
+ ReqBody
21
22
} = decode_req (ReqSections , {undefined , undefined }),
22
- {PathSegments , QueryMap } = parse_uri (HttpRequestTarget ),
23
- {RespProps0 ,
24
- RespAppProps0 = # 'v1_0.application_properties' {content = C },
25
- RespBody } = handle_http_req (HttpMethod ,
26
- PathSegments ,
27
- QueryMap ,
28
- ReqPayload ,
29
- Vhost ,
30
- User ,
31
- ConnectionPid ),
32
- RespProps = RespProps0 # 'v1_0.properties' {
33
- % % "To associate a response with a request, the correlation-id value of the response
34
- % % properties MUST be set to the message-id value of the request properties."
35
- % % [HTTP over AMQP WD 06 §5.1]
36
- correlation_id = MessageId },
37
- RespAppProps = RespAppProps0 # 'v1_0.application_properties' {
38
- content = [{{utf8 , <<" http:response" >>}, {utf8 , <<" 1.1" >>}} | C ]},
23
+
24
+ {StatusCode ,
25
+ RespAppProps0 ,
26
+ RespBody } = try {PathSegments , QueryMap } = parse_uri (HttpRequestTarget ),
27
+ handle_http_req (HttpMethod ,
28
+ PathSegments ,
29
+ QueryMap ,
30
+ ReqBody ,
31
+ Vhost ,
32
+ User ,
33
+ ConnectionPid )
34
+ catch throw :{StatusCode0 , Explanation } ->
35
+ {StatusCode0 , [], {utf8 , unicode :characters_to_binary (Explanation )}}
36
+ end ,
37
+
38
+ RespProps = # 'v1_0.properties' {
39
+ subject = {utf8 , StatusCode },
40
+ % % "To associate a response with a request, the correlation-id value of the response
41
+ % % properties MUST be set to the message-id value of the request properties."
42
+ % % [HTTP over AMQP WD 06 §5.1]
43
+ correlation_id = MessageId },
44
+ RespAppProps = # 'v1_0.application_properties' {
45
+ content = [{{utf8 , <<" http:response" >>}, {utf8 , <<" 1.1" >>}} | RespAppProps0 ]},
39
46
RespDataSect = # 'v1_0.amqp_value' {content = RespBody },
40
47
RespSections = [RespProps , RespAppProps , RespDataSect ],
41
48
[amqp10_framing :encode_bin (Sect ) || Sect <- RespSections ].
@@ -66,10 +73,7 @@ handle_http_req(<<"PUT">>,
66
73
Q0 = amqqueue :new (QName , none , Durable , AutoDelete , Owner ,
67
74
QArgs , Vhost , #{user => Username }, QType ),
68
75
{new , _Q } = rabbit_queue_type :declare (Q0 , node ()),
69
- Props = # 'v1_0.properties' {subject = {utf8 , <<" 201" >>}},
70
- AppProps = # 'v1_0.application_properties' {content = []},
71
- RespPayload = null ,
72
- {Props , AppProps , RespPayload };
76
+ {<<" 201" >>, [], null };
73
77
74
78
handle_http_req (<<" PUT" >>,
75
79
[<<" exchanges" >>, XNameBinQ ],
@@ -94,10 +98,7 @@ handle_http_req(<<"PUT">>,
94
98
Internal ,
95
99
XArgs ,
96
100
Username ),
97
- Props = # 'v1_0.properties' {subject = {utf8 , <<" 201" >>} },
98
- AppProps = # 'v1_0.application_properties' {content = []},
99
- RespPayload = null ,
100
- {Props , AppProps , RespPayload };
101
+ {<<" 201" >>, [], null };
101
102
102
103
handle_http_req (<<" DELETE" >>,
103
104
[<<" queues" >>, QNameBinQ , <<" messages" >>],
@@ -113,10 +114,8 @@ handle_http_req(<<"DELETE">>,
113
114
fun (Q ) ->
114
115
rabbit_queue_type :purge (Q )
115
116
end ),
116
- Props = # 'v1_0.properties' {subject = {utf8 , <<" 200" >>}},
117
- AppProps = # 'v1_0.application_properties' {content = []},
118
117
RespPayload = {map , [{{utf8 , <<" message_count" >>}, {ulong , NumMsgs }}]},
119
- {Props , AppProps , RespPayload };
118
+ {<< " 200 " >>, [] , RespPayload };
120
119
121
120
handle_http_req (<<" DELETE" >>,
122
121
[<<" queues" >>, QNameBinQ ],
@@ -128,10 +127,8 @@ handle_http_req(<<"DELETE">>,
128
127
QNameBin = uri_string :unquote (QNameBinQ ),
129
128
QName = rabbit_misc :r (Vhost , queue , QNameBin ),
130
129
{ok , NumMsgs } = rabbit_amqqueue :delete_with (QName , ConnPid , false , false , Username , true ),
131
- Props = # 'v1_0.properties' {subject = {utf8 , <<" 200" >>}},
132
- AppProps = # 'v1_0.application_properties' {content = []},
133
130
RespPayload = {map , [{{utf8 , <<" message_count" >>}, {ulong , NumMsgs }}]},
134
- {Props , AppProps , RespPayload };
131
+ {<< " 200 " >>, [] , RespPayload };
135
132
136
133
handle_http_req (<<" DELETE" >>,
137
134
[<<" exchanges" >>, XNameBinQ ],
@@ -150,10 +147,7 @@ handle_http_req(<<"DELETE">>,
150
147
% % %% TODO return deletion failure
151
148
% % {error, in_use} ->
152
149
end ,
153
- Props = # 'v1_0.properties' {subject = {utf8 , <<" 204" >>}},
154
- AppProps = # 'v1_0.application_properties' {content = []},
155
- RespPayload = null ,
156
- {Props , AppProps , RespPayload };
150
+ {<<" 204" >>, [], null };
157
151
158
152
handle_http_req (<<" POST" >>,
159
153
[<<" bindings" >>],
@@ -179,12 +173,9 @@ handle_http_req(<<"POST">>,
179
173
args = Args },
180
174
% %TODO If the binding already exists, return 303 with location.
181
175
ok = rabbit_binding :add (Binding , Username ),
182
- Props = # 'v1_0.properties' {subject = {utf8 , <<" 201" >>}},
183
176
Location = compose_binding_uri (SrcXNameBin , DstKind , DstNameBin , BindingKey , Args ),
184
- AppProps = # 'v1_0.application_properties' {
185
- content = [{{utf8 , <<" location" >>}, {utf8 , Location }}]},
186
- RespPayload = null ,
187
- {Props , AppProps , RespPayload };
177
+ AppProps = [{{utf8 , <<" location" >>}, {utf8 , Location }}],
178
+ {<<" 201" >>, AppProps , null };
188
179
189
180
handle_http_req (<<" DELETE" >>,
190
181
[<<" bindings" >>, BindingSegment ],
@@ -203,10 +194,7 @@ handle_http_req(<<"DELETE">>,
203
194
false ->
204
195
ok
205
196
end ,
206
- Props = # 'v1_0.properties' {subject = {utf8 , <<" 204" >>}},
207
- AppProps = # 'v1_0.application_properties' {content = []},
208
- RespPayload = null ,
209
- {Props , AppProps , RespPayload };
197
+ {<<" 204" >>, [], null };
210
198
211
199
handle_http_req (<<" GET" >>,
212
200
[<<" bindings" >>],
@@ -222,55 +210,79 @@ handle_http_req(<<"GET">>,
222
210
#{<<" dstq" >> := DstQ } ->
223
211
{queue , DstQ };
224
212
_ ->
225
- % %TODO return 400
226
- exit ({bad_destination , QueryMap })
213
+ throw (<<" 400" >>, " missing 'dste' or 'dstq' in query: ~tp " , QueryMap )
227
214
end ,
228
215
SrcXName = rabbit_misc :r (Vhost , exchange , SrcXNameBin ),
229
216
DstName = rabbit_misc :r (Vhost , DstKind , DstNameBin ),
230
217
Bindings0 = rabbit_binding :list_for_source_and_destination (SrcXName , DstName ),
231
218
Bindings = [B || B = # binding {key = K } <- Bindings0 , K =:= Key ],
232
219
RespPayload = encode_bindings (Bindings ),
233
- Props = # 'v1_0.properties' {subject = {utf8 , <<" 200" >>}},
234
- AppProps = # 'v1_0.application_properties' {content = []},
235
- {Props , AppProps , RespPayload }.
220
+ {<<" 200" >>, [], RespPayload }.
236
221
237
222
decode_queue ({map , KVList }) ->
238
- lists :foldl (
239
- fun ({{utf8 , <<" durable" >>}, V }, Acc )
240
- when is_boolean (V ) ->
241
- Acc #{durable => V };
242
- ({{utf8 , <<" exclusive" >>}, V }, Acc )
243
- when is_boolean (V ) ->
244
- Acc #{exclusive => V };
245
- ({{utf8 , <<" auto_delete" >>}, V }, Acc )
246
- when is_boolean (V ) ->
247
- Acc #{auto_delete => V };
248
- ({{utf8 , <<" arguments" >>}, {map , List }}, Acc ) ->
249
- Args = [{Key , longstr , V }
250
- || {{utf8 , Key = <<" x-" , _ /binary >>},
251
- {utf8 , V }} <- List ],
252
- Acc #{arguments => Args }
253
- end , #{}, KVList ).
223
+ M = lists :foldl (
224
+ fun ({{utf8 , <<" durable" >>}, V }, Acc )
225
+ when is_boolean (V ) ->
226
+ Acc #{durable => V };
227
+ ({{utf8 , <<" exclusive" >>}, V }, Acc )
228
+ when is_boolean (V ) ->
229
+ Acc #{exclusive => V };
230
+ ({{utf8 , <<" auto_delete" >>}, V }, Acc )
231
+ when is_boolean (V ) ->
232
+ Acc #{auto_delete => V };
233
+ ({{utf8 , <<" arguments" >>}, {map , List }}, Acc ) ->
234
+ Args = lists :map (fun ({{utf8 , Key = <<" x-" , _ /binary >>}, {Type , Val }})
235
+ when Type =:= utf8 orelse
236
+ Type =:= symbol ->
237
+ {Key , longstr , Val };
238
+ (Arg ) ->
239
+ throw (<<" 400" >>,
240
+ " unsupported queue argument ~tp " ,
241
+ [Arg ])
242
+ end , List ),
243
+ Acc #{arguments => Args };
244
+ (Prop , _Acc ) ->
245
+ throw (<<" 400" >>, " bad queue property ~tp " , [Prop ])
246
+ end , #{}, KVList ),
247
+ Defaults = #{durable => true ,
248
+ exclusive => false ,
249
+ auto_delete => false ,
250
+ arguments => []},
251
+ maps :merge (Defaults , M ).
254
252
255
253
decode_exchange ({map , KVList }) ->
256
- lists :foldl (
257
- fun ({{utf8 , <<" durable" >>}, V }, Acc )
258
- when is_boolean (V ) ->
259
- Acc #{durable => V };
260
- ({{utf8 , <<" auto_delete" >>}, V }, Acc )
261
- when is_boolean (V ) ->
262
- Acc #{auto_delete => V };
263
- ({{utf8 , <<" type" >>}, {utf8 , V }}, Acc ) ->
264
- Acc #{type => V };
265
- ({{utf8 , <<" internal" >>}, V }, Acc )
266
- when is_boolean (V ) ->
267
- Acc #{internal => V };
268
- ({{utf8 , <<" arguments" >>}, {map , List }}, Acc ) ->
269
- Args = [{Key , longstr , V }
270
- || {{utf8 , Key = <<" x-" , _ /binary >>},
271
- {utf8 , V }} <- List ],
272
- Acc #{arguments => Args }
273
- end , #{}, KVList ).
254
+ M = lists :foldl (
255
+ fun ({{utf8 , <<" durable" >>}, V }, Acc )
256
+ when is_boolean (V ) ->
257
+ Acc #{durable => V };
258
+ ({{utf8 , <<" auto_delete" >>}, V }, Acc )
259
+ when is_boolean (V ) ->
260
+ Acc #{auto_delete => V };
261
+ ({{utf8 , <<" type" >>}, {utf8 , V }}, Acc ) ->
262
+ Acc #{type => V };
263
+ ({{utf8 , <<" internal" >>}, V }, Acc )
264
+ when is_boolean (V ) ->
265
+ Acc #{internal => V };
266
+ ({{utf8 , <<" arguments" >>}, {map , List }}, Acc ) ->
267
+ Args = lists :map (fun ({{utf8 , Key = <<" x-" , _ /binary >>}, {Type , Val }})
268
+ when Type =:= utf8 orelse
269
+ Type =:= symbol ->
270
+ {Key , longstr , Val };
271
+ (Arg ) ->
272
+ throw (<<" 400" >>,
273
+ " unsupported exchange argument ~tp " ,
274
+ [Arg ])
275
+ end , List ),
276
+ Acc #{arguments => Args };
277
+ (Prop , _Acc ) ->
278
+ throw (<<" 400" >>, " bad exchange property ~tp " , [Prop ])
279
+ end , #{}, KVList ),
280
+ Defaults = #{durable => true ,
281
+ auto_delete => false ,
282
+ type => <<" direct" >>,
283
+ internal => false ,
284
+ arguments => []},
285
+ maps :merge (Defaults , M ).
274
286
275
287
decode_binding ({map , KVList }) ->
276
288
lists :foldl (
@@ -283,9 +295,18 @@ decode_binding({map, KVList}) ->
283
295
({{utf8 , <<" binding_key" >>}, {utf8 , V }}, Acc ) ->
284
296
Acc #{binding_key => V };
285
297
({{utf8 , <<" arguments" >>}, {map , List }}, Acc ) ->
286
- Args = [mc_amqpl :to_091 (Key , TypeVal )
287
- || {{utf8 , Key }, TypeVal } <- List ],
288
- Acc #{arguments => Args }
298
+ Args = lists :map (fun ({{T , Key }, TypeVal })
299
+ when T =:= utf8 orelse
300
+ T =:= symbol ->
301
+ mc_amqpl :to_091 (Key , TypeVal );
302
+ (Arg ) ->
303
+ throw (<<" 400" >>,
304
+ " unsupported binding argument ~tp " ,
305
+ [Arg ])
306
+ end , List ),
307
+ Acc #{arguments => Args };
308
+ (Field , _Acc ) ->
309
+ throw (<<" 400" >>, " bad binding field ~tp " , [Field ])
289
310
end , #{}, KVList ).
290
311
291
312
encode_bindings (Bindings ) ->
@@ -324,21 +345,29 @@ decode_req([_IgnoreSection | Rem], Acc) ->
324
345
decode_req (Rem , Acc ).
325
346
326
347
parse_uri (Uri ) ->
327
- UriMap = #{path := Path } = uri_string :normalize (Uri , [return_map ]),
328
- [<<>> | Segments ] = binary :split (Path , <<" /" >>, [global ]),
329
- QueryMap = case maps :find (query , UriMap ) of
330
- {ok , Query } ->
331
- case uri_string :dissect_query (Query ) of
332
- {error , _ , _ } = Err ->
333
- % %TODO return 400
334
- exit (Err );
335
- QueryList ->
336
- maps :from_list (QueryList )
337
- end ;
338
- error ->
339
- #{}
340
- end ,
341
- {Segments , QueryMap }.
348
+ case uri_string :normalize (Uri , [return_map ]) of
349
+ UriMap = #{path := Path } ->
350
+ [<<>> | Segments ] = binary :split (Path , <<" /" >>, [global ]),
351
+ QueryMap = case maps :find (query , UriMap ) of
352
+ {ok , Query } ->
353
+ case uri_string :dissect_query (Query ) of
354
+ QueryList
355
+ when is_list (QueryList ) ->
356
+ maps :from_list (QueryList );
357
+ {error , Atom , Term } ->
358
+ throw (<<" 400" >>,
359
+ " failed to dissect query '~ts ': ~s ~tp " ,
360
+ [Query , Atom , Term ])
361
+ end ;
362
+ error ->
363
+ #{}
364
+ end ,
365
+ {Segments , QueryMap };
366
+ {error , Atom , Term } ->
367
+ throw (<<" 400" >>,
368
+ " failed to normalize URI '~ts ': ~s ~tp " ,
369
+ [Uri , Atom , Term ])
370
+ end .
342
371
343
372
compose_binding_uri (Src , DstKind , Dst , Key , Args ) ->
344
373
SrcQ = uri_string :quote (Src ),
@@ -384,3 +413,9 @@ args_hash(Args)
384
413
Bin = <<(erlang :phash2 (Args , 1 bsl 32 )):32 >>,
385
414
base64 :encode (Bin , #{mode => urlsafe ,
386
415
padding => false }).
416
+
417
+ -spec throw (binary (), io :format (), [term ()]) -> no_return ().
418
+ throw (StatusCode , Format , Data ) ->
419
+ Explanation = lists :flatten (io_lib :format (Format , Data )),
420
+ rabbit_log :warning (Explanation ),
421
+ throw ({StatusCode , Explanation }).
0 commit comments