Skip to content

Commit 7ed839a

Browse files
authored
PHPLIB-651: Support $merge and $out executing on secondaries (#861)
Synced CRUD spec tests with mongodb/specifications@5f8f668
1 parent f4a6e8d commit 7ed839a

File tree

5 files changed

+975
-41
lines changed

5 files changed

+975
-41
lines changed

src/Collection.php

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class Collection
8787
/** @var integer */
8888
private static $wireVersionForReadConcernWithWriteStage = 8;
8989

90+
/** @var integer */
91+
private static $wireVersionForSecondarySupportsWriteStage = 13;
92+
9093
/** @var string */
9194
private $collectionName;
9295

@@ -225,11 +228,21 @@ public function aggregate(array $pipeline, array $options = [])
225228
$options['readPreference'] = $this->readPreference;
226229
}
227230

228-
if ($hasWriteStage) {
231+
$server = select_server($this->manager, $options);
232+
233+
/* If a write stage is being used with a read preference (explicit or
234+
* inherited), check that the wire version supports it. If not, force a
235+
* primary read preference and select a new server if necessary. */
236+
if (
237+
$hasWriteStage && isset($options['readPreference']) &&
238+
! server_supports_feature($server, self::$wireVersionForSecondarySupportsWriteStage)
239+
) {
229240
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
230-
}
231241

232-
$server = select_server($this->manager, $options);
242+
if ($server->isSecondary()) {
243+
$server = select_server($this->manager, $options);
244+
}
245+
}
233246

234247
/* MongoDB 4.2 and later supports a read concern when an $out stage is
235248
* being used, but earlier versions do not.

src/Database.php

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ class Database
6464
/** @var integer */
6565
private static $wireVersionForReadConcernWithWriteStage = 8;
6666

67+
/** @var integer */
68+
private static $wireVersionForSecondarySupportsWriteStage = 13;
69+
6770
/** @var string */
6871
private $databaseName;
6972

@@ -206,11 +209,21 @@ public function aggregate(array $pipeline, array $options = [])
206209
$options['readPreference'] = $this->readPreference;
207210
}
208211

209-
if ($hasWriteStage) {
212+
$server = select_server($this->manager, $options);
213+
214+
/* If a write stage is being used with a read preference (explicit or
215+
* inherited), check that the wire version supports it. If not, force a
216+
* primary read preference and select a new server if necessary. */
217+
if (
218+
$hasWriteStage && isset($options['readPreference']) &&
219+
! server_supports_feature($server, self::$wireVersionForSecondarySupportsWriteStage)
220+
) {
210221
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
211-
}
212222

213-
$server = select_server($this->manager, $options);
223+
if ($server->isSecondary()) {
224+
$server = select_server($this->manager, $options);
225+
}
226+
}
214227

215228
/* MongoDB 4.2 and later supports a read concern when an $out stage is
216229
* being used, but earlier versions do not.

src/Operation/Aggregate.php

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
use ArrayIterator;
2121
use MongoDB\Driver\Command;
22+
use MongoDB\Driver\Cursor;
2223
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
2324
use MongoDB\Driver\ReadConcern;
2425
use MongoDB\Driver\ReadPreference;
@@ -75,6 +76,12 @@ class Aggregate implements Executable, Explainable
7576
/** @var array */
7677
private $options;
7778

79+
/** @var bool */
80+
private $isExplain;
81+
82+
/** @var bool */
83+
private $isWrite;
84+
7885
/**
7986
* Constructs an aggregate command.
8087
*
@@ -253,8 +260,19 @@ public function __construct($databaseName, $collectionName, array $pipeline, arr
253260
unset($options['writeConcern']);
254261
}
255262

256-
if (! empty($options['explain'])) {
263+
$this->isExplain = ! empty($options['explain']);
264+
$this->isWrite = is_last_pipeline_operator_write($pipeline) && ! $this->isExplain;
265+
266+
// Explain does not use a cursor
267+
if ($this->isExplain) {
257268
$options['useCursor'] = false;
269+
unset($options['batchSize']);
270+
}
271+
272+
/* Ignore batchSize for writes, since no documents are returned and a
273+
* batchSize of zero could prevent the pipeline from executing. */
274+
if ($this->isWrite) {
275+
unset($options['batchSize']);
258276
}
259277

260278
$this->databaseName = (string) $databaseName;
@@ -298,20 +316,14 @@ public function execute(Server $server)
298316
}
299317
}
300318

301-
$hasExplain = ! empty($this->options['explain']);
302-
$hasWriteStage = $this->hasWriteStage();
303-
304319
$command = new Command(
305-
$this->createCommandDocument($server, $hasWriteStage),
320+
$this->createCommandDocument($server),
306321
$this->createCommandOptions()
307322
);
308-
$options = $this->createOptions($hasWriteStage, $hasExplain);
309323

310-
$cursor = $hasWriteStage && ! $hasExplain
311-
? $server->executeReadWriteCommand($this->databaseName, $command, $options)
312-
: $server->executeReadCommand($this->databaseName, $command, $options);
324+
$cursor = $this->executeCommand($server, $command);
313325

314-
if ($this->options['useCursor'] || $hasExplain) {
326+
if ($this->options['useCursor'] || $this->isExplain) {
315327
if (isset($this->options['typeMap'])) {
316328
$cursor->setTypeMap($this->options['typeMap']);
317329
}
@@ -341,10 +353,10 @@ public function execute(Server $server)
341353
*/
342354
public function getCommandDocument(Server $server)
343355
{
344-
return $this->createCommandDocument($server, $this->hasWriteStage());
356+
return $this->createCommandDocument($server);
345357
}
346358

347-
private function createCommandDocument(Server $server, bool $hasWriteStage): array
359+
private function createCommandDocument(Server $server): array
348360
{
349361
$cmd = [
350362
'aggregate' => $this->collectionName ?? 1,
@@ -377,10 +389,7 @@ private function createCommandDocument(Server $server, bool $hasWriteStage): arr
377389
}
378390

379391
if ($this->options['useCursor']) {
380-
/* Ignore batchSize if pipeline includes an $out or $merge stage, as
381-
* no documents will be returned and sending a batchSize of zero
382-
* could prevent the pipeline from executing at all. */
383-
$cmd['cursor'] = isset($this->options["batchSize"]) && ! $hasWriteStage
392+
$cmd['cursor'] = isset($this->options["batchSize"])
384393
? ['batchSize' => $this->options["batchSize"]]
385394
: new stdClass();
386395
}
@@ -400,39 +409,38 @@ private function createCommandOptions(): array
400409
}
401410

402411
/**
403-
* Create options for executing the command.
412+
* Execute the aggregate command using the appropriate Server method.
404413
*
414+
* @see http://php.net/manual/en/mongodb-driver-server.executecommand.php
405415
* @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php
406416
* @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
407-
* @param boolean $hasWriteStage
408-
* @param boolean $hasExplain
409-
* @return array
410417
*/
411-
private function createOptions($hasWriteStage, $hasExplain)
418+
private function executeCommand(Server $server, Command $command): Cursor
412419
{
413420
$options = [];
414421

415-
if (isset($this->options['readConcern'])) {
416-
$options['readConcern'] = $this->options['readConcern'];
422+
foreach (['readConcern', 'readPreference', 'session'] as $option) {
423+
if (isset($this->options[$option])) {
424+
$options[$option] = $this->options[$option];
425+
}
417426
}
418427

419-
if (! $hasWriteStage && isset($this->options['readPreference'])) {
420-
$options['readPreference'] = $this->options['readPreference'];
428+
if ($this->isWrite && isset($this->options['writeConcern'])) {
429+
$options['writeConcern'] = $this->options['writeConcern'];
421430
}
422431

423-
if (isset($this->options['session'])) {
424-
$options['session'] = $this->options['session'];
432+
if (! $this->isWrite) {
433+
return $server->executeReadCommand($this->databaseName, $command, $options);
425434
}
426435

427-
if ($hasWriteStage && ! $hasExplain && isset($this->options['writeConcern'])) {
428-
$options['writeConcern'] = $this->options['writeConcern'];
436+
/* Server::executeReadWriteCommand() does not support a "readPreference"
437+
* option, so fall back to executeCommand(). This means that libmongoc
438+
* will not apply any client-level options (e.g. writeConcern), but that
439+
* should not be an issue as PHPLIB handles inheritance on its own. */
440+
if (isset($options['readPreference'])) {
441+
return $server->executeCommand($this->databaseName, $command, $options);
429442
}
430443

431-
return $options;
432-
}
433-
434-
private function hasWriteStage(): bool
435-
{
436-
return is_last_pipeline_operator_write($this->pipeline);
444+
return $server->executeReadWriteCommand($this->databaseName, $command, $options);
437445
}
438446
}

0 commit comments

Comments
 (0)