Skip to content

Commit 2a5ff20

Browse files
committed
PHPLIB-457: Implement prose change stream tests
1 parent 9074847 commit 2a5ff20

File tree

1 file changed

+169
-1
lines changed

1 file changed

+169
-1
lines changed

tests/Operation/WatchFunctionalTest.php

Lines changed: 169 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
namespace MongoDB\Tests\Operation;
44

5+
use Closure;
56
use MongoDB\ChangeStream;
67
use MongoDB\BSON\TimestampInterface;
78
use MongoDB\Driver\Cursor;
9+
use MongoDB\Driver\Exception\CommandException;
810
use MongoDB\Driver\Manager;
911
use MongoDB\Driver\ReadPreference;
1012
use MongoDB\Driver\Server;
@@ -22,6 +24,8 @@
2224

2325
class WatchFunctionalTest extends FunctionalTestCase
2426
{
27+
const NOT_MASTER = 10107;
28+
2529
private static $wireVersionForStartAtOperationTime = 7;
2630

2731
private $defaultOptions = ['maxAwaitTimeMS' => 500];
@@ -887,9 +891,11 @@ public function testRewindExtractsResumeTokenAndNextResumes()
887891
$changeStream->next();
888892
$this->assertTrue($changeStream->valid());
889893

890-
$options = ['resumeAfter' => $changeStream->current()->_id] + $this->defaultOptions;
894+
$resumeToken = $changeStream->current()->_id;
895+
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
891896
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
892897
$changeStream = $operation->execute($this->getPrimaryServer());
898+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
893899

894900
$changeStream->rewind();
895901
$this->assertTrue($changeStream->valid());
@@ -976,6 +982,7 @@ public function testStartAfterOption()
976982
$options = $this->defaultOptions + ['startAfter' => $resumeToken];
977983
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
978984
$changeStream = $operation->execute($this->getPrimaryServer());
985+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
979986

980987
$changeStream->rewind();
981988
$this->assertTrue($changeStream->valid());
@@ -1190,6 +1197,167 @@ public function testSessionFreed()
11901197
$this->assertNull($rp->getValue($changeStream));
11911198
}
11921199

1200+
/**
1201+
* Prose test: "ChangeStream will automatically resume one time on a
1202+
* resumable error (including not master) with the initial pipeline and
1203+
* options, except for the addition/update of a resumeToken."
1204+
*/
1205+
public function testResumeRepeatsOriginalPipelineAndOptions()
1206+
{
1207+
if (version_compare($this->getServerVersion(), '4.0.0', '<')) {
1208+
$this->markTestSkipped('failCommand is not supported');
1209+
}
1210+
1211+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1212+
1213+
$aggregateCommands = [];
1214+
1215+
$this->configureFailPoint([
1216+
'configureFailPoint' => 'failCommand',
1217+
'mode' => ['times' => 1],
1218+
'data' => ['failCommands' => ['getMore'], 'errorCode' => self::NOT_MASTER],
1219+
]);
1220+
1221+
(new CommandObserver)->observe(
1222+
function() use ($operation) {
1223+
$changeStream = $operation->execute($this->getPrimaryServer());
1224+
1225+
// The first next will hit the fail point, causing a resume
1226+
$changeStream->next();
1227+
$changeStream->next();
1228+
},
1229+
function(array $event) use (&$aggregateCommands) {
1230+
$command = $event['started']->getCommand();
1231+
if ($event['started']->getCommandName() !== 'aggregate') {
1232+
return;
1233+
}
1234+
1235+
$aggregateCommands[] = (array) $command;
1236+
}
1237+
);
1238+
1239+
$this->assertCount(2, $aggregateCommands);
1240+
1241+
$aggregateCommands = array_map(
1242+
function (array $aggregateCommand) {
1243+
// Remove resume options from the changestream document
1244+
if (isset($aggregateCommand['pipeline'][0]->{'$changeStream'})) {
1245+
$aggregateCommand['pipeline'][0]->{'$changeStream'} = array_diff_key(
1246+
(array) $aggregateCommand['pipeline'][0]->{'$changeStream'},
1247+
['resumeAfter' => false, 'startAfter' => false, 'startAtOperationTime' => false]
1248+
);
1249+
}
1250+
1251+
// Remove options we don't want to compare between commands
1252+
return array_diff_key($aggregateCommand, ['lsid' => false, '$clusterTime' => false]);
1253+
},
1254+
$aggregateCommands
1255+
);
1256+
1257+
// Ensure options
1258+
$this->assertEquals($aggregateCommands[0], $aggregateCommands[1]);
1259+
}
1260+
1261+
/**
1262+
* Prose test: "ChangeStream will not attempt to resume on any error
1263+
* encountered while executing an aggregate command."
1264+
*/
1265+
public function testErrorDuringAggregateCommandDoesNotCauseResume()
1266+
{
1267+
if (version_compare($this->getServerVersion(), '4.0.0', '<')) {
1268+
$this->markTestSkipped('failCommand is not supported');
1269+
}
1270+
1271+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1272+
1273+
$commandCount = [];
1274+
1275+
$this->configureFailPoint([
1276+
'configureFailPoint' => 'failCommand',
1277+
'mode' => ['times' => 1],
1278+
'data' => ['failCommands' => ['aggregate'], 'errorCode' => self::NOT_MASTER],
1279+
]);
1280+
1281+
$this->expectException(CommandException::class);
1282+
1283+
(new CommandObserver)->observe(
1284+
function() use ($operation) {
1285+
$operation->execute($this->getPrimaryServer());
1286+
},
1287+
function(array $event) use (&$commandCount) {
1288+
$commandCount++;
1289+
}
1290+
);
1291+
1292+
$this->assertSame(1, $commandCount);
1293+
}
1294+
1295+
/**
1296+
* Prose test: "ChangeStream will perform server selection before attempting
1297+
* to resume, using initial readPreference"
1298+
*/
1299+
public function testOriginalReadPreferenceIsPreservedOnResume()
1300+
{
1301+
$readPreference = new ReadPreference(ReadPreference::RP_SECONDARY);
1302+
$options = ['readPreference' => $readPreference] + $this->defaultOptions;
1303+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1304+
1305+
$secondary = $this->manager->selectServer($readPreference);
1306+
1307+
$changeStream = $operation->execute($secondary);
1308+
$previousCursorId = $changeStream->getCursorId();
1309+
$this->killChangeStreamCursor($changeStream);
1310+
1311+
$changeStream->next();
1312+
$this->assertNotSame($previousCursorId, $changeStream->getCursorId());
1313+
1314+
$getCursor = Closure::bind(
1315+
function () {
1316+
return $this->iterator->getInnerIterator();
1317+
},
1318+
$changeStream,
1319+
ChangeStream::class
1320+
);
1321+
/** @var Cursor $cursor */
1322+
$cursor = $getCursor();
1323+
self::assertTrue($cursor->getServer()->isSecondary());
1324+
}
1325+
1326+
/**
1327+
* Prose test
1328+
* For a ChangeStream under these conditions:
1329+
* - Running against a server <4.0.7.
1330+
* - The batch is empty or has been iterated to the last document.
1331+
* Expected result:
1332+
* - getResumeToken must return the _id of the last document returned if one exists.
1333+
* - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
1334+
* - If resumeAfter was not specified, the getResumeToken result must be empty.
1335+
*/
1336+
public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
1337+
{
1338+
if ($this->isPostBatchResumeTokenSupported()) {
1339+
$this->markTestSkipped('postBatchResumeToken is supported');
1340+
}
1341+
1342+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1343+
$changeStream = $operation->execute($this->getPrimaryServer());
1344+
1345+
$this->assertNull($changeStream->getResumeToken());
1346+
1347+
$this->insertDocument(['x' => 1]);
1348+
1349+
$changeStream->next();
1350+
$this->assertTrue($changeStream->valid());
1351+
$resumeToken = $changeStream->getResumeToken();
1352+
$this->assertSame($resumeToken, $changeStream->current()->_id);
1353+
1354+
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
1355+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1356+
$changeStream = $operation->execute($this->getPrimaryServer());
1357+
1358+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
1359+
}
1360+
11931361
private function assertNoCommandExecuted(callable $callable)
11941362
{
11951363
$commands = [];

0 commit comments

Comments
 (0)