Skip to content

PHPLIB-814: Change stream support for point-in-time pre and post-images #911

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/includes/apiargs-MongoDBClient-method-watch-option.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ source:
file: apiargs-method-watch-option.yaml
ref: fullDocument
---
source:
file: apiargs-method-watch-option.yaml
ref: fullDocumentBeforeChange
post: |
.. versionadded: 1.13
---
source:
file: apiargs-method-watch-option.yaml
ref: maxAwaitTimeMS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ source:
file: apiargs-method-watch-option.yaml
ref: fullDocument
---
source:
file: apiargs-method-watch-option.yaml
ref: fullDocumentBeforeChange
post: |
.. versionadded: 1.13
---
source:
file: apiargs-method-watch-option.yaml
ref: maxAwaitTimeMS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ source:
file: apiargs-method-watch-option.yaml
ref: fullDocument
---
source:
file: apiargs-method-watch-option.yaml
ref: fullDocumentBeforeChange
post: |
.. versionadded: 1.13
---
source:
file: apiargs-method-watch-option.yaml
ref: maxAwaitTimeMS
Expand Down
51 changes: 45 additions & 6 deletions docs/includes/apiargs-method-watch-option.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,53 @@ arg_name: option
name: fullDocument
type: string
description: |
Allowed values are 'default' and 'updateLookup'. Defaults to 'default'.
When set to 'updateLookup', the change notification for partial updates will
include both a delta describing the changes to the document, as well as a
copy of the entire document that was changed from some time after the change
occurred. The following values are supported:
Determines how the "fullDocument" response field will be populated for update
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aleksandr-rudo: I realized after submitting #910 (comment) that I forgot to add docs to this PR. Please take a look when you get a chance.

Per the commit message, I intentionally omitted mention of the "default" and "off" constants, since they're redundant.

operations.

By default, change streams only return the delta of fields (via an
"udateDescription" field) for update operations and "fullDocument" is omitted.
Insert and replace operations always include the "fullDocument" field. Delete
operations omit the field as the document no longer exists.

Specify "updateLookup" to return the current majority-committed version of the
updated document.

MongoDB 6.0+ allows returning the post-image of the modified document if the
collection has ``changeStreamPreAndPostImages`` enabled. Specify
"whenAvailable" to return the post-image if available or a null value if not.
Specify "required" to return the post-image if available or raise an error if
not.

The following values are supported:

- ``MongoDB\Operation\Watch::FULL_DOCUMENT_DEFAULT`` (*default*)
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP``
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE``
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED``

.. note::

This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: fullDocumentBeforeChange
type: string
description: |
Determines how the "fullDocumentBeforeChange" response field will be
populated. By default, the field is omitted.

MongoDB 6.0+ allows returning the pre-image of the modified document if the
collection has ``changeStreamPreAndPostImages`` enabled. Specify
"whenAvailable" to return the pre-image if available or a null value if not.
Specify "required" to return the pre-image if available or raise an error if
not.

The following values are supported:

- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE``
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED``

.. note::

Expand Down
46 changes: 37 additions & 9 deletions src/Operation/Watch.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
{
public const FULL_DOCUMENT_DEFAULT = 'default';
public const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
public const FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable';
public const FULL_DOCUMENT_REQUIRED = 'required';

public const FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off';
Copy link
Member Author

@jmikola jmikola Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I don't expect applications to use this constant (doing so would be equivalent to leaving the option unspecified), and therefore did not document it; however, I still wanted to define the constant since the server does accept it and we use the value in corresponding spec tests.

public const FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable';
public const FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required';

/** @var integer */
private static $wireVersionForStartAtOperationTime = 7;
Expand Down Expand Up @@ -105,15 +111,33 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
*
* * collation (document): Specifies a collation.
*
* * fullDocument (string): Determines whether the "fullDocument" field
* will be populated for update operations. By default, change streams
* only return the delta of fields during the update operation (via the
* "updateDescription" field). To additionally return the most current
* majority-committed version of the updated document, specify
* "updateLookup" for this option. Defaults to "default".
* * fullDocument (string): Determines how the "fullDocument" response
* field will be populated for update operations.
*
* By default, change streams only return the delta of fields (via an
* "updateDescription" field) for update operations and "fullDocument" is
* omitted. Insert and replace operations always include the
* "fullDocument" field. Delete operations omit the field as the document
* no longer exists.
*
* Specify "updateLookup" to return the current majority-committed
* version of the updated document.
*
* MongoDB 6.0+ allows returning the post-image of the modified document
* if the collection has changeStreamPreAndPostImages enabled. Specify
* "whenAvailable" to return the post-image if available or a null value
* if not. Specify "required" to return the post-image if available or
* raise an error if not.
*
* Insert and replace operations always include the "fullDocument" field
* and delete operations omit the field as the document no longer exists.
* * fullDocumentBeforeChange (string): Determines how the
* "fullDocumentBeforeChange" response field will be populated. By
* default, the field is omitted.
*
* MongoDB 6.0+ allows returning the pre-image of the modified document
* if the collection has changeStreamPreAndPostImages enabled. Specify
* "whenAvailable" to return the pre-image if available or a null value
* if not. Specify "required" to return the pre-image if available or
* raise an error if not.
*
* * maxAwaitTimeMS (integer): The maximum amount of time for the server to
* wait on new documents to satisfy a change stream query.
Expand Down Expand Up @@ -181,6 +205,10 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
}

if (isset($options['fullDocumentBeforeChange']) && ! is_string($options['fullDocumentBeforeChange'])) {
throw InvalidArgumentException::invalidType('"fullDocumentBeforeChange" option', $options['fullDocumentBeforeChange'], 'string');
}

if (! $options['readPreference'] instanceof ReadPreference) {
throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
}
Expand Down Expand Up @@ -212,7 +240,7 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
}

$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);

// Null database name implies a cluster-wide change stream
if ($databaseName === null) {
Expand Down
4 changes: 4 additions & 0 deletions tests/Operation/WatchTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public function provideInvalidConstructorOptions()
$options[][] = ['fullDocument' => $value];
}

foreach ($this->getInvalidStringValues() as $value) {
$options[][] = ['fullDocumentBeforeChange' => $value];
}

foreach ($this->getInvalidIntegerValues() as $value) {
$options[][] = ['maxAwaitTimeMS' => $value];
}
Expand Down
2 changes: 1 addition & 1 deletion tests/UnifiedSpecTests/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ final class Util
Collection::class => [
'aggregate' => ['pipeline', 'session', 'useCursor', 'allowDiskUse', 'batchSize', 'bypassDocumentValidation', 'collation', 'comment', 'explain', 'hint', 'let', 'maxAwaitTimeMS', 'maxTimeMS'],
'bulkWrite' => ['let', 'requests', 'session', 'ordered', 'bypassDocumentValidation'],
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'],
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'fullDocumentBeforeChange', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'],
'createFindCursor' => ['filter', 'session', 'allowDiskUse', 'allowPartialResults', 'batchSize', 'collation', 'comment', 'cursorType', 'hint', 'limit', 'max', 'maxAwaitTimeMS', 'maxScan', 'maxTimeMS', 'min', 'modifiers', 'noCursorTimeout', 'oplogReplay', 'projection', 'returnKey', 'showRecordId', 'skip', 'snapshot', 'sort'],
'createIndex' => ['keys', 'commitQuorum', 'maxTimeMS', 'name', 'session'],
'dropIndex' => ['name', 'session', 'maxTimeMS'],
Expand Down
Loading