@@ -29,6 +29,7 @@ groups() ->
29
29
roundtrip_classic_queue_with_drain ,
30
30
roundtrip_quorum_queue_with_drain ,
31
31
roundtrip_stream_queue_with_drain ,
32
+ amqp_stream_amqpl ,
32
33
message_headers_conversion
33
34
]},
34
35
{metrics , [], [
@@ -303,6 +304,58 @@ roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) -
303
304
ok = amqp10_client :close_connection (Connection ),
304
305
ok .
305
306
307
+ % % Send a message with a body containing a single AMQP 1.0 value section
308
+ % % to a stream and consume via AMQP 0.9.1.
309
+ amqp_stream_amqpl (Config ) ->
310
+ Host = ? config (rmq_hostname , Config ),
311
+ Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
312
+ Ch = rabbit_ct_client_helpers :open_channel (Config , 0 ),
313
+ ContainerId = QName = atom_to_binary (? FUNCTION_NAME ),
314
+
315
+ amqp_channel :call (Ch , # 'queue.declare' {
316
+ queue = QName ,
317
+ durable = true ,
318
+ arguments = [{<<" x-queue-type" >>, longstr , <<" stream" >>}]}),
319
+
320
+ Address = <<" /amq/queue/" , QName /binary >>,
321
+ OpnConf = #{address => Host ,
322
+ port => Port ,
323
+ container_id => ContainerId ,
324
+ sasl => {plain , <<" guest" >>, <<" guest" >>}},
325
+ {ok , Connection } = amqp10_client :open_connection (OpnConf ),
326
+ {ok , Session } = amqp10_client :begin_session (Connection ),
327
+ SenderLinkName = <<" test-sender" >>,
328
+ {ok , Sender } = amqp10_client :attach_sender_link (Session ,
329
+ SenderLinkName ,
330
+ Address ),
331
+ wait_for_credit (Sender ),
332
+ OutMsg = amqp10_msg :new (<<" my-tag" >>, {'v1_0.amqp_value' , {binary , <<0 , 255 >>}}, true ),
333
+ ok = amqp10_client :send_msg (Sender , OutMsg ),
334
+ flush (" final" ),
335
+ ok = amqp10_client :detach_link (Sender ),
336
+ ok = amqp10_client :close_connection (Connection ),
337
+
338
+ # 'basic.qos_ok' {} = amqp_channel :call (Ch , # 'basic.qos' {global = false ,
339
+ prefetch_count = 1 }),
340
+ CTag = <<" my-tag" >>,
341
+ # 'basic.consume_ok' {} = amqp_channel :subscribe (
342
+ Ch ,
343
+ # 'basic.consume' {
344
+ queue = QName ,
345
+ consumer_tag = CTag ,
346
+ arguments = [{<<" x-stream-offset" >>, longstr , <<" first" >>}]},
347
+ self ()),
348
+ receive
349
+ {# 'basic.deliver' {consumer_tag = CTag ,
350
+ redelivered = false },
351
+ # amqp_msg {props = # 'P_basic' {type = <<" amqp-1.0" >>}}} ->
352
+ ok
353
+ after 5000 ->
354
+ exit (basic_deliver_timeout )
355
+ end ,
356
+ # 'queue.delete_ok' {} = amqp_channel :call (Ch , # 'queue.delete' {queue = QName }),
357
+ ok = rabbit_ct_client_helpers :close_channel (Ch ).
358
+
306
359
message_headers_conversion (Config ) ->
307
360
Host = ? config (rmq_hostname , Config ),
308
361
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
@@ -468,7 +521,7 @@ wait_for_accepts(N) ->
468
521
469
522
delete_queue (Config , QName ) ->
470
523
Ch = rabbit_ct_client_helpers :open_channel (Config , 0 ),
471
- _ = amqp_channel :call (Ch , # 'queue.delete' {queue = QName }),
524
+ # 'queue.delete_ok' {} = amqp_channel :call (Ch , # 'queue.delete' {queue = QName }),
472
525
rabbit_ct_client_helpers :close_channel (Ch ).
473
526
474
527
0 commit comments