Skip to content

Commit aac3c6b

Browse files
committed
PHPLIB-457: Implement prose change stream tests
1 parent e421365 commit aac3c6b

File tree

1 file changed

+190
-1
lines changed

1 file changed

+190
-1
lines changed

tests/Operation/WatchFunctionalTest.php

Lines changed: 190 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
namespace MongoDB\Tests\Operation;
44

5+
use Closure;
56
use MongoDB\ChangeStream;
67
use MongoDB\BSON\TimestampInterface;
78
use MongoDB\Driver\Cursor;
9+
use MongoDB\Driver\Exception\CommandException;
10+
use MongoDB\Driver\Exception\ConnectionTimeoutException;
811
use MongoDB\Driver\Manager;
912
use MongoDB\Driver\ReadPreference;
1013
use MongoDB\Driver\Server;
@@ -25,6 +28,8 @@ class WatchFunctionalTest extends FunctionalTestCase
2528
{
2629
use SetUpTearDownTrait;
2730

31+
const NOT_MASTER = 10107;
32+
2833
private static $wireVersionForStartAtOperationTime = 7;
2934

3035
private $defaultOptions = ['maxAwaitTimeMS' => 500];
@@ -890,9 +895,11 @@ public function testRewindExtractsResumeTokenAndNextResumes()
890895
$changeStream->next();
891896
$this->assertTrue($changeStream->valid());
892897

893-
$options = ['resumeAfter' => $changeStream->current()->_id] + $this->defaultOptions;
898+
$resumeToken = $changeStream->current()->_id;
899+
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
894900
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
895901
$changeStream = $operation->execute($this->getPrimaryServer());
902+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
896903

897904
$changeStream->rewind();
898905
$this->assertTrue($changeStream->valid());
@@ -979,6 +986,7 @@ public function testStartAfterOption()
979986
$options = $this->defaultOptions + ['startAfter' => $resumeToken];
980987
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
981988
$changeStream = $operation->execute($this->getPrimaryServer());
989+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
982990

983991
$changeStream->rewind();
984992
$this->assertTrue($changeStream->valid());
@@ -1193,6 +1201,187 @@ public function testSessionFreed()
11931201
$this->assertNull($rp->getValue($changeStream));
11941202
}
11951203

1204+
/**
1205+
* Prose test: "ChangeStream will automatically resume one time on a
1206+
* resumable error (including not master) with the initial pipeline and
1207+
* options, except for the addition/update of a resumeToken."
1208+
*/
1209+
public function testResumeRepeatsOriginalPipelineAndOptions()
1210+
{
1211+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1212+
1213+
$aggregateCommands = [];
1214+
1215+
$this->configureFailPoint([
1216+
'configureFailPoint' => 'failCommand',
1217+
'mode' => ['times' => 1],
1218+
'data' => ['failCommands' => ['getMore'], 'errorCode' => self::NOT_MASTER],
1219+
]);
1220+
1221+
(new CommandObserver)->observe(
1222+
function() use ($operation) {
1223+
$changeStream = $operation->execute($this->getPrimaryServer());
1224+
1225+
// The first next will hit the fail point, causing a resume
1226+
$changeStream->next();
1227+
$changeStream->next();
1228+
},
1229+
function(array $event) use (&$aggregateCommands) {
1230+
$command = $event['started']->getCommand();
1231+
if ($event['started']->getCommandName() !== 'aggregate') {
1232+
return;
1233+
}
1234+
1235+
$aggregateCommands[] = (array) $command;
1236+
}
1237+
);
1238+
1239+
$this->assertCount(2, $aggregateCommands);
1240+
1241+
$this->assertThat(
1242+
$aggregateCommands[0]['pipeline'][0]->{'$changeStream'},
1243+
$this->logicalNot(
1244+
$this->logicalOr(
1245+
$this->objectHasAttribute('resumeAfter'),
1246+
$this->objectHasAttribute('startAfter'),
1247+
$this->objectHasAttribute('startAtOperationTime')
1248+
)
1249+
)
1250+
);
1251+
1252+
$this->assertThat(
1253+
$aggregateCommands[1]['pipeline'][0]->{'$changeStream'},
1254+
$this->logicalOr(
1255+
$this->objectHasAttribute('resumeAfter'),
1256+
$this->objectHasAttribute('startAfter'),
1257+
$this->objectHasAttribute('startAtOperationTime')
1258+
)
1259+
);
1260+
1261+
$aggregateCommands = array_map(
1262+
function (array $aggregateCommand) {
1263+
// Remove resume options from the changestream document
1264+
if (isset($aggregateCommand['pipeline'][0]->{'$changeStream'})) {
1265+
$aggregateCommand['pipeline'][0]->{'$changeStream'} = array_diff_key(
1266+
(array) $aggregateCommand['pipeline'][0]->{'$changeStream'},
1267+
['resumeAfter' => false, 'startAfter' => false, 'startAtOperationTime' => false]
1268+
);
1269+
}
1270+
1271+
// Remove options we don't want to compare between commands
1272+
return array_diff_key($aggregateCommand, ['lsid' => false, '$clusterTime' => false]);
1273+
},
1274+
$aggregateCommands
1275+
);
1276+
1277+
// Ensure options in original and resuming aggregate command match
1278+
$this->assertEquals($aggregateCommands[0], $aggregateCommands[1]);
1279+
}
1280+
1281+
/**
1282+
* Prose test: "ChangeStream will not attempt to resume on any error
1283+
* encountered while executing an aggregate command."
1284+
*/
1285+
public function testErrorDuringAggregateCommandDoesNotCauseResume()
1286+
{
1287+
if (version_compare($this->getServerVersion(), '4.0.0', '<')) {
1288+
$this->markTestSkipped('failCommand is not supported');
1289+
}
1290+
1291+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1292+
1293+
$commandCount = 0;
1294+
1295+
$this->configureFailPoint([
1296+
'configureFailPoint' => 'failCommand',
1297+
'mode' => ['times' => 1],
1298+
'data' => ['failCommands' => ['aggregate'], 'errorCode' => self::NOT_MASTER],
1299+
]);
1300+
1301+
$this->expectException(CommandException::class);
1302+
1303+
(new CommandObserver)->observe(
1304+
function() use ($operation) {
1305+
$operation->execute($this->getPrimaryServer());
1306+
},
1307+
function(array $event) use (&$commandCount) {
1308+
$commandCount++;
1309+
}
1310+
);
1311+
1312+
$this->assertSame(1, $commandCount);
1313+
}
1314+
1315+
/**
1316+
* Prose test: "ChangeStream will perform server selection before attempting
1317+
* to resume, using initial readPreference"
1318+
*/
1319+
public function testOriginalReadPreferenceIsPreservedOnResume()
1320+
{
1321+
$readPreference = new ReadPreference('secondary');
1322+
$options = ['readPreference' => $readPreference] + $this->defaultOptions;
1323+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1324+
1325+
try {
1326+
$secondary = $this->manager->selectServer($readPreference);
1327+
} catch (ConnectionTimeoutException $e) {
1328+
$this->markTestSkipped('Secondary is not available');
1329+
}
1330+
1331+
$changeStream = $operation->execute($secondary);
1332+
$previousCursorId = $changeStream->getCursorId();
1333+
$this->killChangeStreamCursor($changeStream);
1334+
1335+
$changeStream->next();
1336+
$this->assertNotSame($previousCursorId, $changeStream->getCursorId());
1337+
1338+
$getCursor = Closure::bind(
1339+
function () {
1340+
return $this->iterator->getInnerIterator();
1341+
},
1342+
$changeStream,
1343+
ChangeStream::class
1344+
);
1345+
/** @var Cursor $cursor */
1346+
$cursor = $getCursor();
1347+
self::assertTrue($cursor->getServer()->isSecondary());
1348+
}
1349+
1350+
/**
1351+
* Prose test
1352+
* For a ChangeStream under these conditions:
1353+
* - Running against a server <4.0.7.
1354+
* - The batch is empty or has been iterated to the last document.
1355+
* Expected result:
1356+
* - getResumeToken must return the _id of the last document returned if one exists.
1357+
* - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
1358+
* - If resumeAfter was not specified, the getResumeToken result must be empty.
1359+
*/
1360+
public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
1361+
{
1362+
if ($this->isPostBatchResumeTokenSupported()) {
1363+
$this->markTestSkipped('postBatchResumeToken is supported');
1364+
}
1365+
1366+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1367+
$changeStream = $operation->execute($this->getPrimaryServer());
1368+
1369+
$this->assertNull($changeStream->getResumeToken());
1370+
1371+
$this->insertDocument(['x' => 1]);
1372+
1373+
$changeStream->next();
1374+
$this->assertTrue($changeStream->valid());
1375+
$resumeToken = $changeStream->getResumeToken();
1376+
$this->assertSame($resumeToken, $changeStream->current()->_id);
1377+
1378+
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
1379+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1380+
$changeStream = $operation->execute($this->getPrimaryServer());
1381+
1382+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
1383+
}
1384+
11961385
private function assertNoCommandExecuted(callable $callable)
11971386
{
11981387
$commands = [];

0 commit comments

Comments
 (0)