Skip to content

PHPLIB-407: Support startAfter change stream option #637

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/includes/apiargs-MongoDBClient-method-watch-option.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ source:
file: apiargs-common-option.yaml
ref: session
---
source:
file: apiargs-method-watch-option.yaml
ref: startAfter
post: |
.. versionadded: 1.5
---
source:
file: apiargs-method-watch-option.yaml
ref: startAtOperationTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ source:
file: apiargs-common-option.yaml
ref: session
---
source:
file: apiargs-method-watch-option.yaml
ref: startAfter
post: |
.. versionadded: 1.5
---
source:
file: apiargs-method-watch-option.yaml
ref: startAtOperationTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ source:
file: apiargs-common-option.yaml
ref: session
---
source:
file: apiargs-method-watch-option.yaml
ref: startAfter
post: |
.. versionadded: 1.5
---
source:
file: apiargs-method-watch-option.yaml
ref: startAtOperationTime
Expand Down
32 changes: 28 additions & 4 deletions docs/includes/apiargs-method-watch-option.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,32 @@ description: |
Specifies the logical starting point for the new change stream. The ``_id``
field in documents returned by the change stream may be used here.

Using this option in conjunction with ``startAtOperationTime`` will result in
a server error. The options are mutually exclusive.
Using this option in conjunction with ``startAfter`` and/or
``startAtOperationTime`` will result in a server error. The options are
mutually exclusive.

.. note::

This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: startAfter
type: array|object
description: |
Specifies the logical starting point for the new change stream. The ``_id``
field in documents returned by the change stream may be used here. Unlike
``resumeAfter``, this option can be used with a resume token from an
"invalidate" event.

Using this option in conjunction with ``resumeAfter`` and/or
``startAtOperationTime`` will result in a server error. The options are
mutually exclusive.

This is not supported for server versions prior to 4.2 and will result in an
exception at execution time if used.

.. note::

Expand All @@ -66,8 +90,8 @@ description: |
``operationTime`` returned by the initial ``aggregate`` command will be used
if available.

Using this option in conjunction with ``resumeAfter`` will result in a server
error. The options are mutually exclusive.
Using this option in conjunction with ``resumeAfter`` and/or ``startAfter``
will result in a server error. The options are mutually exclusive.

This is not supported for server versions prior to 4.0 and will result in an
exception at execution time if used.
Expand Down
24 changes: 19 additions & 5 deletions src/Operation/Watch.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,31 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
* * resumeAfter (document): Specifies the logical starting point for the
* new change stream.
*
* Using this option in conjunction with "startAtOperationTime" will
* result in a server error. The options are mutually exclusive.
* Using this option in conjunction with "startAfter" and/or
* "startAtOperationTime" will result in a server error. The options are
* mutually exclusive.
*
* * session (MongoDB\Driver\Session): Client session.
*
* Sessions are not supported for server versions < 3.6.
*
* * startAfter (document): Specifies the logical starting point for the
* new change stream. Unlike "resumeAfter", this option can be used with
* a resume token from an "invalidate" event.
*
* Using this option in conjunction with "resumeAfter" and/or
* "startAtOperationTime" will result in a server error. The options are
* mutually exclusive.
*
* * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
* the change stream will only provide changes that occurred at or after
* the specified timestamp. Any command run against the server will
* return an operation time that can be used here. Alternatively, an
* operation time may be obtained from MongoDB\Driver\Server::getInfo().
*
* Using this option in conjunction with "resumeAfter" will result in a
* server error. The options are mutually exclusive.
* Using this option in conjunction with "resumeAfter" and/or
* "startAfter" will result in a server error. The options are mutually
* exclusive.
*
* This option is not supported for server versions < 4.0.
*
Expand Down Expand Up @@ -143,6 +153,10 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
}

if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) {
throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object');
}

if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
}
Expand All @@ -162,7 +176,7 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
}

$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAtOperationTime' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);

// Null database name implies a cluster-wide change stream
if ($databaseName === null) {
Expand Down
72 changes: 72 additions & 0 deletions tests/Operation/WatchFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,78 @@ public function testRewindExtractsResumeTokenAndNextResumes()
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}

public function testResumeAfterOption()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$changeStream->rewind();
$this->assertFalse($changeStream->valid());

$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);

$changeStream->next();
$this->assertTrue($changeStream->valid());

$resumeToken = $changeStream->current()->_id;

$options = $this->defaultOptions + ['resumeAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());

$changeStream->rewind();
$this->assertTrue($changeStream->valid());

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());
}

public function testStartAfterOption()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('startAfter is not supported');
}

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$changeStream->rewind();
$this->assertFalse($changeStream->valid());

$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);

$changeStream->next();
$this->assertTrue($changeStream->valid());

$resumeToken = $changeStream->current()->_id;

$options = $this->defaultOptions + ['startAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());

$changeStream->rewind();
$this->assertTrue($changeStream->valid());

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());
}

/**
* @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
*/
Expand Down