Skip to content

Commit f86f220

Browse files
committed
PHPLIB-411: Fix resuming change stream if startAfter option was passed to original watch operation
1 parent aac3c6b commit f86f220

File tree

3 files changed

+106
-4
lines changed

3 files changed

+106
-4
lines changed

src/ChangeStream.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ private function onIteration($incrementKey)
220220
*/
221221
private function resume()
222222
{
223-
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken());
223+
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);
224224
$this->iterator->rewind();
225225

226226
$this->onIteration($this->hasAdvanced);

src/Operation/Watch.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public function execute(Server $server)
250250
{
251251
return new ChangeStream(
252252
$this->createChangeStreamIterator($server),
253-
function($resumeToken) { return $this->resume($resumeToken); }
253+
function($resumeToken, $hasAdvanced) { return $this->resume($resumeToken, $hasAdvanced); }
254254
);
255255
}
256256

@@ -333,10 +333,11 @@ private function getInitialResumeToken()
333333
*
334334
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
335335
* @param array|object|null $resumeToken
336+
* @param bool $hasAdvanced
336337
* @return ChangeStreamIterator
337338
* @throws InvalidArgumentException
338339
*/
339-
private function resume($resumeToken = null)
340+
private function resume($resumeToken = null, $hasAdvanced = false)
340341
{
341342
if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
342343
throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
@@ -347,12 +348,14 @@ private function resume($resumeToken = null)
347348
// Select a new server using the original read preference
348349
$server = $this->manager->selectServer($this->aggregateOptions['readPreference']);
349350

351+
$resumeOption = isset($this->changeStreamOptions['startAfter']) && !$hasAdvanced ? 'startAfter' : 'resumeAfter';
352+
350353
unset($this->changeStreamOptions['resumeAfter']);
351354
unset($this->changeStreamOptions['startAfter']);
352355
unset($this->changeStreamOptions['startAtOperationTime']);
353356

354357
if ($resumeToken !== null) {
355-
$this->changeStreamOptions['resumeAfter'] = $resumeToken;
358+
$this->changeStreamOptions[$resumeOption] = $resumeToken;
356359
}
357360

358361
if ($resumeToken === null && $this->operationTime !== null) {

tests/Operation/WatchFunctionalTest.php

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,6 +1382,105 @@ public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
13821382
$this->assertSame($resumeToken, $changeStream->getResumeToken());
13831383
}
13841384

1385+
/**
1386+
* Prose test: "$changeStream stage for ChangeStream started with startAfter
1387+
* against a server >=4.1.1 that has not received any results yet MUST
1388+
* include a startAfter option and MUST NOT include a resumeAfter option
1389+
* when resuming a change stream."
1390+
*/
1391+
public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfterOption()
1392+
{
1393+
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
1394+
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
1395+
}
1396+
1397+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1398+
$changeStream = $operation->execute($this->getPrimaryServer());
1399+
1400+
$this->insertDocument(['x' => 1]);
1401+
1402+
$changeStream->next();
1403+
$this->assertTrue($changeStream->valid());
1404+
$resumeToken = $changeStream->getResumeToken();
1405+
1406+
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
1407+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1408+
$changeStream = $operation->execute($this->getPrimaryServer());
1409+
$changeStream->rewind();
1410+
$this->killChangeStreamCursor($changeStream);
1411+
1412+
$aggregateCommand = null;
1413+
1414+
(new CommandObserver)->observe(
1415+
function() use ($changeStream) {
1416+
$changeStream->next();
1417+
},
1418+
function(array $event) use (&$aggregateCommand) {
1419+
if ($event['started']->getCommandName() !== 'aggregate') {
1420+
return;
1421+
}
1422+
1423+
$aggregateCommand = $event['started']->getCommand();
1424+
}
1425+
);
1426+
1427+
$this->assertNotNull($aggregateCommand);
1428+
$this->assertObjectNotHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1429+
$this->assertObjectHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1430+
}
1431+
1432+
/**
1433+
* Prose test: "$changeStream stage for ChangeStream started with startAfter
1434+
* against a server >=4.1.1 that has received at least one result MUST
1435+
* include a resumeAfter option and MUST NOT include a startAfter option
1436+
* when resuming a change stream."
1437+
*/
1438+
public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOption()
1439+
{
1440+
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
1441+
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
1442+
}
1443+
1444+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1445+
$changeStream = $operation->execute($this->getPrimaryServer());
1446+
1447+
$this->insertDocument(['x' => 1]);
1448+
1449+
$changeStream->next();
1450+
$this->assertTrue($changeStream->valid());
1451+
$resumeToken = $changeStream->getResumeToken();
1452+
1453+
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
1454+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1455+
$changeStream = $operation->execute($this->getPrimaryServer());
1456+
$changeStream->rewind();
1457+
1458+
$this->insertDocument(['x' => 2]);
1459+
$changeStream->next();
1460+
$this->assertTrue($changeStream->valid());
1461+
1462+
$this->killChangeStreamCursor($changeStream);
1463+
1464+
$aggregateCommand = null;
1465+
1466+
(new CommandObserver)->observe(
1467+
function() use ($changeStream) {
1468+
$changeStream->next();
1469+
},
1470+
function(array $event) use (&$aggregateCommand) {
1471+
if ($event['started']->getCommandName() !== 'aggregate') {
1472+
return;
1473+
}
1474+
1475+
$aggregateCommand = $event['started']->getCommand();
1476+
}
1477+
);
1478+
1479+
$this->assertNotNull($aggregateCommand);
1480+
$this->assertObjectNotHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1481+
$this->assertObjectHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1482+
}
1483+
13851484
private function assertNoCommandExecuted(callable $callable)
13861485
{
13871486
$commands = [];

0 commit comments

Comments
 (0)