Skip to content

Commit c856f2b

Browse files
committed
[Messenger] set amqp content_type based on serialization format
1 parent c77758a commit c856f2b

File tree

4 files changed

+75
-3
lines changed

4 files changed

+75
-3
lines changed

Tests/Transport/AmqpExt/AmqpSenderTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,39 @@ public function testItSendsTheEncodedMessageWithoutHeaders()
6969
$sender->send($envelope);
7070
}
7171

72+
public function testContentTypeHeaderIsMovedToAttribute()
73+
{
74+
$envelope = new Envelope(new DummyMessage('Oy'));
75+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class, 'Content-Type' => 'application/json']];
76+
77+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
78+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
79+
80+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
81+
unset($encoded['headers']['Content-Type']);
82+
$stamp = new AmqpStamp(null, AMQP_NOPARAM, ['content_type' => 'application/json']);
83+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
84+
85+
$sender = new AmqpSender($connection, $serializer);
86+
$sender->send($envelope);
87+
}
88+
89+
public function testContentTypeHeaderDoesNotOverwriteAttribute()
90+
{
91+
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk', AMQP_NOPARAM, ['content_type' => 'custom']));
92+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class, 'Content-Type' => 'application/json']];
93+
94+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
95+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
96+
97+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
98+
unset($encoded['headers']['Content-Type']);
99+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
100+
101+
$sender = new AmqpSender($connection, $serializer);
102+
$sender->send($envelope);
103+
}
104+
72105
/**
73106
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
74107
*/

Tests/Transport/Serialization/SerializerTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public function testEncodedIsHavingTheBodyAndTypeHeader()
5555
$this->assertArrayHasKey('body', $encoded);
5656
$this->assertArrayHasKey('headers', $encoded);
5757
$this->assertArrayHasKey('type', $encoded['headers']);
58-
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
58+
$this->assertSame(DummyMessage::class, $encoded['headers']['type']);
59+
$this->assertSame('application/json', $encoded['headers']['Content-Type']);
5960
}
6061

6162
public function testUsesTheCustomFormatAndContext()

Transport/AmqpExt/AmqpSender.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,26 @@ public function send(Envelope $envelope): Envelope
5050
$delay = $delayStamp->getDelay();
5151
}
5252

53+
$amqpStamp = $envelope->last(AmqpStamp::class);
54+
if (isset($encodedMessage['headers']['Content-Type'])) {
55+
$contentType = $encodedMessage['headers']['Content-Type'];
56+
unset($encodedMessage['headers']['Content-Type']);
57+
58+
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
59+
60+
if (!isset($attributes['content_type'])) {
61+
$attributes['content_type'] = $contentType;
62+
63+
$amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes);
64+
}
65+
}
66+
5367
try {
5468
$this->connection->publish(
5569
$encodedMessage['body'],
5670
$encodedMessage['headers'] ?? [],
5771
$delay,
58-
$envelope->last(AmqpStamp::class)
72+
$amqpStamp
5973
);
6074
} catch (\AMQPException $e) {
6175
throw new TransportException($e->getMessage(), 0, $e);

Transport/Serialization/Serializer.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public function encode(Envelope $envelope): array
101101

102102
$envelope = $envelope->withoutStampsOfType(NonSendableStampInterface::class);
103103

104-
$headers = ['type' => \get_class($envelope->getMessage())] + $this->encodeStamps($envelope);
104+
$headers = ['type' => \get_class($envelope->getMessage())] + $this->encodeStamps($envelope) + $this->getContentTypeHeader();
105105

106106
return [
107107
'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context),
@@ -157,4 +157,28 @@ private function findFirstSerializerStamp(array $stamps): ?SerializerStamp
157157

158158
return null;
159159
}
160+
161+
private function getContentTypeHeader(): array
162+
{
163+
$mimeType = $this->getMimeTypeForFormat();
164+
165+
return null === $mimeType ? [] : ['Content-Type' => $mimeType];
166+
}
167+
168+
private function getMimeTypeForFormat(): ?string
169+
{
170+
switch ($this->format) {
171+
case 'json':
172+
return 'application/json';
173+
case 'xml':
174+
return 'application/xml';
175+
case 'yml':
176+
case 'yaml':
177+
return 'application/x-yaml';
178+
case 'csv':
179+
return 'text/csv';
180+
}
181+
182+
return null;
183+
}
160184
}

0 commit comments

Comments
 (0)