Skip to content

Commit 03eb7c8

Browse files
committed
Extract Collection::aggregate() to an operation class
1 parent cf63a84 commit 03eb7c8

File tree

2 files changed

+176
-77
lines changed

2 files changed

+176
-77
lines changed

src/Collection.php

Lines changed: 9 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use MongoDB\Model\IndexInfoIterator;
1616
use MongoDB\Model\IndexInfoIteratorIterator;
1717
use MongoDB\Model\IndexInput;
18+
use MongoDB\Operation\Aggregate;
19+
use Traversable;
1820

1921
class Collection
2022
{
@@ -78,79 +80,25 @@ public function __toString()
7880
}
7981

8082
/**
81-
* Runs an aggregation framework pipeline
83+
* Executes an aggregation framework pipeline on the collection.
8284
*
8385
* Note: this method's return value depends on the MongoDB server version
8486
* and the "useCursor" option. If "useCursor" is true, a Cursor will be
8587
* returned; otherwise, an ArrayIterator is returned, which wraps the
8688
* "result" array from the command response document.
8789
*
88-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
89-
*
90-
* @param array $pipeline The pipeline to execute
91-
* @param array $options Additional options
92-
* @return Iterator
90+
* @see Aggregate::__construct() for supported options
91+
* @param array $pipeline List of pipeline operations
92+
* @param array $options Command options
93+
* @return Traversable
9394
*/
9495
public function aggregate(array $pipeline, array $options = array())
9596
{
9697
$readPreference = new ReadPreference(ReadPreference::RP_PRIMARY);
9798
$server = $this->manager->selectServer($readPreference);
99+
$operation = new Aggregate($this->dbname, $this->collname, $pipeline, $options);
98100

99-
if (FeatureDetection::isSupported($server, FeatureDetection::API_AGGREGATE_CURSOR)) {
100-
$options = array_merge(
101-
array(
102-
/**
103-
* Enables writing to temporary files. When set to true, aggregation stages
104-
* can write data to the _tmp subdirectory in the dbPath directory. The
105-
* default is false.
106-
*
107-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
108-
*/
109-
'allowDiskUse' => false,
110-
/**
111-
* The number of documents to return per batch.
112-
*
113-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
114-
*/
115-
'batchSize' => 0,
116-
/**
117-
* The maximum amount of time to allow the query to run.
118-
*
119-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
120-
*/
121-
'maxTimeMS' => 0,
122-
/**
123-
* Indicates if the results should be provided as a cursor.
124-
*
125-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
126-
*/
127-
'useCursor' => true,
128-
),
129-
$options
130-
);
131-
}
132-
133-
$options = $this->_massageAggregateOptions($options);
134-
$command = new Command(array(
135-
'aggregate' => $this->collname,
136-
'pipeline' => $pipeline,
137-
) + $options);
138-
$cursor = $server->executeCommand($this->dbname, $command);
139-
140-
if ( ! empty($options["cursor"])) {
141-
return $cursor;
142-
}
143-
144-
$doc = current($cursor->toArray());
145-
146-
if ($doc["ok"]) {
147-
return new \ArrayIterator(array_map(
148-
function (\stdClass $document) { return (array) $document; },
149-
$doc["result"]
150-
));
151-
}
152-
153-
throw $this->_generateCommandException($doc);
101+
return $operation->execute($server);
154102
}
155103

156104
/**
@@ -1168,22 +1116,6 @@ final protected function _generateCommandException($doc)
11681116
return new RuntimeException("FIXME: Unknown error");
11691117
}
11701118

1171-
/**
1172-
* Internal helper for massaging aggregate options
1173-
* @internal
1174-
*/
1175-
protected function _massageAggregateOptions($options)
1176-
{
1177-
if ( ! empty($options["useCursor"])) {
1178-
$options["cursor"] = isset($options["batchSize"])
1179-
? array("batchSize" => (integer) $options["batchSize"])
1180-
: new stdClass;
1181-
}
1182-
unset($options["useCursor"], $options["batchSize"]);
1183-
1184-
return $options;
1185-
}
1186-
11871119
/**
11881120
* Internal helper for massaging findandmodify options
11891121
* @internal

src/Operation/Aggregate.php

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
<?php
2+
3+
namespace MongoDB\Operation;
4+
5+
use MongoDB\FeatureDetection;
6+
use MongoDB\Driver\Command;
7+
use MongoDB\Driver\Server;
8+
use MongoDB\Exception\InvalidArgumentException;
9+
use MongoDB\Exception\InvalidArgumentTypeException;
10+
use MongoDB\Exception\RuntimeException;
11+
use ArrayIterator;
12+
use stdClass;
13+
use Traversable;
14+
15+
/**
16+
* Operation for the aggregate command.
17+
*
18+
* @api
19+
* @see MongoDB\Collection::aggregate()
20+
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
21+
*/
22+
class Aggregate implements Executable
23+
{
24+
private static $wireVersionForCursor = 2;
25+
26+
private $databaseName;
27+
private $collectionName;
28+
private $pipeline;
29+
private $options;
30+
31+
/**
32+
* Constructs an aggregate command.
33+
*
34+
* Supported options:
35+
*
36+
* * allowDiskUse (boolean): Enables writing to temporary files. When set
37+
* to true, aggregation stages can write data to the _tmp sub-directory
38+
* in the dbPath directory. The default is false.
39+
*
40+
* * batchSize (integer): The number of documents to return per batch.
41+
*
42+
* * maxTimeMS (integer): The maximum amount of time to allow the query to
43+
* run.
44+
*
45+
* * useCursor (boolean): Indicates whether the command will request that
46+
* the server provide results using a cursor. The default is true.
47+
*
48+
* For servers < 2.6, this option is ignored as aggregation cursors are
49+
* not available.
50+
*
51+
* For servers >= 2.6, this option allows users to turn off cursors if
52+
* necessary to aid in mongod/mongos upgrades.
53+
*
54+
* @param string $databaseName Database name
55+
* @param string $collectionName Collection name
56+
* @param array $pipeline List of pipeline operations
57+
* @param array $options Command options
58+
* @throws InvalidArgumentException
59+
*/
60+
public function __construct($databaseName, $collectionName, array $pipeline, array $options = array())
61+
{
62+
$options += array(
63+
'allowDiskUse' => false,
64+
'useCursor' => true,
65+
);
66+
67+
if ( ! is_bool($options['allowDiskUse'])) {
68+
throw new InvalidArgumentTypeException('"allowDiskUse" option', $options['allowDiskUse'], 'boolean');
69+
}
70+
71+
if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) {
72+
throw new InvalidArgumentTypeException('"batchSize" option', $options['batchSize'], 'integer');
73+
}
74+
75+
if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
76+
throw new InvalidArgumentTypeException('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
77+
}
78+
79+
if ( ! is_bool($options['useCursor'])) {
80+
throw new InvalidArgumentTypeException('"useCursor" option', $options['useCursor'], 'boolean');
81+
}
82+
83+
if (isset($options['batchSize']) && ! $options['useCursor']) {
84+
throw new InvalidArgumentException('"batchSize" option should not be used if "useCursor" is false');
85+
}
86+
87+
$expectedIndex = 0;
88+
89+
foreach ($pipeline as $i => $op) {
90+
if ($i !== $expectedIndex) {
91+
throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $i));
92+
}
93+
94+
if ( ! is_array($op) && ! is_object($op)) {
95+
throw new InvalidArgumentTypeException(sprintf('$pipeline[%d]', $i), $op, 'array or object');
96+
}
97+
98+
$expectedIndex += 1;
99+
}
100+
101+
$this->databaseName = (string) $databaseName;
102+
$this->collectionName = (string) $collectionName;
103+
$this->pipeline = $pipeline;
104+
$this->options = $options;
105+
}
106+
107+
/**
108+
* Execute the operation.
109+
*
110+
* @see Executable::execute()
111+
* @param Server $server
112+
* @return Traversable
113+
*/
114+
public function execute(Server $server)
115+
{
116+
$command = $this->createCommand($server);
117+
$cursor = $server->executeCommand($this->databaseName, $command);
118+
119+
if ($this->options['useCursor']) {
120+
return $cursor;
121+
}
122+
123+
$result = current($cursor->toArray());
124+
125+
if (empty($result['ok'])) {
126+
throw new RuntimeException(isset($result['errmsg']) ? $result['errmsg'] : 'Unknown error');
127+
}
128+
129+
return new ArrayIterator(array_map(
130+
function (stdClass $document) { return (array) $document; },
131+
$result['result']
132+
));
133+
}
134+
135+
/**
136+
* Create the aggregate command.
137+
*
138+
* @param Server $server
139+
* @return Command
140+
*/
141+
private function createCommand(Server $server)
142+
{
143+
$cmd = array(
144+
'aggregate' => $this->collectionName,
145+
'pipeline' => $this->pipeline,
146+
);
147+
148+
// Servers < 2.6 do not support any command options
149+
if ( ! FeatureDetection::isSupported($server, self::$wireVersionForCursor)) {
150+
return new Command($cmd);
151+
}
152+
153+
$cmd['allowDiskUse'] = $this->options['allowDiskUse'];
154+
155+
if (isset($this->options['maxTimeMS'])) {
156+
$cmd['maxTimeMS'] = $this->options['maxTimeMS'];
157+
}
158+
159+
if ($this->options['useCursor']) {
160+
$cmd['cursor'] = isset($this->options["batchSize"])
161+
? array('batchSize' => $this->options["batchSize"])
162+
: new stdClass;
163+
}
164+
165+
return new Command($cmd);
166+
}
167+
}

0 commit comments

Comments
 (0)