@@ -34,7 +34,8 @@ groups() ->
34
34
delete_queue_subscribe ,
35
35
temp_destination_queue ,
36
36
temp_destination_in_send ,
37
- blank_destination_in_send
37
+ blank_destination_in_send ,
38
+ stream_filtering
38
39
],
39
40
40
41
[{version_to_group_name (V ), [sequence ], Tests }
@@ -310,6 +311,143 @@ blank_destination_in_send(Config) ->
310
311
" Invalid destination" = proplists :get_value (" message" , Hdrs ),
311
312
ok .
312
313
314
+ stream_filtering (Config ) ->
315
+ Version = ? config (version , Config ),
316
+ Client = ? config (stomp_client , Config ),
317
+ Stream = atom_to_list (? FUNCTION_NAME ) ++ " -" ++ integer_to_list (rand :uniform (10000 )),
318
+ SubDestination = " /topic/stream-queue-test" ,
319
+ rabbit_stomp_client :send (
320
+ Client , " SUBSCRIBE" ,
321
+ [{" destination" , SubDestination },
322
+ {" receipt" , " foo" },
323
+ {" x-queue-name" , Stream },
324
+ {" x-queue-type" , " stream" },
325
+ {" durable" , " true" },
326
+ {" auto-delete" , " false" },
327
+ {" id" , " 1234" },
328
+ {" prefetch-count" , " 1" },
329
+ {" ack" , " client" }]),
330
+ {ok , Client1 , _ , _ } = stomp_receive (Client , " RECEIPT" ),
331
+ rabbit_stomp_client :send (
332
+ Client1 , " UNSUBSCRIBE" , [{" destination" , SubDestination },
333
+ {" id" , " 1234" },
334
+ {" receipt" , " bar" }]),
335
+ {ok , Client2 , _ , _ } = stomp_receive (Client1 , " RECEIPT" ),
336
+
337
+ StreamDestination = " /amq/queue/" ++ Stream ,
338
+ WaveCount = 1000 ,
339
+ Publish =
340
+ fun (C , FilterValue ) ->
341
+ lists :foldl (fun (Seq , C0 ) ->
342
+ Headers0 = [{" destination" , StreamDestination },
343
+ {" receipt" , integer_to_list (Seq )}],
344
+ Headers = case FilterValue of
345
+ undefined ->
346
+ Headers0 ;
347
+ _ ->
348
+ [{" x-stream-filter-value" , FilterValue }] ++ Headers0
349
+ end ,
350
+ rabbit_stomp_client :send (
351
+ C0 , " SEND" , Headers , [" hello" ]),
352
+ {ok , C1 , _ , _ } = stomp_receive (C0 , " RECEIPT" ),
353
+ C1
354
+ end , C , lists :seq (1 , WaveCount ))
355
+ end ,
356
+ Client3 = Publish (Client2 , " apple" ),
357
+ Client4 = Publish (Client3 , undefined ),
358
+ Client5 = Publish (Client4 , " orange" ),
359
+
360
+ % % one filter
361
+ rabbit_stomp_client :send (
362
+ Client5 , " SUBSCRIBE" ,
363
+ [{" destination" , StreamDestination },
364
+ {" id" , " 0" },
365
+ {" ack" , " client" },
366
+ {" prefetch-count" , " 1" },
367
+ {" x-stream-filter" , " apple" },
368
+ {" x-stream-offset" , " first" }]),
369
+ {Client6 , AppleMessages } = stomp_receive_messages (Client5 , Version ),
370
+ ? assert (length (AppleMessages ) < WaveCount * 3 ),
371
+ AppleFilteredMessages =
372
+ lists :filter (fun (H ) ->
373
+ proplists :get_value (" x-stream-filter-value" , H ) =:= " apple"
374
+ end , AppleMessages ),
375
+ ? assert (length (AppleFilteredMessages ) =:= WaveCount ),
376
+ rabbit_stomp_client :send (
377
+ Client6 , " UNSUBSCRIBE" , [{" destination" , StreamDestination },
378
+ {" id" , " 0" },
379
+ {" receipt" , " bar" }]),
380
+ {ok , Client7 , _ , _ } = stomp_receive (Client6 , " RECEIPT" ),
381
+
382
+ % % two filters
383
+ rabbit_stomp_client :send (
384
+ Client7 , " SUBSCRIBE" ,
385
+ [{" destination" , StreamDestination },
386
+ {" id" , " 0" },
387
+ {" ack" , " client" },
388
+ {" prefetch-count" , " 1" },
389
+ {" x-stream-filter" , " apple,orange" },
390
+ {" x-stream-offset" , " first" }]),
391
+ {Client8 , AppleOrangeMessages } = stomp_receive_messages (Client7 , Version ),
392
+ ? assert (length (AppleOrangeMessages ) < WaveCount * 3 ),
393
+ AppleOrangeFilteredMessages =
394
+ lists :filter (fun (H ) ->
395
+ proplists :get_value (" x-stream-filter-value" , H ) =:= " apple" orelse
396
+ proplists :get_value (" x-stream-filter-value" , H ) =:= " orange"
397
+ end , AppleOrangeMessages ),
398
+ ? assert (length (AppleOrangeFilteredMessages ) =:= WaveCount * 2 ),
399
+ rabbit_stomp_client :send (
400
+ Client8 , " UNSUBSCRIBE" , [{" destination" , StreamDestination },
401
+ {" id" , " 0" },
402
+ {" receipt" , " bar" }]),
403
+ {ok , Client9 , _ , _ } = stomp_receive (Client8 , " RECEIPT" ),
404
+
405
+ % % one filter and unfiltered
406
+ rabbit_stomp_client :send (
407
+ Client9 , " SUBSCRIBE" ,
408
+ [{" destination" , StreamDestination },
409
+ {" id" , " 0" },
410
+ {" ack" , " client" },
411
+ {" prefetch-count" , " 1" },
412
+ {" x-stream-filter" , " apple" },
413
+ {" x-stream-match-unfiltered" , " true" },
414
+ {" x-stream-offset" , " first" }]),
415
+ {Client10 , AppleUnfilteredMessages } = stomp_receive_messages (Client9 , Version ),
416
+ ? assert (length (AppleUnfilteredMessages ) < WaveCount * 3 ),
417
+ AppleUnfilteredFilteredMessages =
418
+ lists :filter (fun (H ) ->
419
+ proplists :get_value (" x-stream-filter-value" , H ) =:= " apple" orelse
420
+ proplists :get_value (" x-stream-filter-value" , H ) =:= undefined
421
+ end , AppleUnfilteredMessages ),
422
+ ? assert (length (AppleUnfilteredFilteredMessages ) =:= WaveCount * 2 ),
423
+ rabbit_stomp_client :send (
424
+ Client10 , " UNSUBSCRIBE" , [{" destination" , StreamDestination },
425
+ {" id" , " 0" },
426
+ {" receipt" , " bar" }]),
427
+ {ok , _ , _ , _ } = stomp_receive (Client10 , " RECEIPT" ),
428
+
429
+ Channel = ? config (amqp_channel , Config ),
430
+ # 'queue.delete_ok' {} = amqp_channel :call (Channel ,
431
+ # 'queue.delete' {queue = list_to_binary (Stream )}),
432
+ ok .
433
+
434
+ stomp_receive_messages (Client , Version ) ->
435
+ stomp_receive_messages (Client , [], Version ).
436
+
437
+ stomp_receive_messages (Client , Acc , Version ) ->
438
+ try rabbit_stomp_client :recv (Client ) of
439
+ {# stomp_frame {command = " MESSAGE" ,
440
+ headers = Headers }, Client1 } ->
441
+ MsgHeader = rabbit_stomp_util :msg_header_name (Version ),
442
+ AckValue = proplists :get_value (MsgHeader , Headers ),
443
+ AckHeader = rabbit_stomp_util :ack_header_name (Version ),
444
+ rabbit_stomp_client :send (Client1 , " ACK" , [{AckHeader , AckValue }]),
445
+ stomp_receive_messages (Client1 , [Headers ] ++ Acc , Version )
446
+ catch
447
+ error :{badmatch , {error , timeout }} ->
448
+ {Client , Acc }
449
+ end .
450
+
313
451
stomp_receive (Client , Command ) ->
314
452
{# stomp_frame {command = Command ,
315
453
headers = Hdrs ,
0 commit comments