Skip to content

PHPLIB-438: Unify handling for write stages in aggregation pipelines #644

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ source:
source:
file: apiargs-MongoDBCollection-common-option.yaml
ref: readPreference
post: |
This option will be ignored when using the :ref:`$out <agg-out>` stage.
---
source:
file: apiargs-common-option.yaml
Expand Down Expand Up @@ -72,7 +70,8 @@ source:
file: apiargs-MongoDBCollection-common-option.yaml
ref: writeConcern
post: |
This only applies when the :ref:`$out <agg-out>` stage is specified.
This only applies when a :ref:`$out <agg-out>` or :ref:`$merge <agg-merge>`
stage is specified.

This is not supported for server versions prior to 3.4 and will result in an
exception at execution time if used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ source:
source:
file: apiargs-MongoDBDatabase-common-option.yaml
ref: readPreference
post: |
This option will be ignored when using the :ref:`$out <agg-out>` stage.
---
source:
file: apiargs-common-option.yaml
Expand All @@ -48,7 +46,8 @@ source:
file: apiargs-MongoDBDatabase-common-option.yaml
ref: writeConcern
post: |
This only applies when the :ref:`$out <agg-out>` stage is specified.
This only applies when a :ref:`$out <agg-out>` or :ref:`$merge <agg-merge>`
stage is specified.

This is not supported for server versions prior to 3.4 and will result in an
exception at execution time if used.
Expand Down
3 changes: 2 additions & 1 deletion docs/includes/apiargs-aggregate-option.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ source:
file: apiargs-MongoDBCollection-common-option.yaml
ref: bypassDocumentValidation
post: |
This only applies when using the :ref:`$out <agg-out>` stage.
This only applies when using the :ref:`$out <agg-out>` and
:ref:`$out <agg-merge>` stages.

Document validation requires MongoDB 3.2 or later: if you are using an earlier
version of MongoDB, this option will be ignored.
Expand Down
10 changes: 5 additions & 5 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Collection
private static $wireVersionForFindAndModifyWriteConcern = 4;
private static $wireVersionForReadConcern = 4;
private static $wireVersionForWritableCommandWriteConcern = 5;
private static $wireVersionForReadConcernWithOutStage = 8;
private static $wireVersionForReadConcernWithWriteStage = 8;

private $collectionName;
private $databaseName;
Expand Down Expand Up @@ -190,13 +190,13 @@ public function __toString()
*/
public function aggregate(array $pipeline, array $options = [])
{
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($pipeline);
$hasWriteStage = \MongoDB\is_last_pipeline_operator_write($pipeline);

if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}

if ($hasOutStage) {
if ($hasWriteStage) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

Expand All @@ -210,7 +210,7 @@ public function aggregate(array $pipeline, array $options = [])
if ( ! isset($options['readConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern) &&
! \MongoDB\is_in_transaction($options) &&
( ! $hasOutStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithOutStage))
( ! $hasWriteStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithWriteStage))
) {
$options['readConcern'] = $this->readConcern;
}
Expand All @@ -219,7 +219,7 @@ public function aggregate(array $pipeline, array $options = [])
$options['typeMap'] = $this->typeMap;
}

if ($hasOutStage &&
if ($hasWriteStage &&
! isset($options['writeConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) &&
! \MongoDB\is_in_transaction($options)) {
Expand Down
10 changes: 5 additions & 5 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Database
];
private static $wireVersionForReadConcern = 4;
private static $wireVersionForWritableCommandWriteConcern = 5;
private static $wireVersionForReadConcernWithOutStage = 8;
private static $wireVersionForReadConcernWithWriteStage = 8;

private $databaseName;
private $manager;
Expand Down Expand Up @@ -175,13 +175,13 @@ public function __toString()
*/
public function aggregate(array $pipeline, array $options = [])
{
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($pipeline);
$hasWriteStage = \MongoDB\is_last_pipeline_operator_write($pipeline);

if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}

if ($hasOutStage) {
if ($hasWriteStage) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

Expand All @@ -195,7 +195,7 @@ public function aggregate(array $pipeline, array $options = [])
if ( ! isset($options['readConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern) &&
! \MongoDB\is_in_transaction($options) &&
( ! $hasOutStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithOutStage))
( ! $hasWriteStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithWriteStage))
) {
$options['readConcern'] = $this->readConcern;
}
Expand All @@ -204,7 +204,7 @@ public function aggregate(array $pipeline, array $options = [])
$options['typeMap'] = $this->typeMap;
}

if ($hasOutStage &&
if ($hasWriteStage &&
! isset($options['writeConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) &&
! \MongoDB\is_in_transaction($options)) {
Expand Down
16 changes: 8 additions & 8 deletions src/Operation/Aggregate.php
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,12 @@ public function execute(Server $server)


$hasExplain = ! empty($this->options['explain']);
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($this->pipeline);
$hasWriteStage = \MongoDB\is_last_pipeline_operator_write($this->pipeline);

$command = $this->createCommand($server, $hasOutStage);
$options = $this->createOptions($hasOutStage, $hasExplain);
$command = $this->createCommand($server, $hasWriteStage);
$options = $this->createOptions($hasWriteStage, $hasExplain);

$cursor = ($hasOutStage && ! $hasExplain)
$cursor = ($hasWriteStage && ! $hasExplain)
? $server->executeReadWriteCommand($this->databaseName, $command, $options)
: $server->executeReadCommand($this->databaseName, $command, $options);

Expand Down Expand Up @@ -353,27 +353,27 @@ private function createCommand(Server $server, $hasOutStage)
*
* @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php
* @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
* @param boolean $hasOutStage
* @param boolean $hasWriteStage
* @param boolean $hasExplain
* @return array
*/
private function createOptions($hasOutStage, $hasExplain)
private function createOptions($hasWriteStage, $hasExplain)
{
$options = [];

if (isset($this->options['readConcern'])) {
$options['readConcern'] = $this->options['readConcern'];
}

if ( ! $hasOutStage && isset($this->options['readPreference'])) {
if (!$hasWriteStage && isset($this->options['readPreference'])) {
$options['readPreference'] = $this->options['readPreference'];
}

if (isset($this->options['session'])) {
$options['session'] = $this->options['session'];
}

if ($hasOutStage && ! $hasExplain && isset($this->options['writeConcern'])) {
if ($hasWriteStage && ! $hasExplain && isset($this->options['writeConcern'])) {
$options['writeConcern'] = $this->options['writeConcern'];
}

Expand Down
6 changes: 3 additions & 3 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ function is_in_transaction(array $options)
}

/**
* Return whether the aggregation pipeline ends with an $out operator.
* Return whether the aggregation pipeline ends with an $out or $merge operator.
*
* This is used for determining whether the aggregation pipeline must be
* executed against a primary server.
Expand All @@ -136,7 +136,7 @@ function is_in_transaction(array $options)
* @param array $pipeline List of pipeline operations
* @return boolean
*/
function is_last_pipeline_operator_out(array $pipeline)
function is_last_pipeline_operator_write(array $pipeline)
{
$lastOp = end($pipeline);

Expand All @@ -146,7 +146,7 @@ function is_last_pipeline_operator_out(array $pipeline)

$lastOp = (array) $lastOp;

return key($lastOp) === '$out';
return in_array(key($lastOp), ['$out', '$merge'], true);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion tests/Collection/CrudSpecFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ private function executeAssertResult(array $operation, $expectedResult, $actualR
* the result here; however, assertEquivalentCollections() will
* assert the output collection's contents later.
*/
if ( ! \MongoDB\is_last_pipeline_operator_out($operation['arguments']['pipeline'])) {
if ( ! \MongoDB\is_last_pipeline_operator_write($operation['arguments']['pipeline'])) {
$this->assertSameDocuments($expectedResult, $actualResult);
}
break;
Expand Down
9 changes: 9 additions & 0 deletions tests/SpecTests/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ public static function fromCrud(stdClass $test, $databaseName, $collectionName)
$o->outcomeCollectionName = $test->outcome->collection->name;
}

$o->defaultWriteOptions = [
'writeConcern' => new WriteConcern(WriteConcern::MAJORITY),
];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this (and outcomeFindOptions) actually needed? AFAIK, all of the CRUD tests read/write to the primary and never specified a custom read concern or read preference.

AFAIK, defaultWriteOptions was only used for transaction tests (per implementation instructions).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this after a test kept failing on a replicaset setup. I got suspicious after I couldn’t replicate the failures when step-debugging through the test and thought that the problem might be with the secondaries being read before they have the data.


$o->outcomeFindOptions = [
'readConcern' => new ReadConcern('local'),
'readPreference' => new ReadPreference('primary'),
];

$o->client = new Client(FunctionalTestCase::getUri(), $clientOptions);

return $o;
Expand Down
5 changes: 0 additions & 5 deletions tests/SpecTests/CrudSpecTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ class CrudSpecTest extends FunctionalTestCase
/* These should all pass before the driver can be considered compatible with
* MongoDB 4.2. */
private static $incompleteTests = [
'aggregate-merge: Aggregate with $merge' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and batch size of 0' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and majority readConcern' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and local readConcern' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and available readConcern' => 'PHPLIB-438',
'bulkWrite-arrayFilters: BulkWrite with arrayFilters' => 'Fails due to command assertions',
'updateWithPipelines: UpdateOne using pipelines' => 'PHPLIB-418',
'updateWithPipelines: UpdateMany using pipelines' => 'PHPLIB-418',
Expand Down
2 changes: 1 addition & 1 deletion tests/SpecTests/Operation.php
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ private function getResultAssertionTypeForCollection()
* the CRUD specification and is not implemented in the library
* since we have no concept of lazy cursors. Rely on examining
* the output collection rather than the operation result. */
if (\MongoDB\is_last_pipeline_operator_out($this->arguments['pipeline'])) {
if (\MongoDB\is_last_pipeline_operator_write($this->arguments['pipeline'])) {
return ResultExpectation::ASSERT_NOTHING;
}

Expand Down