Skip to content

PHPLIB-651: Support $merge and $out executing on secondaries #861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class Collection
/** @var integer */
private static $wireVersionForReadConcernWithWriteStage = 8;

/** @var integer */
private static $wireVersionForSecondarySupportsWriteStage = 13;

/** @var string */
private $collectionName;

Expand Down Expand Up @@ -225,11 +228,21 @@ public function aggregate(array $pipeline, array $options = [])
$options['readPreference'] = $this->readPreference;
}

if ($hasWriteStage) {
$server = select_server($this->manager, $options);

/* If a write stage is being used with a read preference (explicit or
* inherited), check that the wire version supports it. If not, force a
* primary read preference and select a new server if necessary. */
if (
$hasWriteStage && isset($options['readPreference']) &&
! server_supports_feature($server, self::$wireVersionForSecondarySupportsWriteStage)
) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

$server = select_server($this->manager, $options);
if ($server->isSecondary()) {
$server = select_server($this->manager, $options);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I recently spoke to @kevinAlbs about this (re: mongodb/specifications#1062 (comment)), I mentioned an alternative implementation that could bypass a second server selection attempt (i.e. mongoc_client_select_server) by instead iterating on Manager::getServers() to select a primary.

Thanks to the first server selection attempt, we can be guaranteed that the topology has been discovered. getServers() shouldn't return an empty array unless there really are no servers available. But I also don't think there's much benefit to that approach vs. simply invoking server selection a second time. The edge case is that if there really isn't a primary available, we'll block for serverSelectionTimeoutMS on this second attempt.

But if the root concern is potentially blocking the application for 2 * serverSelectionTimeoutMS, let's consider how that might actually happen:

  • If the first attempt is looking for a secondary to execute an $out/$merge pipeline and the topology doesn't have a secondary at all, the default scenario will abort early thanks to serverSelectionTryOnce and abort the entire aggregate() method.
  • If serverSelectionTryOnce=false and a loop is permitted, server selection could theoretically discover a secondary just before serverSelectionTimeoutMS expires (e.g. at 29 seconds in) and return that server.
  • If we then discover the selected server is a pre-5.0 secondary, we'll re-enter server selection to look for a primary. Worst case, this could also take serverSelectionTimeoutMS to ultimately select a primary (e.g. wait through an election) or hit a server selection timeout.

That scenario seems quite rare, so I'm inclined to keep the code as-is rather and punt on any optimization until we actually have evidence that this causes problems down the line. IMO, the aggregate() helper is already very convoluted so anything we can do to avoid introducing more complexity is welcomed.

}
}

/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
Expand Down
19 changes: 16 additions & 3 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class Database
/** @var integer */
private static $wireVersionForReadConcernWithWriteStage = 8;

/** @var integer */
private static $wireVersionForSecondarySupportsWriteStage = 13;

/** @var string */
private $databaseName;

Expand Down Expand Up @@ -206,11 +209,21 @@ public function aggregate(array $pipeline, array $options = [])
$options['readPreference'] = $this->readPreference;
}

if ($hasWriteStage) {
$server = select_server($this->manager, $options);

/* If a write stage is being used with a read preference (explicit or
* inherited), check that the wire version supports it. If not, force a
* primary read preference and select a new server if necessary. */
if (
$hasWriteStage && isset($options['readPreference']) &&
! server_supports_feature($server, self::$wireVersionForSecondarySupportsWriteStage)
) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

$server = select_server($this->manager, $options);
if ($server->isSecondary()) {
$server = select_server($this->manager, $options);
}
}

/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
Expand Down
78 changes: 43 additions & 35 deletions src/Operation/Aggregate.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use ArrayIterator;
use MongoDB\Driver\Command;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Driver\ReadConcern;
use MongoDB\Driver\ReadPreference;
Expand Down Expand Up @@ -75,6 +76,12 @@ class Aggregate implements Executable, Explainable
/** @var array */
private $options;

/** @var bool */
private $isExplain;

/** @var bool */
private $isWrite;

/**
* Constructs an aggregate command.
*
Expand Down Expand Up @@ -253,8 +260,19 @@ public function __construct($databaseName, $collectionName, array $pipeline, arr
unset($options['writeConcern']);
}

if (! empty($options['explain'])) {
$this->isExplain = ! empty($options['explain']);
$this->isWrite = is_last_pipeline_operator_write($pipeline) && ! $this->isExplain;

// Explain does not use a cursor
if ($this->isExplain) {
$options['useCursor'] = false;
unset($options['batchSize']);
}

/* Ignore batchSize for writes, since no documents are returned and a
* batchSize of zero could prevent the pipeline from executing. */
if ($this->isWrite) {
unset($options['batchSize']);
}

$this->databaseName = (string) $databaseName;
Expand Down Expand Up @@ -298,20 +316,14 @@ public function execute(Server $server)
}
}

$hasExplain = ! empty($this->options['explain']);
$hasWriteStage = $this->hasWriteStage();

$command = new Command(
$this->createCommandDocument($server, $hasWriteStage),
$this->createCommandDocument($server),
$this->createCommandOptions()
);
$options = $this->createOptions($hasWriteStage, $hasExplain);

$cursor = $hasWriteStage && ! $hasExplain
? $server->executeReadWriteCommand($this->databaseName, $command, $options)
: $server->executeReadCommand($this->databaseName, $command, $options);
$cursor = $this->executeCommand($server, $command);

if ($this->options['useCursor'] || $hasExplain) {
if ($this->options['useCursor'] || $this->isExplain) {
if (isset($this->options['typeMap'])) {
$cursor->setTypeMap($this->options['typeMap']);
}
Expand Down Expand Up @@ -341,10 +353,10 @@ public function execute(Server $server)
*/
public function getCommandDocument(Server $server)
{
return $this->createCommandDocument($server, $this->hasWriteStage());
return $this->createCommandDocument($server);
}

private function createCommandDocument(Server $server, bool $hasWriteStage): array
private function createCommandDocument(Server $server): array
{
$cmd = [
'aggregate' => $this->collectionName ?? 1,
Expand Down Expand Up @@ -377,10 +389,7 @@ private function createCommandDocument(Server $server, bool $hasWriteStage): arr
}

if ($this->options['useCursor']) {
/* Ignore batchSize if pipeline includes an $out or $merge stage, as
* no documents will be returned and sending a batchSize of zero
* could prevent the pipeline from executing at all. */
$cmd['cursor'] = isset($this->options["batchSize"]) && ! $hasWriteStage
$cmd['cursor'] = isset($this->options["batchSize"])
? ['batchSize' => $this->options["batchSize"]]
: new stdClass();
}
Expand All @@ -400,39 +409,38 @@ private function createCommandOptions(): array
}

/**
* Create options for executing the command.
* Execute the aggregate command using the appropriate Server method.
*
* @see http://php.net/manual/en/mongodb-driver-server.executecommand.php
* @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php
* @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
* @param boolean $hasWriteStage
* @param boolean $hasExplain
* @return array
*/
private function createOptions($hasWriteStage, $hasExplain)
private function executeCommand(Server $server, Command $command): Cursor
{
$options = [];

if (isset($this->options['readConcern'])) {
$options['readConcern'] = $this->options['readConcern'];
foreach (['readConcern', 'readPreference', 'session'] as $option) {
if (isset($this->options[$option])) {
$options[$option] = $this->options[$option];
}
}

if (! $hasWriteStage && isset($this->options['readPreference'])) {
$options['readPreference'] = $this->options['readPreference'];
if ($this->isWrite && isset($this->options['writeConcern'])) {
$options['writeConcern'] = $this->options['writeConcern'];
}

if (isset($this->options['session'])) {
$options['session'] = $this->options['session'];
if (! $this->isWrite) {
return $server->executeReadCommand($this->databaseName, $command, $options);
}

if ($hasWriteStage && ! $hasExplain && isset($this->options['writeConcern'])) {
$options['writeConcern'] = $this->options['writeConcern'];
/* Server::executeReadWriteCommand() does not support a "readPreference"
* option, so fall back to executeCommand(). This means that libmongoc
* will not apply any client-level options (e.g. writeConcern), but that
* should not be an issue as PHPLIB handles inheritance on its own. */
if (isset($options['readPreference'])) {
return $server->executeCommand($this->databaseName, $command, $options);
}

return $options;
}

private function hasWriteStage(): bool
{
return is_last_pipeline_operator_write($this->pipeline);
return $server->executeReadWriteCommand($this->databaseName, $command, $options);
}
}
Loading