|
11 | 11 | use MongoDB\Driver\Exception\LogicException;
|
12 | 12 | use MongoDB\Driver\Exception\ServerException;
|
13 | 13 | use MongoDB\Driver\Manager;
|
| 14 | +use MongoDB\Driver\Monitoring\CommandSucceededEvent; |
14 | 15 | use MongoDB\Driver\ReadPreference;
|
15 | 16 | use MongoDB\Driver\WriteConcern;
|
16 | 17 | use MongoDB\Exception\ResumeTokenException;
|
@@ -1403,6 +1404,67 @@ public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
|
1403 | 1404 | $this->assertSame($resumeToken, $changeStream->getResumeToken());
|
1404 | 1405 | }
|
1405 | 1406 |
|
| 1407 | + /** |
| 1408 | + * For a ChangeStream under these conditions: |
| 1409 | + * - The batch is not empty. |
| 1410 | + * - The batch hasn’t been iterated at all. |
| 1411 | + * - Only the initial aggregate command has been executed. |
| 1412 | + * Expected result: |
| 1413 | + * - getResumeToken must return startAfter from the initial aggregate if the option was specified. |
| 1414 | + * - getResumeToken must return resumeAfter from the initial aggregate if the option was specified. |
| 1415 | + * - If neither the startAfter nor resumeAfter options were specified, the getResumeToken result must be empty. |
| 1416 | + */ |
| 1417 | + public function testResumeTokenBehaviour() |
| 1418 | + { |
| 1419 | + if (version_compare($this->getServerVersion(), '4.1.1', '<')) { |
| 1420 | + $this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1'); |
| 1421 | + } |
| 1422 | + |
| 1423 | + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); |
| 1424 | + |
| 1425 | + $lastOpTime = null; |
| 1426 | + |
| 1427 | + $changeStream = null; |
| 1428 | + (new CommandObserver())->observe(function () use ($operation, &$changeStream) { |
| 1429 | + $changeStream = $operation->execute($this->getPrimaryServer()); |
| 1430 | + }, function ($event) use (&$lastOpTime) { |
| 1431 | + $this->assertInstanceOf(CommandSucceededEvent::class, $event['succeeded']); |
| 1432 | + $reply = $event['succeeded']->getReply(); |
| 1433 | + |
| 1434 | + $this->assertObjectHasAttribute('operationTime', $reply); |
| 1435 | + $lastOpTime = $reply->operationTime; |
| 1436 | + }); |
| 1437 | + |
| 1438 | + $this->insertDocument(['x' => 1]); |
| 1439 | + |
| 1440 | + $changeStream->next(); |
| 1441 | + $this->assertTrue($changeStream->valid()); |
| 1442 | + $resumeToken = $changeStream->getResumeToken(); |
| 1443 | + |
| 1444 | + $this->insertDocument(['x' => 2]); |
| 1445 | + |
| 1446 | + // Test startAfter option |
| 1447 | + $options = ['startAfter' => $resumeToken] + $this->defaultOptions; |
| 1448 | + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); |
| 1449 | + $changeStream = $operation->execute($this->getPrimaryServer()); |
| 1450 | + |
| 1451 | + $this->assertEquals($resumeToken, $changeStream->getResumeToken()); |
| 1452 | + |
| 1453 | + // Test resumeAfter option |
| 1454 | + $options = ['resumeAfter' => $resumeToken] + $this->defaultOptions; |
| 1455 | + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); |
| 1456 | + $changeStream = $operation->execute($this->getPrimaryServer()); |
| 1457 | + |
| 1458 | + $this->assertEquals($resumeToken, $changeStream->getResumeToken()); |
| 1459 | + |
| 1460 | + // Test without option |
| 1461 | + $options = ['startAtOperationTime' => $lastOpTime] + $this->defaultOptions; |
| 1462 | + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); |
| 1463 | + $changeStream = $operation->execute($this->getPrimaryServer()); |
| 1464 | + |
| 1465 | + $this->assertNull($changeStream->getResumeToken()); |
| 1466 | + } |
| 1467 | + |
1406 | 1468 | /**
|
1407 | 1469 | * Prose test: "$changeStream stage for ChangeStream started with startAfter
|
1408 | 1470 | * against a server >=4.1.1 that has not received any results yet MUST
|
|
0 commit comments