Skip to content

Commit 20de20c

Browse files
committed
PHPLIB-457: Fix resuming change stream if startAfter option was passed to original watch operation
1 parent 2a5ff20 commit 20de20c

File tree

3 files changed

+106
-4
lines changed

3 files changed

+106
-4
lines changed

src/ChangeStream.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ private function onIteration($incrementKey)
220220
*/
221221
private function resume()
222222
{
223-
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken());
223+
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);
224224
$this->iterator->rewind();
225225

226226
$this->onIteration($this->hasAdvanced);

src/Operation/Watch.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public function execute(Server $server)
250250
{
251251
return new ChangeStream(
252252
$this->createChangeStreamIterator($server),
253-
function($resumeToken) { return $this->resume($resumeToken); }
253+
function($resumeToken, $hasAdvanced) { return $this->resume($resumeToken, $hasAdvanced); }
254254
);
255255
}
256256

@@ -333,10 +333,11 @@ private function getInitialResumeToken()
333333
*
334334
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
335335
* @param array|object|null $resumeToken
336+
* @param bool $hasAdvanced
336337
* @return ChangeStreamIterator
337338
* @throws InvalidArgumentException
338339
*/
339-
private function resume($resumeToken = null)
340+
private function resume($resumeToken = null, $hasAdvanced = false)
340341
{
341342
if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
342343
throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
@@ -347,12 +348,14 @@ private function resume($resumeToken = null)
347348
// Select a new server using the original read preference
348349
$server = $this->manager->selectServer($this->aggregateOptions['readPreference']);
349350

351+
$resumeOption = isset($this->changeStreamOptions['startAfter']) && !$hasAdvanced ? 'startAfter' : 'resumeAfter';
352+
350353
unset($this->changeStreamOptions['resumeAfter']);
351354
unset($this->changeStreamOptions['startAfter']);
352355
unset($this->changeStreamOptions['startAtOperationTime']);
353356

354357
if ($resumeToken !== null) {
355-
$this->changeStreamOptions['resumeAfter'] = $resumeToken;
358+
$this->changeStreamOptions[$resumeOption] = $resumeToken;
356359
}
357360

358361
if ($resumeToken === null && $this->operationTime !== null) {

tests/Operation/WatchFunctionalTest.php

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,6 +1358,105 @@ public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
13581358
$this->assertSame($resumeToken, $changeStream->getResumeToken());
13591359
}
13601360

1361+
/**
1362+
* Prose test: "$changeStream stage for ChangeStream started with startAfter
1363+
* against a server >=4.1.1 that has not received any results yet MUST
1364+
* include a startAfter option and MUST NOT include a resumeAfter option
1365+
* when resuming a change stream."
1366+
*/
1367+
public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfterOption()
1368+
{
1369+
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
1370+
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
1371+
}
1372+
1373+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1374+
$changeStream = $operation->execute($this->getPrimaryServer());
1375+
1376+
$this->insertDocument(['x' => 1]);
1377+
1378+
$changeStream->next();
1379+
$this->assertTrue($changeStream->valid());
1380+
$resumeToken = $changeStream->getResumeToken();
1381+
1382+
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
1383+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1384+
$changeStream = $operation->execute($this->getPrimaryServer());
1385+
$changeStream->rewind();
1386+
$this->killChangeStreamCursor($changeStream);
1387+
1388+
$aggregateCommand = null;
1389+
1390+
(new CommandObserver)->observe(
1391+
function() use ($changeStream) {
1392+
$changeStream->next();
1393+
},
1394+
function(array $event) use (&$aggregateCommand) {
1395+
if ($event['started']->getCommandName() !== 'aggregate') {
1396+
return;
1397+
}
1398+
1399+
$aggregateCommand = $event['started']->getCommand();
1400+
}
1401+
);
1402+
1403+
$this->assertNotNull($aggregateCommand);
1404+
$this->assertObjectNotHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1405+
$this->assertObjectHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1406+
}
1407+
1408+
/**
1409+
* Prose test: "$changeStream stage for ChangeStream started with startAfter
1410+
* against a server >=4.1.1 that has received at least one result MUST
1411+
* include a resumeAfter option and MUST NOT include a startAfter option
1412+
* when resuming a change stream."
1413+
*/
1414+
public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOption()
1415+
{
1416+
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
1417+
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
1418+
}
1419+
1420+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1421+
$changeStream = $operation->execute($this->getPrimaryServer());
1422+
1423+
$this->insertDocument(['x' => 1]);
1424+
1425+
$changeStream->next();
1426+
$this->assertTrue($changeStream->valid());
1427+
$resumeToken = $changeStream->getResumeToken();
1428+
1429+
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
1430+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1431+
$changeStream = $operation->execute($this->getPrimaryServer());
1432+
$changeStream->rewind();
1433+
1434+
$this->insertDocument(['x' => 2]);
1435+
$changeStream->next();
1436+
$this->assertTrue($changeStream->valid());
1437+
1438+
$this->killChangeStreamCursor($changeStream);
1439+
1440+
$aggregateCommand = null;
1441+
1442+
(new CommandObserver)->observe(
1443+
function() use ($changeStream) {
1444+
$changeStream->next();
1445+
},
1446+
function(array $event) use (&$aggregateCommand) {
1447+
if ($event['started']->getCommandName() !== 'aggregate') {
1448+
return;
1449+
}
1450+
1451+
$aggregateCommand = $event['started']->getCommand();
1452+
}
1453+
);
1454+
1455+
$this->assertNotNull($aggregateCommand);
1456+
$this->assertObjectNotHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1457+
$this->assertObjectHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1458+
}
1459+
13611460
private function assertNoCommandExecuted(callable $callable)
13621461
{
13631462
$commands = [];

0 commit comments

Comments
 (0)