@@ -71,37 +71,75 @@ public function __construct(Manager $manager, $namespace, WriteConcern $writeCon
71
71
72
72
/**
73
73
* Runs an aggregation framework pipeline
74
- * NOTE: The return value of this method depends on your MongoDB server version
75
- * and possibly options.
76
- * MongoDB 2.6 (and later) will return a Cursor by default
77
- * MongoDB pre 2.6 will return an ArrayIterator
74
+ *
75
+ * Note: this method's return value depends on the MongoDB server version
76
+ * and the "useCursor" option. If "useCursor" is true, a Cursor will be
77
+ * returned; otherwise, an ArrayIterator is returned, which wraps the
78
+ * "result" array from the command response document.
78
79
*
79
80
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
80
- * @see Collection::getAggregateOptions() for supported $options
81
81
*
82
82
* @param array $pipeline The pipeline to execute
83
83
* @param array $options Additional options
84
84
* @return Iterator
85
85
*/
86
86
public function aggregate (array $ pipeline , array $ options = array ())
87
87
{
88
- $ options = array_merge ($ this ->getAggregateOptions (), $ options );
89
- $ options = $ this ->_massageAggregateOptions ($ options );
90
- $ cmd = array (
91
- "aggregate " => $ this ->collname ,
92
- "pipeline " => $ pipeline ,
93
- ) + $ options ;
88
+ $ readPreference = new ReadPreference (ReadPreference::RP_PRIMARY );
89
+ $ server = $ this ->manager ->selectServer ($ readPreference );
90
+
91
+ if (FeatureDetection::isSupported ($ server , FeatureDetection::API_AGGREGATE_CURSOR )) {
92
+ $ options = array_merge (
93
+ array (
94
+ /**
95
+ * Enables writing to temporary files. When set to true, aggregation stages
96
+ * can write data to the _tmp subdirectory in the dbPath directory. The
97
+ * default is false.
98
+ *
99
+ * @see http://docs.mongodb.org/manual/reference/command/aggregate/
100
+ */
101
+ 'allowDiskUse ' => false ,
102
+ /**
103
+ * The number of documents to return per batch.
104
+ *
105
+ * @see http://docs.mongodb.org/manual/reference/command/aggregate/
106
+ */
107
+ 'batchSize ' => 0 ,
108
+ /**
109
+ * The maximum amount of time to allow the query to run.
110
+ *
111
+ * @see http://docs.mongodb.org/manual/reference/command/aggregate/
112
+ */
113
+ 'maxTimeMS ' => 0 ,
114
+ /**
115
+ * Indicates if the results should be provided as a cursor.
116
+ *
117
+ * @see http://docs.mongodb.org/manual/reference/command/aggregate/
118
+ */
119
+ 'useCursor ' => true ,
120
+ ),
121
+ $ options
122
+ );
123
+ }
94
124
95
- $ cursor = $ this ->_runCommand ($ this ->dbname , $ cmd );
125
+ $ options = $ this ->_massageAggregateOptions ($ options );
126
+ $ command = new Command (array (
127
+ 'aggregate ' => $ this ->collname ,
128
+ 'pipeline ' => $ pipeline ,
129
+ ) + $ options );
130
+ $ cursor = $ server ->executeCommand ($ this ->dbname , $ command );
96
131
97
- if (isset ( $ cmd [ " cursor " ]) && $ cmd ["cursor " ]) {
132
+ if ( ! empty ( $ options ["cursor " ]) ) {
98
133
return $ cursor ;
99
134
}
100
135
101
136
$ doc = current ($ cursor ->toArray ());
102
137
103
138
if ($ doc ["ok " ]) {
104
- return new \ArrayIterator ($ doc ["result " ]);
139
+ return new \ArrayIterator (array_map (
140
+ function (\stdClass $ document ) { return (array ) $ document ; },
141
+ $ doc ["result " ]
142
+ ));
105
143
}
106
144
107
145
throw $ this ->_generateCommandException ($ doc );
@@ -578,55 +616,6 @@ public function findOneAndUpdate(array $filter, array $update, array $options =
578
616
throw $ this ->_generateCommandException ($ doc );
579
617
}
580
618
581
- /**
582
- * Retrieves all aggregate options with their default values.
583
- *
584
- * @return array of Collection::aggregate() options
585
- */
586
- public function getAggregateOptions ()
587
- {
588
- $ opts = array (
589
- /**
590
- * Enables writing to temporary files. When set to true, aggregation stages
591
- * can write data to the _tmp subdirectory in the dbPath directory. The
592
- * default is false.
593
- *
594
- * @see http://docs.mongodb.org/manual/reference/command/aggregate/
595
- */
596
- "allowDiskUse " => false ,
597
-
598
- /**
599
- * The number of documents to return per batch.
600
- *
601
- * @see http://docs.mongodb.org/manual/reference/command/aggregate/
602
- */
603
- "batchSize " => 0 ,
604
-
605
- /**
606
- * The maximum amount of time to allow the query to run.
607
- *
608
- * @see http://docs.mongodb.org/manual/reference/command/aggregate/
609
- */
610
- "maxTimeMS " => 0 ,
611
-
612
- /**
613
- * Indicates if the results should be provided as a cursor.
614
- *
615
- * The default for this value depends on the version of the server.
616
- * - Servers >= 2.6 will use a default of true.
617
- * - Servers < 2.6 will use a default of false.
618
- *
619
- * As with any other property, this value can be changed.
620
- *
621
- * @see http://docs.mongodb.org/manual/reference/command/aggregate/
622
- */
623
- "useCursor " => true ,
624
- );
625
-
626
- /* FIXME: Add a version check for useCursor */
627
- return $ opts ;
628
- }
629
-
630
619
/**
631
620
* Retrieves all Bulk Write options with their default values.
632
621
*
@@ -1151,8 +1140,10 @@ final protected function _generateCommandException($doc)
1151
1140
*/
1152
1141
protected function _massageAggregateOptions ($ options )
1153
1142
{
1154
- if ($ options ["useCursor " ]) {
1155
- $ options ["cursor " ] = array ("batchSize " => $ options ["batchSize " ]);
1143
+ if ( ! empty ($ options ["useCursor " ])) {
1144
+ $ options ["cursor " ] = isset ($ options ["batchSize " ])
1145
+ ? array ("batchSize " => (integer ) $ options ["batchSize " ])
1146
+ : new stdClass ;
1156
1147
}
1157
1148
unset($ options ["useCursor " ], $ options ["batchSize " ]);
1158
1149
0 commit comments