Skip to content

Commit 7188432

Browse files
committed
feature symfony#49730 [Messenger] Add Clock support in Worker (fabpot)
This PR was merged into the 6.3 branch. Discussion ---------- [Messenger] Add Clock support in Worker | Q | A | ------------- | --- | Branch? | 6.3 | Bug fix? | no | New feature? | yes <!-- please update src/**/CHANGELOG.md files --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tickets | n/a | License | MIT | Doc PR | n/a Is it useful? Commits ------- 8e1a6e7 [Messenger] Add Clock support in Worker
2 parents 17c77fe + 8e1a6e7 commit 7188432

File tree

3 files changed

+39
-45
lines changed

3 files changed

+39
-45
lines changed

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Psr\EventDispatcher\EventDispatcherInterface;
1616
use Psr\Log\LoggerInterface;
17+
use Symfony\Component\Clock\MockClock;
1718
use Symfony\Component\EventDispatcher\EventDispatcher;
1819
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
1920
use Symfony\Component\Messenger\Envelope;
@@ -88,7 +89,7 @@ public function dispatch(object $event): object
8889
}
8990
};
9091

91-
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
92+
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher, clock: new MockClock());
9293
$worker->run();
9394

9495
$this->assertSame($apiMessage, $envelopes[0]->getMessage());
@@ -112,7 +113,7 @@ public function testHandlingErrorCausesReject()
112113
$dispatcher = new EventDispatcher();
113114
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
114115

115-
$worker = new Worker(['transport1' => $receiver], $bus, $dispatcher);
116+
$worker = new Worker(['transport1' => $receiver], $bus, $dispatcher, clock: new MockClock());
116117
$worker->run();
117118

118119
$this->assertSame(1, $receiver->getRejectCount());
@@ -127,7 +128,7 @@ public function testWorkerResetsConnectionIfReceiverIsResettable()
127128
$dispatcher->addSubscriber(new ResetServicesListener(new ServicesResetter(new \ArrayIterator([$resettableReceiver]), ['reset'])));
128129

129130
$bus = $this->createMock(MessageBusInterface::class);
130-
$worker = new Worker([$resettableReceiver], $bus, $dispatcher);
131+
$worker = new Worker([$resettableReceiver], $bus, $dispatcher, clock: new MockClock());
131132
$worker->stop();
132133
$worker->run();
133134
$this->assertTrue($resettableReceiver->hasBeenReset());
@@ -145,7 +146,7 @@ public function testWorkerResetsTransportsIfResetServicesListenerIsCalled()
145146
});
146147

147148
$bus = $this->createMock(MessageBusInterface::class);
148-
$worker = new Worker([$resettableReceiver], $bus, $dispatcher);
149+
$worker = new Worker([$resettableReceiver], $bus, $dispatcher, clock: new MockClock());
149150
$worker->run();
150151
$this->assertTrue($resettableReceiver->hasBeenReset());
151152
}
@@ -162,7 +163,7 @@ public function testWorkerDoesNotResetTransportsIfResetServicesListenerIsNotCall
162163
$event->getWorker()->stop();
163164
});
164165

165-
$worker = new Worker([$resettableReceiver], $bus, $dispatcher);
166+
$worker = new Worker([$resettableReceiver], $bus, $dispatcher, clock: new MockClock());
166167
$worker->run();
167168
$this->assertFalse($resettableReceiver->hasBeenReset());
168169
}
@@ -181,7 +182,7 @@ public function testWorkerDoesNotSendNullMessagesToTheBus()
181182
$event->getWorker()->stop();
182183
});
183184

184-
$worker = new Worker([$receiver], $bus, $dispatcher);
185+
$worker = new Worker([$receiver], $bus, $dispatcher, clock: new MockClock());
185186
$worker->run();
186187
}
187188

@@ -215,7 +216,7 @@ public function testWorkerDispatchesEventsOnSuccess()
215216
return $event;
216217
});
217218

218-
$worker = new Worker([$receiver], $bus, $eventDispatcher);
219+
$worker = new Worker([$receiver], $bus, $eventDispatcher, clock: new MockClock());
219220
$worker->run();
220221
}
221222

@@ -225,7 +226,7 @@ public function testWorkerWithoutDispatcher()
225226
$receiver = new DummyReceiver([[$envelope]]);
226227

227228
$bus = $this->createMock(MessageBusInterface::class);
228-
$worker = new Worker([$receiver], $bus);
229+
$worker = new Worker([$receiver], $bus, clock: new MockClock());
229230

230231
$bus->expects($this->once())
231232
->method('dispatch')
@@ -269,7 +270,7 @@ public function testWorkerDispatchesEventsOnError()
269270
return $event;
270271
});
271272

272-
$worker = new Worker([$receiver], $bus, $eventDispatcher);
273+
$worker = new Worker([$receiver], $bus, $eventDispatcher, clock: new MockClock());
273274
$worker->run();
274275
}
275276

@@ -286,7 +287,7 @@ public function testWorkerContainsMetadata()
286287
$event->getWorker()->stop();
287288
});
288289

289-
$worker = new Worker(['dummyReceiver' => $receiver], $bus, $dispatcher);
290+
$worker = new Worker(['dummyReceiver' => $receiver], $bus, $dispatcher, clock: new MockClock());
290291
$worker->run(['queues' => ['queue1', 'queue2']]);
291292

292293
$workerMetadata = $worker->getMetadata();
@@ -313,16 +314,10 @@ public function testTimeoutIsConfigurable()
313314
$dispatcher = new EventDispatcher();
314315
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(5));
315316

316-
$worker = new Worker([$receiver], $bus, $dispatcher);
317-
$startTime = microtime(true);
318-
// sleep .1 after each idle
319-
$worker->run(['sleep' => 100000]);
320-
321-
$duration = microtime(true) - $startTime;
322-
// wait time should be .3 seconds
323-
// use .29 & .31 for timing "wiggle room"
324-
$this->assertGreaterThanOrEqual(.29, $duration);
325-
$this->assertLessThan(.31, $duration);
317+
$clock = new MockClock('2023-03-19 14:00:00');
318+
$worker = new Worker([$receiver], $bus, $dispatcher, clock: $clock);
319+
$worker->run(['sleep' => 1000000]);
320+
$this->assertEquals(new \DateTimeImmutable('2023-03-19 14:00:03'), $clock->now());
326321
}
327322

328323
public function testWorkerWithMultipleReceivers()
@@ -368,7 +363,7 @@ public function testWorkerWithMultipleReceivers()
368363
$dispatcher->addListener(WorkerMessageReceivedEvent::class, function (WorkerMessageReceivedEvent $event) use (&$processedEnvelopes) {
369364
$processedEnvelopes[] = $event->getEnvelope();
370365
});
371-
$worker = new Worker([$receiver1, $receiver2, $receiver3], $bus, $dispatcher);
366+
$worker = new Worker([$receiver1, $receiver2, $receiver3], $bus, $dispatcher, clock: new MockClock());
372367
$worker->run();
373368

374369
// make sure they were processed in the correct order
@@ -393,7 +388,7 @@ public function testWorkerLimitQueues()
393388
$dispatcher = new EventDispatcher();
394389
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
395390

396-
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
391+
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher, clock: new MockClock());
397392
$worker->run(['queues' => ['foo']]);
398393
}
399394

@@ -404,7 +399,7 @@ public function testWorkerLimitQueuesUnsupported()
404399

405400
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
406401

407-
$worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus);
402+
$worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus, clock: new MockClock());
408403
$this->expectException(RuntimeException::class);
409404
$this->expectExceptionMessage(sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class));
410405
$worker->run(['queues' => ['foo']]);
@@ -429,7 +424,7 @@ public function testWorkerMessageReceivedEventMutability()
429424

430425
$eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
431426

432-
$worker = new Worker([$receiver], $bus, $eventDispatcher);
427+
$worker = new Worker([$receiver], $bus, $eventDispatcher, clock: new MockClock());
433428
$worker->run();
434429

435430
$envelope = current($receiver->getAcknowledgedEnvelopes());
@@ -464,7 +459,7 @@ public function testWorkerRateLimitMessages()
464459
'interval' => '1 minute',
465460
], new InMemoryStorage());
466461

467-
$worker = new Worker(['bus' => $receiver], $bus, $eventDispatcher, null, ['bus' => $rateLimitFactory]);
462+
$worker = new Worker(['bus' => $receiver], $bus, $eventDispatcher, null, ['bus' => $rateLimitFactory], new MockClock());
468463
$worker->run();
469464

470465
$this->assertCount(2, $receiver->getAcknowledgedEnvelopes());
@@ -476,7 +471,7 @@ public function testWorkerShouldLogOnStop()
476471
$bus = $this->createMock(MessageBusInterface::class);
477472
$logger = $this->createMock(LoggerInterface::class);
478473
$logger->expects($this->once())->method('info')->with('Stopping worker.');
479-
$worker = new Worker([], $bus, new EventDispatcher(), $logger);
474+
$worker = new Worker([], $bus, new EventDispatcher(), $logger, clock: new MockClock());
480475

481476
$worker->stop();
482477
}
@@ -512,7 +507,7 @@ public function testBatchProcessing()
512507
}
513508
});
514509

515-
$worker = new Worker([$receiver], $bus, $dispatcher);
510+
$worker = new Worker([$receiver], $bus, $dispatcher, clock: new MockClock());
516511
$worker->run();
517512

518513
$this->assertSame($expectedMessages, $handler->processedMessages);
@@ -548,7 +543,7 @@ public function testFlushBatchOnIdle()
548543
}
549544
});
550545

551-
$worker = new Worker([$receiver], $bus, $dispatcher);
546+
$worker = new Worker([$receiver], $bus, $dispatcher, clock: new MockClock());
552547
$worker->run();
553548

554549
$this->assertSame($expectedMessages, $handler->processedMessages);
@@ -578,7 +573,7 @@ public function testFlushBatchOnStop()
578573
$this->assertSame(0, $receiver->getAcknowledgeCount());
579574
});
580575

581-
$worker = new Worker([$receiver], $bus, $dispatcher);
576+
$worker = new Worker([$receiver], $bus, $dispatcher, clock: new MockClock());
582577
$worker->run();
583578

584579
$this->assertSame($expectedMessages, $handler->processedMessages);

src/Symfony/Component/Messenger/Worker.php

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
use Psr\EventDispatcher\EventDispatcherInterface;
1515
use Psr\Log\LoggerInterface;
16+
use Symfony\Component\Clock\Clock;
17+
use Symfony\Component\Clock\ClockInterface;
1618
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
1719
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
1820
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
@@ -40,30 +42,26 @@
4042
*/
4143
class Worker
4244
{
43-
private array $receivers;
44-
private MessageBusInterface $bus;
45-
private ?EventDispatcherInterface $eventDispatcher;
46-
private ?LoggerInterface $logger;
4745
private bool $shouldStop = false;
4846
private WorkerMetadata $metadata;
4947
private array $acks = [];
5048
private \SplObjectStorage $unacks;
51-
private ?array $rateLimiters;
5249

5350
/**
5451
* @param ReceiverInterface[] $receivers Where the key is the transport name
5552
*/
56-
public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null, array $rateLimiters = null)
57-
{
58-
$this->receivers = $receivers;
59-
$this->bus = $bus;
60-
$this->logger = $logger;
61-
$this->eventDispatcher = $eventDispatcher;
53+
public function __construct(
54+
private array $receivers,
55+
private MessageBusInterface $bus,
56+
private ?EventDispatcherInterface $eventDispatcher = null,
57+
private ?LoggerInterface $logger = null,
58+
private ?array $rateLimiters = null,
59+
private ClockInterface $clock = new Clock(),
60+
) {
6261
$this->metadata = new WorkerMetadata([
6362
'transportNames' => array_keys($receivers),
6463
]);
6564
$this->unacks = new \SplObjectStorage();
66-
$this->rateLimiters = $rateLimiters;
6765
}
6866

6967
/**
@@ -95,7 +93,7 @@ public function run(array $options = []): void
9593

9694
while (!$this->shouldStop) {
9795
$envelopeHandled = false;
98-
$envelopeHandledStart = microtime(true);
96+
$envelopeHandledStart = $this->clock->now();
9997
foreach ($this->receivers as $transportName => $receiver) {
10098
if ($queueNames) {
10199
$envelopes = $receiver->getFromQueues($queueNames);
@@ -130,8 +128,8 @@ public function run(array $options = []): void
130128
if (!$envelopeHandled) {
131129
$this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, true));
132130

133-
if (0 < $sleep = (int) ($options['sleep'] - 1e6 * (microtime(true) - $envelopeHandledStart))) {
134-
usleep($sleep);
131+
if (0 < $sleep = (int) ($options['sleep'] - 1e6 * ($this->clock->now()->format('U.u') - $envelopeHandledStart->format('U.u')))) {
132+
$this->clock->sleep($sleep / 1e6);
135133
}
136134
}
137135
}

src/Symfony/Component/Messenger/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
],
1818
"require": {
1919
"php": ">=8.1",
20-
"psr/log": "^1|^2|^3"
20+
"psr/log": "^1|^2|^3",
21+
"symfony/clock": "^6.3"
2122
},
2223
"require-dev": {
2324
"psr/cache": "^1.0|^2.0|^3.0",

0 commit comments

Comments
 (0)