Skip to content

PHPLIB-457: Audit and implement any missing change stream spec prose tests #662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ChangeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private function onIteration($incrementKey)
*/
private function resume()
{
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken());
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);
$this->iterator->rewind();

$this->onIteration($this->hasAdvanced);
Expand Down
9 changes: 6 additions & 3 deletions src/Operation/Watch.php
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public function execute(Server $server)
{
return new ChangeStream(
$this->createChangeStreamIterator($server),
function($resumeToken) { return $this->resume($resumeToken); }
function($resumeToken, $hasAdvanced) { return $this->resume($resumeToken, $hasAdvanced); }
);
}

Expand Down Expand Up @@ -333,10 +333,11 @@ private function getInitialResumeToken()
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
* @param array|object|null $resumeToken
* @param bool $hasAdvanced
* @return ChangeStreamIterator
* @throws InvalidArgumentException
*/
private function resume($resumeToken = null)
private function resume($resumeToken = null, $hasAdvanced = false)
{
if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
Expand All @@ -347,12 +348,14 @@ private function resume($resumeToken = null)
// Select a new server using the original read preference
$server = $this->manager->selectServer($this->aggregateOptions['readPreference']);

$resumeOption = isset($this->changeStreamOptions['startAfter']) && !$hasAdvanced ? 'startAfter' : 'resumeAfter';

unset($this->changeStreamOptions['resumeAfter']);
unset($this->changeStreamOptions['startAfter']);
unset($this->changeStreamOptions['startAtOperationTime']);

if ($resumeToken !== null) {
$this->changeStreamOptions['resumeAfter'] = $resumeToken;
$this->changeStreamOptions[$resumeOption] = $resumeToken;
}

if ($resumeToken === null && $this->operationTime !== null) {
Expand Down
290 changes: 289 additions & 1 deletion tests/Operation/WatchFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

namespace MongoDB\Tests\Operation;

use Closure;
use MongoDB\ChangeStream;
use MongoDB\BSON\TimestampInterface;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\CommandException;
use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
Expand All @@ -25,6 +28,8 @@ class WatchFunctionalTest extends FunctionalTestCase
{
use SetUpTearDownTrait;

const NOT_MASTER = 10107;

private static $wireVersionForStartAtOperationTime = 7;

private $defaultOptions = ['maxAwaitTimeMS' => 500];
Expand Down Expand Up @@ -890,9 +895,11 @@ public function testRewindExtractsResumeTokenAndNextResumes()
$changeStream->next();
$this->assertTrue($changeStream->valid());

$options = ['resumeAfter' => $changeStream->current()->_id] + $this->defaultOptions;
$resumeToken = $changeStream->current()->_id;
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSame($resumeToken, $changeStream->getResumeToken());

$changeStream->rewind();
$this->assertTrue($changeStream->valid());
Expand Down Expand Up @@ -979,6 +986,7 @@ public function testStartAfterOption()
$options = $this->defaultOptions + ['startAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSame($resumeToken, $changeStream->getResumeToken());

$changeStream->rewind();
$this->assertTrue($changeStream->valid());
Expand Down Expand Up @@ -1193,6 +1201,286 @@ public function testSessionFreed()
$this->assertNull($rp->getValue($changeStream));
}

/**
* Prose test: "ChangeStream will automatically resume one time on a
* resumable error (including not master) with the initial pipeline and
* options, except for the addition/update of a resumeToken."
*/
public function testResumeRepeatsOriginalPipelineAndOptions()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

$aggregateCommands = [];

$this->configureFailPoint([
'configureFailPoint' => 'failCommand',
'mode' => ['times' => 1],
'data' => ['failCommands' => ['getMore'], 'errorCode' => self::NOT_MASTER],
]);

(new CommandObserver)->observe(
function() use ($operation) {
$changeStream = $operation->execute($this->getPrimaryServer());

// The first next will hit the fail point, causing a resume
$changeStream->next();
$changeStream->next();
},
function(array $event) use (&$aggregateCommands) {
$command = $event['started']->getCommand();
if ($event['started']->getCommandName() !== 'aggregate') {
return;
}

$aggregateCommands[] = (array) $command;
}
);

$this->assertCount(2, $aggregateCommands);

$this->assertThat(
$aggregateCommands[0]['pipeline'][0]->{'$changeStream'},
$this->logicalNot(
$this->logicalOr(
$this->objectHasAttribute('resumeAfter'),
$this->objectHasAttribute('startAfter'),
$this->objectHasAttribute('startAtOperationTime')
)
)
);

$this->assertThat(
$aggregateCommands[1]['pipeline'][0]->{'$changeStream'},
$this->logicalOr(
$this->objectHasAttribute('resumeAfter'),
$this->objectHasAttribute('startAfter'),
$this->objectHasAttribute('startAtOperationTime')
)
);

$aggregateCommands = array_map(
function (array $aggregateCommand) {
// Remove resume options from the changestream document
if (isset($aggregateCommand['pipeline'][0]->{'$changeStream'})) {
$aggregateCommand['pipeline'][0]->{'$changeStream'} = array_diff_key(
(array) $aggregateCommand['pipeline'][0]->{'$changeStream'},
['resumeAfter' => false, 'startAfter' => false, 'startAtOperationTime' => false]
);
}

// Remove options we don't want to compare between commands
return array_diff_key($aggregateCommand, ['lsid' => false, '$clusterTime' => false]);
},
$aggregateCommands
);

// Ensure options in original and resuming aggregate command match
$this->assertEquals($aggregateCommands[0], $aggregateCommands[1]);
}

/**
* Prose test: "ChangeStream will not attempt to resume on any error
* encountered while executing an aggregate command."
*/
public function testErrorDuringAggregateCommandDoesNotCauseResume()
{
if (version_compare($this->getServerVersion(), '4.0.0', '<')) {
$this->markTestSkipped('failCommand is not supported');
}

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

$commandCount = 0;

$this->configureFailPoint([
'configureFailPoint' => 'failCommand',
'mode' => ['times' => 1],
'data' => ['failCommands' => ['aggregate'], 'errorCode' => self::NOT_MASTER],
]);

$this->expectException(CommandException::class);

(new CommandObserver)->observe(
function() use ($operation) {
$operation->execute($this->getPrimaryServer());
},
function(array $event) use (&$commandCount) {
$commandCount++;
}
);

$this->assertSame(1, $commandCount);
}

/**
* Prose test: "ChangeStream will perform server selection before attempting
* to resume, using initial readPreference"
*/
public function testOriginalReadPreferenceIsPreservedOnResume()
{
$readPreference = new ReadPreference('secondary');
$options = ['readPreference' => $readPreference] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);

try {
$secondary = $this->manager->selectServer($readPreference);
} catch (ConnectionTimeoutException $e) {
$this->markTestSkipped('Secondary is not available');
}

$changeStream = $operation->execute($secondary);
$previousCursorId = $changeStream->getCursorId();
$this->killChangeStreamCursor($changeStream);

$changeStream->next();
$this->assertNotSame($previousCursorId, $changeStream->getCursorId());

$getCursor = Closure::bind(
function () {
return $this->iterator->getInnerIterator();
},
$changeStream,
ChangeStream::class
);
/** @var Cursor $cursor */
$cursor = $getCursor();
self::assertTrue($cursor->getServer()->isSecondary());
}

/**
* Prose test
* For a ChangeStream under these conditions:
* - Running against a server <4.0.7.
* - The batch is empty or has been iterated to the last document.
* Expected result:
* - getResumeToken must return the _id of the last document returned if one exists.
* - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
* - If resumeAfter was not specified, the getResumeToken result must be empty.
*/
public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
{
if ($this->isPostBatchResumeTokenSupported()) {
$this->markTestSkipped('postBatchResumeToken is supported');
}

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$this->assertNull($changeStream->getResumeToken());

$this->insertDocument(['x' => 1]);

$changeStream->next();
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken();
$this->assertSame($resumeToken, $changeStream->current()->_id);

$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());

$this->assertSame($resumeToken, $changeStream->getResumeToken());
}

/**
* Prose test: "$changeStream stage for ChangeStream started with startAfter
* against a server >=4.1.1 that has not received any results yet MUST
* include a startAfter option and MUST NOT include a resumeAfter option
* when resuming a change stream."
*/
public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfterOption()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
}

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$this->insertDocument(['x' => 1]);

$changeStream->next();
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken();

$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->killChangeStreamCursor($changeStream);

$aggregateCommand = null;

(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->next();
},
function(array $event) use (&$aggregateCommand) {
if ($event['started']->getCommandName() !== 'aggregate') {
return;
}

$aggregateCommand = $event['started']->getCommand();
}
);

$this->assertNotNull($aggregateCommand);
$this->assertObjectNotHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
$this->assertObjectHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
}

/**
* Prose test: "$changeStream stage for ChangeStream started with startAfter
* against a server >=4.1.1 that has received at least one result MUST
* include a resumeAfter option and MUST NOT include a startAfter option
* when resuming a change stream."
*/
public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOption()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
}

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$this->insertDocument(['x' => 1]);

$changeStream->next();
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken();

$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();

$this->insertDocument(['x' => 2]);
$changeStream->next();
$this->assertTrue($changeStream->valid());

$this->killChangeStreamCursor($changeStream);

$aggregateCommand = null;

(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->next();
},
function(array $event) use (&$aggregateCommand) {
if ($event['started']->getCommandName() !== 'aggregate') {
return;
}

$aggregateCommand = $event['started']->getCommand();
}
);

$this->assertNotNull($aggregateCommand);
$this->assertObjectNotHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
$this->assertObjectHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
}

private function assertNoCommandExecuted(callable $callable)
{
$commands = [];
Expand Down