Skip to content

Commit 7f24127

Browse files
committed
Merge pull request #675
2 parents 6a86013 + 126e748 commit 7f24127

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed

tests/Operation/WatchFunctionalTest.php

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use MongoDB\Driver\Exception\LogicException;
1212
use MongoDB\Driver\Exception\ServerException;
1313
use MongoDB\Driver\Manager;
14+
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
1415
use MongoDB\Driver\ReadPreference;
1516
use MongoDB\Driver\WriteConcern;
1617
use MongoDB\Exception\ResumeTokenException;
@@ -1404,6 +1405,67 @@ public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
14041405
$this->assertSame($resumeToken, $changeStream->getResumeToken());
14051406
}
14061407

1408+
/**
1409+
* For a ChangeStream under these conditions:
1410+
* - The batch is not empty.
1411+
* - The batch hasn’t been iterated at all.
1412+
* - Only the initial aggregate command has been executed.
1413+
* Expected result:
1414+
* - getResumeToken must return startAfter from the initial aggregate if the option was specified.
1415+
* - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
1416+
* - If neither the startAfter nor resumeAfter options were specified, the getResumeToken result must be empty.
1417+
*/
1418+
public function testResumeTokenBehaviour()
1419+
{
1420+
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
1421+
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
1422+
}
1423+
1424+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1425+
1426+
$lastOpTime = null;
1427+
1428+
$changeStream = null;
1429+
(new CommandObserver())->observe(function () use ($operation, &$changeStream) {
1430+
$changeStream = $operation->execute($this->getPrimaryServer());
1431+
}, function ($event) use (&$lastOpTime) {
1432+
$this->assertInstanceOf(CommandSucceededEvent::class, $event['succeeded']);
1433+
$reply = $event['succeeded']->getReply();
1434+
1435+
$this->assertObjectHasAttribute('operationTime', $reply);
1436+
$lastOpTime = $reply->operationTime;
1437+
});
1438+
1439+
$this->insertDocument(['x' => 1]);
1440+
1441+
$changeStream->next();
1442+
$this->assertTrue($changeStream->valid());
1443+
$resumeToken = $changeStream->getResumeToken();
1444+
1445+
$this->insertDocument(['x' => 2]);
1446+
1447+
// Test startAfter option
1448+
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
1449+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1450+
$changeStream = $operation->execute($this->getPrimaryServer());
1451+
1452+
$this->assertEquals($resumeToken, $changeStream->getResumeToken());
1453+
1454+
// Test resumeAfter option
1455+
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
1456+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1457+
$changeStream = $operation->execute($this->getPrimaryServer());
1458+
1459+
$this->assertEquals($resumeToken, $changeStream->getResumeToken());
1460+
1461+
// Test without option
1462+
$options = ['startAtOperationTime' => $lastOpTime] + $this->defaultOptions;
1463+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1464+
$changeStream = $operation->execute($this->getPrimaryServer());
1465+
1466+
$this->assertNull($changeStream->getResumeToken());
1467+
}
1468+
14071469
/**
14081470
* Prose test: "$changeStream stage for ChangeStream started with startAfter
14091471
* against a server >=4.1.1 that has not received any results yet MUST

0 commit comments

Comments
 (0)