9
9
10
10
-include_lib (" common_test/include/ct.hrl" ).
11
11
-include_lib (" amqp_client/include/amqp_client.hrl" ).
12
-
12
+ -include_lib (" stdlib/include/assert.hrl" ).
13
+ -include_lib (" rabbitmq_ct_helpers/include/rabbit_assert.hrl" ).
13
14
14
15
-compile (export_all ).
15
16
@@ -25,7 +26,17 @@ groups() ->
25
26
policy_ttl ,
26
27
operator_policy_ttl ,
27
28
operator_retroactive_policy_ttl ,
28
- operator_retroactive_policy_publish_ttl
29
+ operator_retroactive_policy_publish_ttl ,
30
+ queue_type_specific_policies ,
31
+ queue_version_specific_policies ,
32
+ is_supported_operator_policy_expires ,
33
+ is_supported_operator_policy_message_ttl ,
34
+ is_supported_operator_policy_max_length ,
35
+ is_supported_operator_policy_max_length ,
36
+ is_supported_operator_policy_max_in_memory_length ,
37
+ is_supported_operator_policy_max_in_memory_bytes ,
38
+ is_supported_operator_policy_delivery_limit ,
39
+ is_supported_operator_policy_ha
29
40
]}
30
41
].
31
42
@@ -208,14 +219,179 @@ target_count_policy(Config) ->
208
219
rabbit_ct_client_helpers :close_connection (Conn ),
209
220
passed .
210
221
222
+ queue_type_specific_policies (Config ) ->
223
+ [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
224
+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
225
+ ClassicQ = <<" policy_ttl-classic_queue" >>,
226
+ QuorumQ = <<" policy_ttl-quorum_queue" >>,
227
+ StreamQ = <<" policy_ttl-stream_queue" >>,
228
+
229
+ % % all policies match ".*" but different values should be applied based on queue type
230
+ rabbit_ct_broker_helpers :set_policy (Config , 0 , <<" ttl-policy-classic" >>,
231
+ <<" .*" >>, <<" classic_queues" >>, [{<<" message-ttl" >>, 20 }]),
211
232
212
- % %----------------------------------------------------------------------------
233
+ rabbit_ct_broker_helpers :set_policy (Config , 0 , <<" ttl-policy-quorum" >>,
234
+ <<" .*" >>, <<" quorum_queues" >>, [{<<" message-ttl" >>, 40 }]),
235
+
236
+ rabbit_ct_broker_helpers :set_policy (Config , 0 , <<" ttl-policy-stream" >>,
237
+ <<" .*" >>, <<" streams" >>, [{<<" max-age" >>, " 1h" }]),
238
+
239
+ declare (Ch , ClassicQ , [{<<" x-queue-type" >>, longstr , <<" classic" >>}]),
240
+ declare (Ch , QuorumQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]),
241
+ declare (Ch , StreamQ , [{<<" x-queue-type" >>, longstr , <<" stream" >>}]),
242
+ timer :sleep (1 ),
243
+
244
+ ? assertMatch (20 , check_policy_value (Server , ClassicQ , <<" message-ttl" >>)),
245
+ ? assertMatch (40 , check_policy_value (Server , QuorumQ , <<" message-ttl" >>)),
246
+ ? assertMatch (" 1h" , check_policy_value (Server , StreamQ , <<" max-age" >>)),
247
+
248
+ delete (Ch , ClassicQ ),
249
+ delete (Ch , QuorumQ ),
250
+ delete (Ch , StreamQ ),
251
+ rabbit_ct_broker_helpers :clear_policy (Config , 0 , <<" ttl-policy-classic" >>),
252
+ rabbit_ct_broker_helpers :clear_policy (Config , 0 , <<" ttl-policy-quorum" >>),
253
+ rabbit_ct_broker_helpers :clear_policy (Config , 0 , <<" ttl-policy-stream" >>),
254
+
255
+ rabbit_ct_client_helpers :close_channel (Ch ),
256
+ rabbit_ct_client_helpers :close_connection (Conn ),
257
+ passed .
258
+
259
+ queue_version_specific_policies (Config ) ->
260
+ [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
261
+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
262
+ QName = <<" policy_queue_version" >>,
263
+ declare (Ch , QName ),
264
+ QueueVersionOnePolicy = [{<<" queue-version" >>, 1 }],
265
+ QueueVersionTwoPolicy = [{<<" queue-version" >>, 2 }],
266
+
267
+ Opts = #{config => Config ,
268
+ server => Server ,
269
+ qname => QName },
270
+
271
+ % % Queue version OperPolicy has precedence always
272
+ verify_policies (QueueVersionOnePolicy , QueueVersionTwoPolicy , QueueVersionTwoPolicy , Opts ),
273
+ verify_policies (QueueVersionTwoPolicy , QueueVersionOnePolicy , QueueVersionOnePolicy , Opts ),
274
+
275
+ delete (Ch , QName ),
276
+ rabbit_ct_broker_helpers :clear_policy (Config , 0 , <<" policy" >>),
277
+ rabbit_ct_broker_helpers :clear_operator_policy (Config , 0 , <<" op_policy" >>),
278
+ rabbit_ct_client_helpers :close_channel (Ch ),
279
+ rabbit_ct_client_helpers :close_connection (Conn ),
280
+ passed .
281
+
282
+ % % See supported policies in https://www.rabbitmq.com/parameters.html#operator-policies
283
+ % % This test applies all supported operator policies to all queue types,
284
+ % % and later verifies the effective policy definitions.
285
+ % % Just those supported by each queue type should be present.
286
+
287
+ is_supported_operator_policy_expires (Config ) ->
288
+ Value = 6000000 ,
289
+ effective_operator_policy_per_queue_type (
290
+ Config , <<" expires" >>, Value , Value , Value , undefined ).
291
+
292
+ is_supported_operator_policy_message_ttl (Config ) ->
293
+ Value = 1000 ,
294
+ effective_operator_policy_per_queue_type (
295
+ Config , <<" message-ttl" >>, Value , Value , Value , undefined ).
296
+
297
+ is_supported_operator_policy_max_length (Config ) ->
298
+ Value = 500 ,
299
+ effective_operator_policy_per_queue_type (
300
+ Config , <<" max-length" >>, Value , Value , Value , undefined ).
301
+
302
+ is_supported_operator_policy_max_length_bytes (Config ) ->
303
+ Value = 1500 ,
304
+ effective_operator_policy_per_queue_type (
305
+ Config , <<" max-length-bytes" >>, Value , Value , Value , Value ).
306
+
307
+ is_supported_operator_policy_max_in_memory_length (Config ) ->
308
+ Value = 30 ,
309
+ effective_operator_policy_per_queue_type (
310
+ Config , <<" max-in-memory-length" >>, Value , undefined , Value , undefined ).
311
+
312
+ is_supported_operator_policy_max_in_memory_bytes (Config ) ->
313
+ Value = 50000 ,
314
+ effective_operator_policy_per_queue_type (
315
+ Config , <<" max-in-memory-bytes" >>, Value , undefined , Value , undefined ).
316
+
317
+ is_supported_operator_policy_delivery_limit (Config ) ->
318
+ Value = 3 ,
319
+ effective_operator_policy_per_queue_type (
320
+ Config , <<" delivery-limit" >>, Value , undefined , Value , undefined ).
321
+
322
+ is_supported_operator_policy_ha (Config ) ->
323
+ [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
324
+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
325
+ ClassicQ = <<" classic_queue" >>,
326
+ QuorumQ = <<" quorum_queue" >>,
327
+ StreamQ = <<" stream_queue" >>,
328
+
329
+ declare (Ch , ClassicQ , [{<<" x-queue-type" >>, longstr , <<" classic" >>}]),
330
+ declare (Ch , QuorumQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]),
331
+ declare (Ch , StreamQ , [{<<" x-queue-type" >>, longstr , <<" stream" >>}]),
332
+
333
+ rabbit_ct_broker_helpers :set_operator_policy (
334
+ Config , 0 , <<" operator-policy" >>, <<" .*" >>, <<" all" >>,
335
+ [{<<" ha-mode" >>, <<" exactly" >>},
336
+ {<<" ha-params" >>, 2 },
337
+ {<<" ha-sync-mode" >>, <<" automatic" >>}]),
338
+
339
+ ? awaitMatch (<<" exactly" >>, check_policy_value (Server , ClassicQ , <<" ha-mode" >>), 30_000 ),
340
+ ? awaitMatch (2 , check_policy_value (Server , ClassicQ , <<" ha-params" >>), 30_000 ),
341
+ ? awaitMatch (<<" automatic" >>, check_policy_value (Server , ClassicQ , <<" ha-sync-mode" >>), 30_000 ),
342
+ ? awaitMatch (undefined , check_policy_value (Server , QuorumQ , <<" ha-mode" >>), 30_000 ),
343
+ ? awaitMatch (undefined , check_policy_value (Server , StreamQ , <<" ha-mode" >>), 30_000 ),
344
+
345
+ rabbit_ct_broker_helpers :clear_operator_policy (Config , 0 , <<" operator-policy" >>),
346
+
347
+ delete (Ch , ClassicQ ),
348
+ delete (Ch , QuorumQ ),
349
+ delete (Ch , StreamQ ),
350
+
351
+ rabbit_ct_client_helpers :close_channel (Ch ),
352
+ rabbit_ct_client_helpers :close_connection (Conn ),
353
+ passed .
354
+
355
+ effective_operator_policy_per_queue_type (Config , Name , Value , ClassicValue , QuorumValue , StreamValue ) ->
356
+ [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
357
+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
358
+ ClassicQ = <<" classic_queue" >>,
359
+ QuorumQ = <<" quorum_queue" >>,
360
+ StreamQ = <<" stream_queue" >>,
361
+
362
+ declare (Ch , ClassicQ , [{<<" x-queue-type" >>, longstr , <<" classic" >>}]),
363
+ declare (Ch , QuorumQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]),
364
+ declare (Ch , StreamQ , [{<<" x-queue-type" >>, longstr , <<" stream" >>}]),
365
+
366
+ rabbit_ct_broker_helpers :set_operator_policy (
367
+ Config , 0 , <<" operator-policy" >>, <<" .*" >>, <<" all" >>,
368
+ [{Name , Value }]),
369
+
370
+ ? awaitMatch (ClassicValue , check_policy_value (Server , ClassicQ , Name ), 30_000 ),
371
+ ? awaitMatch (QuorumValue , check_policy_value (Server , QuorumQ , Name ), 30_000 ),
372
+ ? awaitMatch (StreamValue , check_policy_value (Server , StreamQ , Name ), 30_000 ),
213
373
374
+ rabbit_ct_broker_helpers :clear_operator_policy (Config , 0 , <<" operator-policy" >>),
375
+
376
+ delete (Ch , ClassicQ ),
377
+ delete (Ch , QuorumQ ),
378
+ delete (Ch , StreamQ ),
379
+
380
+ rabbit_ct_client_helpers :close_channel (Ch ),
381
+ rabbit_ct_client_helpers :close_connection (Conn ),
382
+ passed .
383
+
384
+ % %----------------------------------------------------------------------------
214
385
215
386
declare (Ch , Q ) ->
216
387
amqp_channel :call (Ch , # 'queue.declare' {queue = Q ,
217
388
durable = true }).
218
389
390
+ declare (Ch , Q , Args ) ->
391
+ amqp_channel :call (Ch , # 'queue.declare' {queue = Q ,
392
+ durable = true ,
393
+ arguments = Args }).
394
+
219
395
delete (Ch , Q ) ->
220
396
amqp_channel :call (Ch , # 'queue.delete' {queue = Q }).
221
397
@@ -269,10 +445,10 @@ verify_policies(Policy, OperPolicy, VerifyFuns, #{config := Config,
269
445
server := Server ,
270
446
qname := QName }) ->
271
447
rabbit_ct_broker_helpers :set_policy (Config , 0 , <<" policy" >>,
272
- << " policy_ha " >> , <<" queues" >>,
448
+ QName , <<" queues" >>,
273
449
Policy ),
274
450
rabbit_ct_broker_helpers :set_operator_policy (Config , 0 , <<" op_policy" >>,
275
- << " policy_ha " >> , <<" queues" >>,
451
+ QName , <<" queues" >>,
276
452
OperPolicy ),
277
453
verify_policy (VerifyFuns , Server , QName ).
278
454
0 commit comments