Skip to content

Commit 80cee6f

Browse files
committed
Merge pull request #637
2 parents 4fc1f90 + d49ab3a commit 80cee6f

6 files changed

+137
-9
lines changed

docs/includes/apiargs-MongoDBClient-method-watch-option.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ source:
3434
file: apiargs-common-option.yaml
3535
ref: session
3636
---
37+
source:
38+
file: apiargs-method-watch-option.yaml
39+
ref: startAfter
40+
post: |
41+
.. versionadded: 1.5
42+
---
3743
source:
3844
file: apiargs-method-watch-option.yaml
3945
ref: startAtOperationTime

docs/includes/apiargs-MongoDBCollection-method-watch-option.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ source:
3434
file: apiargs-common-option.yaml
3535
ref: session
3636
---
37+
source:
38+
file: apiargs-method-watch-option.yaml
39+
ref: startAfter
40+
post: |
41+
.. versionadded: 1.5
42+
---
3743
source:
3844
file: apiargs-method-watch-option.yaml
3945
ref: startAtOperationTime

docs/includes/apiargs-MongoDBDatabase-method-watch-option.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ source:
3434
file: apiargs-common-option.yaml
3535
ref: session
3636
---
37+
source:
38+
file: apiargs-method-watch-option.yaml
39+
ref: startAfter
40+
post: |
41+
.. versionadded: 1.5
42+
---
3743
source:
3844
file: apiargs-method-watch-option.yaml
3945
ref: startAtOperationTime

docs/includes/apiargs-method-watch-option.yaml

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,32 @@ description: |
4646
Specifies the logical starting point for the new change stream. The ``_id``
4747
field in documents returned by the change stream may be used here.
4848
49-
Using this option in conjunction with ``startAtOperationTime`` will result in
50-
a server error. The options are mutually exclusive.
49+
Using this option in conjunction with ``startAfter`` and/or
50+
``startAtOperationTime`` will result in a server error. The options are
51+
mutually exclusive.
52+
53+
.. note::
54+
55+
This is an option of the ``$changeStream`` pipeline stage.
56+
interface: phpmethod
57+
operation: ~
58+
optional: true
59+
---
60+
arg_name: option
61+
name: startAfter
62+
type: array|object
63+
description: |
64+
Specifies the logical starting point for the new change stream. The ``_id``
65+
field in documents returned by the change stream may be used here. Unlike
66+
``resumeAfter``, this option can be used with a resume token from an
67+
"invalidate" event.
68+
69+
Using this option in conjunction with ``resumeAfter`` and/or
70+
``startAtOperationTime`` will result in a server error. The options are
71+
mutually exclusive.
72+
73+
This is not supported for server versions prior to 4.2 and will result in an
74+
exception at execution time if used.
5175
5276
.. note::
5377
@@ -66,8 +90,8 @@ description: |
6690
``operationTime`` returned by the initial ``aggregate`` command will be used
6791
if available.
6892
69-
Using this option in conjunction with ``resumeAfter`` will result in a server
70-
error. The options are mutually exclusive.
93+
Using this option in conjunction with ``resumeAfter`` and/or ``startAfter``
94+
will result in a server error. The options are mutually exclusive.
7195
7296
This is not supported for server versions prior to 4.0 and will result in an
7397
exception at execution time if used.

src/Operation/Watch.php

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,21 +92,31 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
9292
* * resumeAfter (document): Specifies the logical starting point for the
9393
* new change stream.
9494
*
95-
* Using this option in conjunction with "startAtOperationTime" will
96-
* result in a server error. The options are mutually exclusive.
95+
* Using this option in conjunction with "startAfter" and/or
96+
* "startAtOperationTime" will result in a server error. The options are
97+
* mutually exclusive.
9798
*
9899
* * session (MongoDB\Driver\Session): Client session.
99100
*
100101
* Sessions are not supported for server versions < 3.6.
101102
*
103+
* * startAfter (document): Specifies the logical starting point for the
104+
* new change stream. Unlike "resumeAfter", this option can be used with
105+
* a resume token from an "invalidate" event.
106+
*
107+
* Using this option in conjunction with "resumeAfter" and/or
108+
* "startAtOperationTime" will result in a server error. The options are
109+
* mutually exclusive.
110+
*
102111
* * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
103112
* the change stream will only provide changes that occurred at or after
104113
* the specified timestamp. Any command run against the server will
105114
* return an operation time that can be used here. Alternatively, an
106115
* operation time may be obtained from MongoDB\Driver\Server::getInfo().
107116
*
108-
* Using this option in conjunction with "resumeAfter" will result in a
109-
* server error. The options are mutually exclusive.
117+
* Using this option in conjunction with "resumeAfter" and/or
118+
* "startAfter" will result in a server error. The options are mutually
119+
* exclusive.
110120
*
111121
* This option is not supported for server versions < 4.0.
112122
*
@@ -143,6 +153,10 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
143153
throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
144154
}
145155

156+
if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) {
157+
throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object');
158+
}
159+
146160
if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
147161
throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
148162
}
@@ -162,7 +176,7 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
162176
}
163177

164178
$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
165-
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAtOperationTime' => 1]);
179+
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
166180

167181
// Null database name implies a cluster-wide change stream
168182
if ($databaseName === null) {

tests/Operation/WatchFunctionalTest.php

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,78 @@ public function testRewindExtractsResumeTokenAndNextResumes()
759759
$this->assertMatchesDocument($expectedResult, $changeStream->current());
760760
}
761761

762+
public function testResumeAfterOption()
763+
{
764+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
765+
$changeStream = $operation->execute($this->getPrimaryServer());
766+
767+
$changeStream->rewind();
768+
$this->assertFalse($changeStream->valid());
769+
770+
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
771+
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
772+
773+
$changeStream->next();
774+
$this->assertTrue($changeStream->valid());
775+
776+
$resumeToken = $changeStream->current()->_id;
777+
778+
$options = $this->defaultOptions + ['resumeAfter' => $resumeToken];
779+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
780+
$changeStream = $operation->execute($this->getPrimaryServer());
781+
782+
$changeStream->rewind();
783+
$this->assertTrue($changeStream->valid());
784+
785+
$expectedResult = [
786+
'_id' => $changeStream->current()->_id,
787+
'operationType' => 'insert',
788+
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
789+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
790+
'documentKey' => ['_id' => 2],
791+
];
792+
793+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
794+
}
795+
796+
public function testStartAfterOption()
797+
{
798+
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
799+
$this->markTestSkipped('startAfter is not supported');
800+
}
801+
802+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
803+
$changeStream = $operation->execute($this->getPrimaryServer());
804+
805+
$changeStream->rewind();
806+
$this->assertFalse($changeStream->valid());
807+
808+
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
809+
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
810+
811+
$changeStream->next();
812+
$this->assertTrue($changeStream->valid());
813+
814+
$resumeToken = $changeStream->current()->_id;
815+
816+
$options = $this->defaultOptions + ['startAfter' => $resumeToken];
817+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
818+
$changeStream = $operation->execute($this->getPrimaryServer());
819+
820+
$changeStream->rewind();
821+
$this->assertTrue($changeStream->valid());
822+
823+
$expectedResult = [
824+
'_id' => $changeStream->current()->_id,
825+
'operationType' => 'insert',
826+
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
827+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
828+
'documentKey' => ['_id' => 2],
829+
];
830+
831+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
832+
}
833+
762834
/**
763835
* @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
764836
*/

0 commit comments

Comments
 (0)