Skip to content

PHPLIB-466: Support mongos pinning for sharded transactions #677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ cache:

env:
global:
- DRIVER_VERSION=1.6.0alpha3
- DRIVER_VERSION=1.6.0rc1
- SERVER_DISTRO=ubuntu1604
- SERVER_VERSION=4.2.0
- DEPLOYMENT=STANDALONE
Expand Down
6 changes: 3 additions & 3 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public function dropDatabase($databaseName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -246,7 +246,7 @@ public function getWriteConcern()
public function listDatabases(array $options = [])
{
$operation = new ListDatabases($options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand Down Expand Up @@ -311,7 +311,7 @@ public function watch(array $pipeline = [], array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern;
Expand Down
52 changes: 26 additions & 26 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public function aggregate(array $pipeline, array $options = [])
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
Expand Down Expand Up @@ -276,7 +276,7 @@ public function bulkWrite(array $operations, array $options = [])
}

$operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -301,7 +301,7 @@ public function count($filter = [], array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
Expand Down Expand Up @@ -330,7 +330,7 @@ public function countDocuments($filter = [], array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
Expand Down Expand Up @@ -392,7 +392,7 @@ public function createIndex($key, array $options = [])
*/
public function createIndexes(array $indexes, array $options = [])
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -422,7 +422,7 @@ public function deleteMany($filter, array $options = [])
}

$operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -446,7 +446,7 @@ public function deleteOne($filter, array $options = [])
}

$operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -470,7 +470,7 @@ public function distinct($fieldName, $filter = [], array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
Expand All @@ -497,7 +497,7 @@ public function drop(array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -531,7 +531,7 @@ public function dropIndex($indexName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand All @@ -558,7 +558,7 @@ public function dropIndexes(array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -586,7 +586,7 @@ public function estimatedDocumentCount(array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
Expand Down Expand Up @@ -619,7 +619,7 @@ public function explain(Explainable $explainable, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

$operation = new Explain($this->databaseName, $explainable, $options);

Expand All @@ -644,7 +644,7 @@ public function find($filter = [], array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
Expand Down Expand Up @@ -677,7 +677,7 @@ public function findOne($filter = [], array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
Expand Down Expand Up @@ -709,7 +709,7 @@ public function findOne($filter = [], array $options = [])
*/
public function findOneAndDelete($filter, array $options = [])
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -746,7 +746,7 @@ public function findOneAndDelete($filter, array $options = [])
*/
public function findOneAndReplace($filter, $replacement, array $options = [])
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -783,7 +783,7 @@ public function findOneAndReplace($filter, $replacement, array $options = [])
*/
public function findOneAndUpdate($filter, $update, array $options = [])
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -899,7 +899,7 @@ public function insertMany(array $documents, array $options = [])
}

$operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -922,7 +922,7 @@ public function insertOne($document, array $options = [])
}

$operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -939,7 +939,7 @@ public function insertOne($document, array $options = [])
public function listIndexes(array $options = [])
{
$operation = new ListIndexes($this->databaseName, $this->collectionName, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand Down Expand Up @@ -972,7 +972,7 @@ public function mapReduce(JavascriptInterface $map, JavascriptInterface $reduce,
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

/* A "majority" read concern is not compatible with inline output, so
* avoid providing the Collection's read concern if it would conflict.
Expand Down Expand Up @@ -1016,7 +1016,7 @@ public function replaceOne($filter, $replacement, array $options = [])
}

$operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -1041,7 +1041,7 @@ public function updateMany($filter, $update, array $options = [])
}

$operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -1066,7 +1066,7 @@ public function updateOne($filter, $update, array $options = [])
}

$operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -1086,7 +1086,7 @@ public function watch(array $pipeline = [], array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

/* Although change streams require a newer version of the server than
* read concerns, perform the usual wire version check before inheriting
Expand Down
16 changes: 8 additions & 8 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public function aggregate(array $pipeline, array $options = [])
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
Expand Down Expand Up @@ -258,7 +258,7 @@ public function command($command, array $options = [])
}

$operation = new DatabaseCommand($this->databaseName, $command, $options);
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -280,7 +280,7 @@ public function createCollection($collectionName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
Expand All @@ -307,7 +307,7 @@ public function drop(array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -335,7 +335,7 @@ public function dropCollection($collectionName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -420,7 +420,7 @@ public function getWriteConcern()
public function listCollections(array $options = [])
{
$operation = new ListCollections($this->databaseName, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

return $operation->execute($server);
}
Expand All @@ -442,7 +442,7 @@ public function modifyCollection($collectionName, array $collectionOptions, arra
$options['typeMap'] = $this->typeMap;
}

$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));

if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -509,7 +509,7 @@ public function watch(array $pipeline = [], array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));

if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern;
Expand Down
Loading