|
12 | 12 | namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
13 | 13 |
|
14 | 14 | use PHPUnit\Framework\TestCase;
|
15 |
| -use Symfony\Component\Messenger\Envelope; |
16 | 15 | use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
17 | 16 | use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
18 |
| -use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver; |
19 |
| -use Symfony\Component\Messenger\Transport\RedisExt\RedisSender; |
20 |
| -use Symfony\Component\Messenger\Transport\Serialization\Serializer; |
21 |
| -use Symfony\Component\Process\PhpProcess; |
22 |
| -use Symfony\Component\Process\Process; |
23 |
| -use Symfony\Component\Serializer as SerializerComponent; |
24 |
| -use Symfony\Component\Serializer\Encoder\JsonEncoder; |
25 |
| -use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; |
26 | 17 |
|
27 | 18 | /**
|
28 | 19 | * @requires extension redis
|
29 | 20 | */
|
30 | 21 | class RedisExtIntegrationTest extends TestCase
|
31 | 22 | {
|
| 23 | + private $redis; |
| 24 | + private $connection; |
| 25 | + |
32 | 26 | protected function setUp()
|
33 | 27 | {
|
34 |
| - parent::setUp(); |
35 |
| - |
36 | 28 | if (!getenv('MESSENGER_REDIS_DSN')) {
|
37 | 29 | $this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
|
38 | 30 | }
|
39 |
| - } |
40 |
| - |
41 |
| - public function testItSendsAndReceivesMessages() |
42 |
| - { |
43 |
| - $serializer = new Serializer( |
44 |
| - new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) |
45 |
| - ); |
46 |
| - |
47 |
| - $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN')); |
48 |
| - |
49 |
| - $sender = new RedisSender($connection, $serializer); |
50 |
| - $receiver = new RedisReceiver($connection, $serializer); |
51 | 31 |
|
52 |
| - $sender->send($first = Envelope::wrap(new DummyMessage('First'))); |
53 |
| - $sender->send($second = Envelope::wrap(new DummyMessage('Second'))); |
54 |
| - |
55 |
| - $receivedMessages = 0; |
56 |
| - $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) { |
57 |
| - $this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope); |
58 |
| - |
59 |
| - if (2 === ++$receivedMessages) { |
60 |
| - $receiver->stop(); |
61 |
| - } |
62 |
| - }); |
| 32 | + $this->redis = new \Redis(); |
| 33 | + $this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis); |
| 34 | + $this->clearRedis(); |
| 35 | + $this->connection->setup(); |
63 | 36 | }
|
64 | 37 |
|
65 |
| - public function testItReceivesSignals() |
| 38 | + public function testConnectionSendAndGet() |
66 | 39 | {
|
67 |
| - $serializer = new Serializer( |
68 |
| - new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) |
69 |
| - ); |
70 |
| - |
71 |
| - $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN')); |
72 |
| - |
73 |
| - $sender = new RedisSender($connection, $serializer); |
74 |
| - $sender->send(Envelope::wrap(new DummyMessage('Hello'))); |
75 |
| - |
76 |
| - $amqpReadTimeout = 30; |
77 |
| - $dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout; |
78 |
| - $process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array( |
79 |
| - 'COMPONENT_ROOT' => __DIR__.'/../../../', |
80 |
| - 'DSN' => $dsn, |
81 |
| - )); |
82 |
| - |
83 |
| - $process->start(); |
84 |
| - |
85 |
| - $this->waitForOutput($process, $expectedOutput = "Receiving messages...\n"); |
86 |
| - |
87 |
| - $signalTime = microtime(true); |
88 |
| - $timedOutTime = time() + 10; |
89 |
| - |
90 |
| - $process->signal(15); |
91 |
| - |
92 |
| - while ($process->isRunning() && time() < $timedOutTime) { |
93 |
| - usleep(100 * 1000); // 100ms |
94 |
| - } |
95 |
| - |
96 |
| - $this->assertFalse($process->isRunning()); |
97 |
| - $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime); |
98 |
| - $this->assertSame($expectedOutput.<<<'TXT' |
99 |
| -Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage |
100 |
| -with items: [ |
101 |
| - "Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage" |
102 |
| -] |
103 |
| -Done. |
104 |
| - |
105 |
| -TXT |
106 |
| - , $process->getOutput()); |
| 40 | + $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]); |
| 41 | + $encoded = $this->connection->get(); |
| 42 | + $this->assertEquals('{"message": "Hi"}', $encoded['body']); |
| 43 | + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); |
107 | 44 | }
|
108 | 45 |
|
109 |
| - /** |
110 |
| - * @runInSeparateProcess |
111 |
| - */ |
112 |
| - public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler() |
| 46 | + public function testGetTheFirstAvailableMessage() |
113 | 47 | {
|
114 |
| - $serializer = new Serializer( |
115 |
| - new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) |
116 |
| - ); |
117 |
| - |
118 |
| - $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1')); |
119 |
| - |
120 |
| - $receiver = new RedisReceiver($connection, $serializer); |
121 |
| - |
122 |
| - $receivedMessages = 0; |
123 |
| - $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) { |
124 |
| - $this->assertNull($envelope); |
125 |
| - |
126 |
| - if (2 === ++$receivedMessages) { |
127 |
| - $receiver->stop(); |
128 |
| - } |
129 |
| - }); |
| 48 | + $this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]); |
| 49 | + $this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]); |
| 50 | + $encoded = $this->connection->get(); |
| 51 | + $this->assertEquals('{"message": "Hi1"}', $encoded['body']); |
| 52 | + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); |
| 53 | + $encoded = $this->connection->get(); |
| 54 | + $this->assertEquals('{"message": "Hi2"}', $encoded['body']); |
| 55 | + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); |
130 | 56 | }
|
131 | 57 |
|
132 |
| - private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10) |
| 58 | + private function clearRedis() |
133 | 59 | {
|
134 |
| - $timedOutTime = time() + $timeoutInSeconds; |
135 |
| - |
136 |
| - while (time() < $timedOutTime) { |
137 |
| - if (0 === strpos($process->getOutput(), $output)) { |
138 |
| - return; |
139 |
| - } |
140 |
| - |
141 |
| - usleep(100 * 1000); // 100ms |
142 |
| - } |
143 |
| - |
144 |
| - throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.'); |
| 60 | + $parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN')); |
| 61 | + $pathParts = explode('/', $parsedUrl['path'] ?? ''); |
| 62 | + $stream = $pathParts[1] ?? 'symfony'; |
| 63 | + $this->redis->del($stream); |
145 | 64 | }
|
146 | 65 | }
|
0 commit comments