Skip to content

Commit 21e1f30

Browse files
committed
PHPLIB-351: Cluster and DB-level change streams and startAtOperationTime
1 parent 5327df0 commit 21e1f30

File tree

5 files changed

+308
-31
lines changed

5 files changed

+308
-31
lines changed

src/Client.php

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
use MongoDB\Model\DatabaseInfoIterator;
3030
use MongoDB\Operation\DropDatabase;
3131
use MongoDB\Operation\ListDatabases;
32+
use MongoDB\Operation\Watch;
3233

3334
class Client
3435
{
@@ -37,9 +38,12 @@ class Client
3738
'document' => 'MongoDB\Model\BSONDocument',
3839
'root' => 'MongoDB\Model\BSONDocument',
3940
];
41+
private static $wireVersionForReadConcern = 4;
4042
private static $wireVersionForWritableCommandWriteConcern = 5;
4143

4244
private $manager;
45+
private $readConcern;
46+
private $readPreference;
4347
private $uri;
4448
private $typeMap;
4549
private $writeConcern;
@@ -81,6 +85,8 @@ public function __construct($uri = 'mongodb://127.0.0.1/', array $uriOptions = [
8185
unset($driverOptions['typeMap']);
8286

8387
$this->manager = new Manager($uri, $uriOptions, $driverOptions);
88+
$this->readConcern = $this->manager->getReadConcern();
89+
$this->readPreference = $this->manager->getReadPreference();
8490
$this->writeConcern = $this->manager->getWriteConcern();
8591
}
8692

@@ -173,7 +179,7 @@ public function getManager()
173179
*/
174180
public function getReadConcern()
175181
{
176-
return $this->manager->getReadConcern();
182+
return $this->readConcern;
177183
}
178184

179185
/**
@@ -183,7 +189,7 @@ public function getReadConcern()
183189
*/
184190
public function getReadPreference()
185191
{
186-
return $this->manager->getReadPreference();
192+
return $this->readPreference;
187193
}
188194

189195
/**
@@ -268,4 +274,34 @@ public function startSession(array $options = [])
268274
{
269275
return $this->manager->startSession($options);
270276
}
277+
278+
/**
279+
* Create a change stream for watching changes to the cluster.
280+
*
281+
* @see Watch::__construct() for supported options
282+
* @param array $pipeline List of pipeline operations
283+
* @param array $options Command options
284+
* @return ChangeStream
285+
* @throws InvalidArgumentException for parameter/option parsing errors
286+
*/
287+
public function watch(array $pipeline = [], array $options = [])
288+
{
289+
if ( ! isset($options['readPreference'])) {
290+
$options['readPreference'] = $this->readPreference;
291+
}
292+
293+
$server = $this->manager->selectServer($options['readPreference']);
294+
295+
if ( ! isset($options['readConcern']) && \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern)) {
296+
$options['readConcern'] = $this->readConcern;
297+
}
298+
299+
if ( ! isset($options['typeMap'])) {
300+
$options['typeMap'] = $this->typeMap;
301+
}
302+
303+
$operation = new Watch($this->manager, null, null, $pipeline, $options);
304+
305+
return $operation->execute($server);
306+
}
271307
}

src/Database.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
use MongoDB\Operation\DropDatabase;
3535
use MongoDB\Operation\ListCollections;
3636
use MongoDB\Operation\ModifyCollection;
37+
use MongoDB\Operation\Watch;
3738

3839
class Database
3940
{
@@ -42,6 +43,7 @@ class Database
4243
'document' => 'MongoDB\Model\BSONDocument',
4344
'root' => 'MongoDB\Model\BSONDocument',
4445
];
46+
private static $wireVersionForReadConcern = 4;
4547
private static $wireVersionForWritableCommandWriteConcern = 5;
4648

4749
private $databaseName;
@@ -409,6 +411,36 @@ public function selectGridFSBucket(array $options = [])
409411
return new Bucket($this->manager, $this->databaseName, $options);
410412
}
411413

414+
/**
415+
* Create a change stream for watching changes to the database.
416+
*
417+
* @see Watch::__construct() for supported options
418+
* @param array $pipeline List of pipeline operations
419+
* @param array $options Command options
420+
* @return ChangeStream
421+
* @throws InvalidArgumentException for parameter/option parsing errors
422+
*/
423+
public function watch(array $pipeline = [], array $options = [])
424+
{
425+
if ( ! isset($options['readPreference'])) {
426+
$options['readPreference'] = $this->readPreference;
427+
}
428+
429+
$server = $this->manager->selectServer($options['readPreference']);
430+
431+
if ( ! isset($options['readConcern']) && \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern)) {
432+
$options['readConcern'] = $this->readConcern;
433+
}
434+
435+
if ( ! isset($options['typeMap'])) {
436+
$options['typeMap'] = $this->typeMap;
437+
}
438+
439+
$operation = new Watch($this->manager, $this->databaseName, null, $pipeline, $options);
440+
441+
return $operation->execute($server);
442+
}
443+
412444
/**
413445
* Get a clone of this database with different options.
414446
*

src/Operation/Watch.php

Lines changed: 125 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,47 @@
1818
namespace MongoDB\Operation;
1919

2020
use MongoDB\ChangeStream;
21+
use MongoDB\BSON\TimestampInterface;
2122
use MongoDB\Driver\Command;
23+
use MongoDB\Driver\Cursor;
2224
use MongoDB\Driver\Manager;
2325
use MongoDB\Driver\ReadConcern;
2426
use MongoDB\Driver\ReadPreference;
2527
use MongoDB\Driver\Server;
2628
use MongoDB\Driver\Session;
2729
use MongoDB\Driver\Exception\RuntimeException;
30+
use MongoDB\Driver\Monitoring\CommandFailedEvent;
31+
use MongoDB\Driver\Monitoring\CommandSubscriber;
32+
use MongoDB\Driver\Monitoring\CommandStartedEvent;
33+
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
2834
use MongoDB\Exception\InvalidArgumentException;
2935
use MongoDB\Exception\UnexpectedValueException;
3036
use MongoDB\Exception\UnsupportedException;
3137

3238
/**
3339
* Operation for creating a change stream with the aggregate command.
3440
*
41+
* Note: the implementation of CommandSubscriber is an internal implementation
42+
* detail and should not be considered part of the public API.
43+
*
3544
* @api
3645
* @see \MongoDB\Collection::watch()
3746
* @see https://docs.mongodb.com/manual/changeStreams/
3847
*/
39-
class Watch implements Executable
48+
class Watch implements Executable, /* @internal */ CommandSubscriber
4049
{
50+
private static $wireVersionForOperationTime = 7;
51+
4152
const FULL_DOCUMENT_DEFAULT = 'default';
4253
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
4354

4455
private $aggregate;
45-
private $databaseName;
56+
private $aggregateOptions;
57+
private $changeStreamOptions;
4658
private $collectionName;
59+
private $databaseName;
60+
private $operationTime;
4761
private $pipeline;
48-
private $options;
4962
private $resumeCallable;
5063

5164
/**
@@ -79,22 +92,44 @@ class Watch implements Executable
7992
* * resumeAfter (document): Specifies the logical starting point for the
8093
* new change stream.
8194
*
95+
* Using this option in conjunction with "startAtOperationTime" will
96+
* result in a server error. The options are mutually exclusive.
97+
*
8298
* * session (MongoDB\Driver\Session): Client session.
8399
*
84100
* Sessions are not supported for server versions < 3.6.
85101
*
102+
* * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
103+
* the change stream will only provide changes that occurred at or after
104+
* the specified timestamp. Any command run against the server will
105+
* return an operation time that can be used here. Alternatively, an
106+
* operation time may be obtained from MongoDB\Driver\Server::getInfo().
107+
*
108+
* Using this option in conjunction with "resumeAfter" will result in a
109+
* server error. The options are mutually exclusive.
110+
*
111+
* This option is not supported for server versions < 4.0.
112+
*
86113
* * typeMap (array): Type map for BSON deserialization. This will be
87114
* applied to the returned Cursor (it is not sent to the server).
88115
*
89-
* @param string $databaseName Database name
90-
* @param string $collectionName Collection name
116+
* Note: A database-level change stream may be created by specifying null
117+
* for the collection name. A cluster-level change stream may be created by
118+
* specifying null for both the database and collection name.
119+
*
120+
* @param Manager $manager Manager instance from the driver
121+
* @param string|null $databaseName Database name
122+
* @param string|null $collectionName Collection name
91123
* @param array $pipeline List of pipeline operations
92124
* @param array $options Command options
93-
* @param Manager $manager Manager instance from the driver
94125
* @throws InvalidArgumentException for parameter/option parsing errors
95126
*/
96127
public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = [])
97128
{
129+
if (isset($collectionName) && ! isset($databaseName)) {
130+
throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null');
131+
}
132+
98133
$options += [
99134
'fullDocument' => self::FULL_DOCUMENT_DEFAULT,
100135
'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
@@ -104,10 +139,12 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
104139
throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
105140
}
106141

107-
if (isset($options['resumeAfter'])) {
108-
if ( ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
109-
throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
110-
}
142+
if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
143+
throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
144+
}
145+
146+
if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
147+
throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
111148
}
112149

113150
/* In the absence of an explicit session, create one to ensure that the
@@ -121,15 +158,47 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
121158
} catch (RuntimeException $e) {}
122159
}
123160

161+
$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
162+
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAtOperationTime' => 1]);
163+
164+
// Null database name implies a cluster-wide change stream
165+
if ($databaseName === null) {
166+
$databaseName = 'admin';
167+
$this->changeStreamOptions['allChangesForCluster'] = true;
168+
}
169+
124170
$this->databaseName = (string) $databaseName;
125-
$this->collectionName = (string) $collectionName;
171+
$this->collectionName = isset($collectionName) ? (string) $collectionName : null;
126172
$this->pipeline = $pipeline;
127-
$this->options = $options;
128173

129174
$this->aggregate = $this->createAggregate();
130175
$this->resumeCallable = $this->createResumeCallable($manager);
131176
}
132177

178+
/** @internal */
179+
final public function commandFailed(CommandFailedEvent $event)
180+
{
181+
}
182+
183+
/** @internal */
184+
final public function commandStarted(CommandStartedEvent $event)
185+
{
186+
}
187+
188+
/** @internal */
189+
final public function commandSucceeded(CommandSucceededEvent $event)
190+
{
191+
if ($event->getCommandName() !== 'aggregate') {
192+
return;
193+
}
194+
195+
$reply = $event->getReply();
196+
197+
if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
198+
$this->operationTime = $reply->operationTime;
199+
}
200+
}
201+
133202
/**
134203
* Execute the operation.
135204
*
@@ -141,47 +210,74 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
141210
*/
142211
public function execute(Server $server)
143212
{
144-
$cursor = $this->aggregate->execute($server);
145-
146-
return new ChangeStream($cursor, $this->resumeCallable);
213+
return new ChangeStream($this->executeAggregate($server), $this->resumeCallable);
147214
}
148215

149216
/**
150217
* Create the aggregate command for creating a change stream.
151218
*
152-
* This method is also used to recreate the aggregate command if a new
153-
* resume token is provided while resuming.
219+
* This method is also used to recreate the aggregate command when resuming.
154220
*
155221
* @return Aggregate
156222
*/
157223
private function createAggregate()
158224
{
159-
$changeStreamOptions = array_intersect_key($this->options, ['fullDocument' => 1, 'resumeAfter' => 1]);
160-
$changeStream = ['$changeStream' => (object) $changeStreamOptions];
161-
162225
$pipeline = $this->pipeline;
163-
array_unshift($pipeline, $changeStream);
226+
array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]);
164227

165-
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
166-
167-
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $aggregateOptions);
228+
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
168229
}
169230

170231
private function createResumeCallable(Manager $manager)
171232
{
172233
return function($resumeToken = null) use ($manager) {
173-
/* If a resume token was provided, recreate the Aggregate operation
174-
* using the new resume token. */
234+
/* If a resume token was provided, update the "resumeAfter" option
235+
* and ensure that "startAtOperationTime" is no longer set. */
175236
if ($resumeToken !== null) {
176-
$this->options['resumeAfter'] = $resumeToken;
177-
$this->aggregate = $this->createAggregate();
237+
$this->changeStreamOptions['resumeAfter'] = $resumeToken;
238+
unset($this->changeStreamOptions['startAtOperationTime']);
239+
}
240+
241+
/* If we captured an operation time from the first aggregate command
242+
* and there is no "resumeAfter" option, set "startAtOperationTime"
243+
* so that we can resume from the original aggregate's time. */
244+
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter'])) {
245+
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
178246
}
179247

248+
$this->aggregate = $this->createAggregate();
249+
180250
/* Select a new server using the read preference, execute this
181251
* operation on it, and return the new ChangeStream. */
182-
$server = $manager->selectServer($this->options['readPreference']);
252+
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
183253

184254
return $this->execute($server);
185255
};
186256
}
257+
258+
/**
259+
* Execute the aggregate command and optionally capture its operation time.
260+
*
261+
* @param Server $server
262+
* @return Cursor
263+
*/
264+
private function executeAggregate(Server $server)
265+
{
266+
/* If we've already captured an operation time or the server does not
267+
* support returning an operation time (e.g. MongoDB 3.6), execute the
268+
* aggregation directly and return its cursor. */
269+
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForOperationTime)) {
270+
return $this->aggregate->execute($server);
271+
}
272+
273+
/* Otherwise, execute the aggregation using command monitoring so that
274+
* we can capture its operation time with commandSucceeded(). */
275+
\MongoDB\Driver\Monitoring\addSubscriber($this);
276+
277+
try {
278+
return $this->aggregate->execute($server);
279+
} finally {
280+
\MongoDB\Driver\Monitoring\removeSubscriber($this);
281+
}
282+
}
187283
}

0 commit comments

Comments
 (0)