15
15
-include_lib (" common_test/include/ct.hrl" ).
16
16
-include_lib (" eunit/include/eunit.hrl" ).
17
17
-include_lib (" amqp_client/include/amqp_client.hrl" ).
18
+ -include_lib (" amqp10_common/include/amqp10_framing.hrl" ).
18
19
-include_lib (" rabbitmq_stomp/include/rabbit_stomp_frame.hrl" ).
19
20
20
21
-import (util ,
@@ -170,8 +171,8 @@ amqp(Config) ->
170
171
Correlation = <<" some correlation ID" >>,
171
172
ContentType = <<" text/plain" >>,
172
173
RequestPayload = <<" my request" >>,
173
- UserProperty = [{<<" rabbit 🐇" /utf8 >>, <<" carrot 🥕" /utf8 >>},
174
- {<<" x-rabbit 🐇" /utf8 >>, <<" carrot 🥕" /utf8 >>},
174
+ UserProperty = [{<<" 🐇" /utf8 >>, <<" 🥕" /utf8 >>},
175
+ {<<" x-🐇" /utf8 >>, <<" 🥕" /utf8 >>},
175
176
{<<" key" >>, <<" val" >>},
176
177
{<<" key" >>, <<" val" >>},
177
178
{<<" x-key" >>, <<" val" >>},
@@ -180,21 +181,38 @@ amqp(Config) ->
180
181
#{'Content-Type' => ContentType ,
181
182
'Correlation-Data' => Correlation ,
182
183
'Response-Topic' => MqttResponseTopic ,
183
- 'User-Property' => UserProperty },
184
+ 'User-Property' => UserProperty ,
185
+ 'Payload-Format-Indicator' => 1 },
184
186
RequestPayload , [{qos , 1 }]),
185
187
186
- % % As of 3.13, AMQP 1.0 is proxied via AMQP 0.9.1 and therefore the conversion from
187
- % % mc_mqtt to mc_amqpl takes place. We therefore lose MQTT User Property and Response Topic
188
- % % which gets converted to AMQP 0.9.1 headers. In the future, Native AMQP 1.0 will convert
189
- % % from mc_mqtt to mc_amqp allowing us to do many more assertions here.
190
188
{ok , Msg1 } = amqp10_client :get_msg (Receiver ),
191
189
ct :pal (" Received AMQP 1.0 message:~n~p " , [Msg1 ]),
192
- ? assertEqual ([RequestPayload ], amqp10_msg :body (Msg1 )),
193
- ? assertMatch (#{correlation_id := Correlation ,
194
- content_type := ContentType }, amqp10_msg :properties (Msg1 )),
190
+
195
191
? assert (amqp10_msg :header (durable , Msg1 )),
196
192
? assert (amqp10_msg :header (first_acquirer , Msg1 )),
197
193
194
+ % % We expect to receive x-headers in message annotations.
195
+ % % However, since annotation keys are symbols and symbols are only valid ASCII,
196
+ % % we expect header
197
+ % % {<<"x-🐇"/utf8>>, <<"🥕"/utf8>>}
198
+ % % to be dropped.
199
+ ? assertEqual (#{<<" x-key" >> => <<" val" >>,
200
+ <<" x-exchange" >> => <<" amq.topic" >>,
201
+ <<" x-routing-key" >> => <<" topic.1" >>},
202
+ amqp10_msg :message_annotations (Msg1 )),
203
+ % % In contrast, application property keys are of type string, and therefore UTF-8 encoded.
204
+ ? assertEqual (#{<<" 🐇" /utf8 >> => <<" 🥕" /utf8 >>,
205
+ <<" key" >> => <<" val" >>},
206
+ amqp10_msg :application_properties (Msg1 )),
207
+
208
+ #{correlation_id := Correlation ,
209
+ content_type := ContentType ,
210
+ reply_to := ReplyToAddress } = amqp10_msg :properties (Msg1 ),
211
+ ? assertEqual (<<" /topic/response.topic" >>, ReplyToAddress ),
212
+
213
+ % % Thanks to the 'Payload-Format-Indicator', we get a single utf8 value.
214
+ ? assertEqual (# 'v1_0.amqp_value' {content = {utf8 , RequestPayload }}, amqp10_msg :body (Msg1 )),
215
+
198
216
ok = amqp10_client :settle_msg (Receiver , Msg1 , accepted ),
199
217
ok = amqp10_client :detach_link (Receiver ),
200
218
ok = amqp10_client :end_session (Session1 ),
@@ -205,15 +223,14 @@ amqp(Config) ->
205
223
{ok , Session2 } = amqp10_client :begin_session (Connection2 ),
206
224
SenderLinkName = <<" test-sender" >>,
207
225
{ok , Sender } = amqp10_client :attach_sender_link (
208
- % % With Native AMQP 1.0, address should be read from received reply-to
209
- Session2 , SenderLinkName , <<" /topic/response.topic" >>, unsettled ),
226
+ Session2 , SenderLinkName , ReplyToAddress , unsettled ),
210
227
receive {amqp10_event , {link , Sender , credited }} -> ok
211
228
after 1000 -> ct :fail (credited_timeout )
212
229
end ,
213
230
214
231
DTag = <<" my-dtag" >>,
215
232
ReplyPayload = <<" my response" >>,
216
- Msg2a = amqp10_msg :new (DTag , ReplyPayload ),
233
+ Msg2a = amqp10_msg :new (DTag , # 'v1_0.amqp_value' { content = { utf8 , ReplyPayload }} ),
217
234
Msg2b = amqp10_msg :set_properties (
218
235
#{correlation_id => Correlation ,
219
236
content_type => ContentType },
@@ -237,7 +254,9 @@ amqp(Config) ->
237
254
payload := ReplyPayload ,
238
255
properties := #{'Content-Type' := ContentType ,
239
256
'Correlation-Data' := Correlation ,
240
- 'Subscription-Identifier' := 999 }},
257
+ 'Subscription-Identifier' := 999 ,
258
+ % % since the AMQP 1.0 client sent UTF-8
259
+ 'Payload-Format-Indicator' := 1 }},
241
260
MqttMsg )
242
261
after 1000 -> ct :fail (" did not receive reply" )
243
262
end ,
0 commit comments