Skip to content

Commit 6f74844

Browse files
committed
Use PipelineBuilder
1 parent 637522d commit 6f74844

File tree

2 files changed

+73
-76
lines changed

2 files changed

+73
-76
lines changed

src/Query/Builder.php

Lines changed: 62 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,9 @@
2121
use MongoDB\BSON\ObjectID;
2222
use MongoDB\BSON\Regex;
2323
use MongoDB\BSON\UTCDateTime;
24-
use MongoDB\Builder\Accumulator\FirstAccumulator;
24+
use MongoDB\Builder\Accumulator;
2525
use MongoDB\Builder\BuilderEncoder;
2626
use MongoDB\Builder\Expression\FieldPath;
27-
use MongoDB\Builder\Pipeline;
28-
use MongoDB\Builder\Stage;
29-
use MongoDB\Builder\Stage\CountStage;
30-
use MongoDB\Builder\Stage\GroupStage;
31-
use MongoDB\Builder\Stage\LimitStage;
32-
use MongoDB\Builder\Stage\MatchStage;
33-
use MongoDB\Builder\Stage\ProjectStage;
34-
use MongoDB\Builder\Stage\ReplaceRootStage;
35-
use MongoDB\Builder\Stage\SkipStage;
36-
use MongoDB\Builder\Stage\SortStage;
37-
use MongoDB\Builder\Stage\UnwindStage;
38-
use MongoDB\Builder\Type\StageInterface;
3927
use MongoDB\Builder\Variable;
4028
use MongoDB\Driver\Cursor;
4129
use Override;
@@ -295,21 +283,21 @@ public function dump(mixed ...$args)
295283
return $this;
296284
}
297285

298-
/** @return StageInterface[] */
299-
protected function getPipeline(): array
286+
protected function getPipelineBuilder(): PipelineBuilder
300287
{
301288
$columns = $this->columns ?? [];
302289

290+
$pipelineBuilder = new PipelineBuilder([], $this->collection, $this->options);
291+
303292
// Drop all columns if * is present, MongoDB does not work this way.
304293
if (in_array('*', $columns)) {
305294
$columns = [];
306295
}
307296

308-
$pipeline = [];
309297
$wheres = $this->compileWheres();
310298

311299
if (count($wheres)) {
312-
$pipeline[] = new MatchStage($wheres);
300+
$pipelineBuilder->match(...$wheres);
313301
}
314302

315303
// Use MongoDB's aggregation framework when using grouping or aggregation functions.
@@ -371,81 +359,75 @@ protected function getPipeline(): array
371359
$group['_id'] = null;
372360
}
373361

374-
// Build the aggregation pipeline.
375-
$pipeline = [];
376-
if ($wheres) {
377-
$pipeline[] = Stage::match(...$wheres);
378-
}
379-
380362
// apply unwinds for subdocument array aggregation
381363
foreach ($unwinds as $unwind) {
382-
$pipeline[] = new UnwindStage($unwind);
364+
$pipelineBuilder->unwind($unwind);
383365
}
384366

385367
if ($group) {
386-
$pipeline[] = new GroupStage(...$group);
368+
$pipelineBuilder->group(...$group);
387369
}
388370

389371
// Apply order and limit
390372
if ($this->orders) {
391-
$pipeline[] = new SortStage($this->orders);
373+
$pipelineBuilder->sort($this->orders);
392374
}
393375

394376
if ($this->offset) {
395-
$pipeline[] = new SkipStage($this->offset);
377+
$pipelineBuilder->skip($this->offset);
396378
}
397379

398380
if ($this->limit) {
399-
$pipeline[] = new LimitStage($this->limit);
381+
$pipelineBuilder->limit($this->limit);
400382
}
401383

402384
if ($this->projections) {
403-
$pipeline[] = new ProjectStage(...$this->projections);
385+
$pipelineBuilder->project(...$this->projections);
404386
}
405387

406-
return $pipeline;
388+
return $pipelineBuilder;
407389
}
408390

409391
// Distinct query
410392
if ($this->distinct) {
411393
// Return distinct results directly
412394
$column = $columns[0] ?? '_id';
413395

414-
$pipeline[] = new GroupStage(
415-
_id: new FieldPath($column),
416-
_document: new FirstAccumulator(Variable::root()),
396+
$pipelineBuilder->group(
397+
_id: \MongoDB\Builder\Expression::fieldPath($column),
398+
_document: Accumulator::first(Variable::root()),
417399
);
418-
$pipeline[] = new ReplaceRootStage(
400+
$pipelineBuilder->replaceRoot(
419401
newRoot: new FieldPath('_document'),
420402
);
421403
}
422404

423-
// Normal query
424-
// Convert select columns to simple projections.
425-
$projection = array_fill_keys($columns, true);
426-
427-
// Add custom projections.
428-
if ($this->projections) {
429-
$projection = array_merge($projection, $this->projections);
430-
}
431-
432405
if ($this->orders) {
433-
$pipeline[] = new SortStage(...$this->orders);
406+
$pipelineBuilder->sort(...$this->orders);
434407
}
435408

436409
if ($this->offset) {
437-
$pipeline[] = new SkipStage($this->offset);
410+
$pipelineBuilder->skip($this->offset);
438411
}
439412

440413
if ($this->limit) {
441-
$pipeline[] = new LimitStage($this->limit);
414+
$pipelineBuilder->limit($this->limit);
415+
}
416+
417+
// Normal query
418+
// Convert select columns to simple projections.
419+
$projection = array_fill_keys($columns, true);
420+
421+
// Add custom projections.
422+
if ($this->projections) {
423+
$projection = array_merge($projection, $this->projections);
442424
}
443425

444426
if ($projection) {
445-
$pipeline[] = new ProjectStage(...$projection);
427+
$pipelineBuilder->project(...$projection);
446428
}
447429

448-
return $pipeline;
430+
return $pipelineBuilder;
449431
}
450432

451433
/**
@@ -457,9 +439,8 @@ protected function getPipeline(): array
457439
*/
458440
public function toMql(): array
459441
{
460-
$pipeline = $this->getPipeline();
461442
$encoder = new BuilderEncoder();
462-
$pipeline = $encoder->encode(new Pipeline(...$pipeline));
443+
$pipeline = $encoder->encode($this->getPipelineBuilder()->getPipeline());
463444

464445
$options = ['typeMap' => ['root' => 'array', 'document' => 'array']];
465446

@@ -557,32 +538,37 @@ public function generateCacheKey()
557538
/** @return ($function === null ? PipelineBuilder : self) */
558539
public function aggregate($function = null, $columns = [])
559540
{
541+
$builder = $this->getPipelineBuilder();
542+
560543
if ($function === null) {
561-
return new PipelineBuilder($this->getPipeline(), $this->collection, $this->options);
544+
return $builder;
562545
}
563546

564-
$this->aggregate = [
565-
'function' => $function,
566-
'columns' => $columns,
567-
];
568-
569-
$previousColumns = $this->columns;
570-
571-
// We will also back up the select bindings since the select clause will be
572-
// removed when performing the aggregate function. Once the query is run
573-
// we will add the bindings back onto this query so they can get used.
574-
$previousSelectBindings = $this->bindings['select'];
575-
576-
$this->bindings['select'] = [];
577-
578-
$results = $this->get($columns);
547+
match ($function) {
548+
'count' => $builder->group(
549+
_id: null,
550+
aggregate: Accumulator::sum(1),
551+
),
552+
'sum' => $builder->group(
553+
_id: null,
554+
aggregate: Accumulator::sum(\MongoDB\Builder\Expression::fieldPath($columns[0])),
555+
),
556+
'avg' => $builder->group(
557+
_id: null,
558+
aggregate: Accumulator::avg(\MongoDB\Builder\Expression::fieldPath($columns[0])),
559+
),
560+
'min' => $builder->group(
561+
_id: null,
562+
aggregate: Accumulator::min(\MongoDB\Builder\Expression::fieldPath($columns[0])),
563+
),
564+
'max' => $builder->group(
565+
_id: null,
566+
aggregate: Accumulator::max(\MongoDB\Builder\Expression::fieldPath($columns[0])),
567+
),
568+
default => throw new InvalidArgumentException('Unknown aggregate function: ' . $function),
569+
};
579570

580-
// Once we have executed the query, we will reset the aggregate property so
581-
// that more select queries can be executed against the database without
582-
// the aggregate value getting in the way when the grammar builds it.
583-
$this->aggregate = null;
584-
$this->columns = $previousColumns;
585-
$this->bindings['select'] = $previousSelectBindings;
571+
$results = $builder->get();
586572

587573
if (isset($results[0])) {
588574
$result = (array) $results[0];
@@ -977,14 +963,14 @@ public function runPaginationCountQuery($columns = ['*'])
977963
if ($this->groups || $this->havings) {
978964
$without = $this->unions ? ['orders', 'limit', 'offset'] : ['columns', 'orders', 'limit', 'offset'];
979965

980-
$mql = $this->cloneWithout($without)
966+
$pipelienBuilder = $this->cloneWithout($without)
981967
->cloneWithoutBindings($this->unions ? ['order'] : ['select', 'order'])
982-
->toMql();
968+
->getPipelineBuilder();
983969

984970
// Adds the $count stage to the pipeline
985-
$mql['aggregate'][0][] = new CountStage('aggregate');
971+
$pipelienBuilder->count('aggregate');
986972

987-
return $this->collection->aggregate($mql['aggregate'][0], $mql['aggregate'][1])->toArray();
973+
return $pipelienBuilder->get();
988974
}
989975

990976
return parent::runPaginationCountQuery($columns);

src/Query/PipelineBuilder.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,17 @@ public function __construct(
2222
$this->pipeline = $pipeline;
2323
}
2424

25+
/**
26+
* Add a stage without using the builder. Necessary if the stage is built
27+
* outside the builder, or it is not yet supported by the library.
28+
*/
29+
public function addRawStage(string $operator, mixed $value): static
30+
{
31+
$this->pipeline[] = [$operator => $value];
32+
33+
return $this;
34+
}
35+
2536
/**
2637
* Execute the aggregation pipeline and return the results.
2738
*/

0 commit comments

Comments
 (0)