Skip to content

Commit 27d8daf

Browse files
bug symfony#43096 [Messenger] Use TransportMessageIdStamp in InMemoryTransport allows retrying (alexndlm)
This PR was merged into the 4.4 branch. Discussion ---------- [Messenger] Use `TransportMessageIdStamp` in `InMemoryTransport` allows retrying | Q | A | ------------- | --- | Branch? | 4.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | License | MIT Commits ------- 609b07d [Messenger] Use `TransportMessageIdStamp` in `InMemoryTransport` allows retrying
2 parents 7f8db2c + 609b07d commit 27d8daf

File tree

2 files changed

+25
-10
lines changed

2 files changed

+25
-10
lines changed

src/Symfony/Component/Messenger/Tests/Transport/InMemoryTransportTest.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ public function testSend()
4141
public function testQueue()
4242
{
4343
$envelope1 = new Envelope(new \stdClass());
44-
$this->transport->send($envelope1);
44+
$envelope1 = $this->transport->send($envelope1);
4545
$envelope2 = new Envelope(new \stdClass());
46-
$this->transport->send($envelope2);
46+
$envelope2 = $this->transport->send($envelope2);
4747
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
4848
$this->transport->ack($envelope1);
4949
$this->assertSame([$envelope2], $this->transport->get());
@@ -54,9 +54,9 @@ public function testQueue()
5454
public function testAcknowledgeSameMessageWithDifferentStamps()
5555
{
5656
$envelope1 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]);
57-
$this->transport->send($envelope1);
57+
$envelope1 = $this->transport->send($envelope1);
5858
$envelope2 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]);
59-
$this->transport->send($envelope2);
59+
$envelope2 = $this->transport->send($envelope2);
6060
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
6161
$this->transport->ack($envelope1->with(new AnEnvelopeStamp()));
6262
$this->assertSame([$envelope2], $this->transport->get());
@@ -67,21 +67,23 @@ public function testAcknowledgeSameMessageWithDifferentStamps()
6767
public function testAck()
6868
{
6969
$envelope = new Envelope(new \stdClass());
70+
$envelope = $this->transport->send($envelope);
7071
$this->transport->ack($envelope);
7172
$this->assertSame([$envelope], $this->transport->getAcknowledged());
7273
}
7374

7475
public function testReject()
7576
{
7677
$envelope = new Envelope(new \stdClass());
78+
$envelope = $this->transport->send($envelope);
7779
$this->transport->reject($envelope);
7880
$this->assertSame([$envelope], $this->transport->getRejected());
7981
}
8082

8183
public function testReset()
8284
{
8385
$envelope = new Envelope(new \stdClass());
84-
$this->transport->send($envelope);
86+
$envelope = $this->transport->send($envelope);
8587
$this->transport->ack($envelope);
8688
$this->transport->reject($envelope);
8789

src/Symfony/Component/Messenger/Transport/InMemoryTransport.php

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
namespace Symfony\Component\Messenger\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\LogicException;
16+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1517
use Symfony\Contracts\Service\ResetInterface;
1618

1719
/**
@@ -41,6 +43,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
4143
*/
4244
private $queue = [];
4345

46+
private $nextId = 1;
47+
4448
/**
4549
* {@inheritdoc}
4650
*/
@@ -55,8 +59,12 @@ public function get(): iterable
5559
public function ack(Envelope $envelope): void
5660
{
5761
$this->acknowledged[] = $envelope;
58-
$id = spl_object_hash($envelope->getMessage());
59-
unset($this->queue[$id]);
62+
63+
if (!$transportMessageIdStamp = $envelope->last(TransportMessageIdStamp::class)) {
64+
throw new LogicException('No TransportMessageIdStamp found on the Envelope.');
65+
}
66+
67+
unset($this->queue[$transportMessageIdStamp->getId()]);
6068
}
6169

6270
/**
@@ -65,8 +73,12 @@ public function ack(Envelope $envelope): void
6573
public function reject(Envelope $envelope): void
6674
{
6775
$this->rejected[] = $envelope;
68-
$id = spl_object_hash($envelope->getMessage());
69-
unset($this->queue[$id]);
76+
77+
if (!$transportMessageIdStamp = $envelope->last(TransportMessageIdStamp::class)) {
78+
throw new LogicException('No TransportMessageIdStamp found on the Envelope.');
79+
}
80+
81+
unset($this->queue[$transportMessageIdStamp->getId()]);
7082
}
7183

7284
/**
@@ -75,7 +87,8 @@ public function reject(Envelope $envelope): void
7587
public function send(Envelope $envelope): Envelope
7688
{
7789
$this->sent[] = $envelope;
78-
$id = spl_object_hash($envelope->getMessage());
90+
$id = $this->nextId++;
91+
$envelope = $envelope->with(new TransportMessageIdStamp($id));
7992
$this->queue[$id] = $envelope;
8093

8194
return $envelope;

0 commit comments

Comments
 (0)