Skip to content

Commit 0b8a33f

Browse files
committed
Add $out and $merge validation
1 parent 08be6d4 commit 0b8a33f

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

src/cursor/aggregation_cursor.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Document } from '../bson';
2+
import { MongoAPIError } from '../error';
23
import type { ExplainVerbosityLike } from '../explain';
34
import type { MongoClient } from '../mongo_client';
45
import { AggregateOperation, type AggregateOptions } from '../operations/aggregate';
@@ -8,7 +9,7 @@ import type { Sort } from '../sort';
89
import type { MongoDBNamespace } from '../utils';
910
import { mergeOptions } from '../utils';
1011
import type { AbstractCursorOptions, InitialCursorResponse } from './abstract_cursor';
11-
import { AbstractCursor } from './abstract_cursor';
12+
import { AbstractCursor, CursorTimeoutMode } from './abstract_cursor';
1213

1314
/** @public */
1415
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}
@@ -36,6 +37,15 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
3637

3738
this.pipeline = pipeline;
3839
this.aggregateOptions = options;
40+
41+
if (
42+
this.cursorOptions.timeoutMS != null &&
43+
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
44+
this.pipeline.filter(stage => {
45+
Object.hasOwn(stage, '$out') || Object.hasOwn(stage, '$merge');
46+
}).length > 0
47+
)
48+
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
3949
}
4050

4151
clone(): AggregationCursor<TSchema> {
@@ -93,6 +103,13 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
93103
addStage<T = Document>(stage: Document): AggregationCursor<T>;
94104
addStage<T = Document>(stage: Document): AggregationCursor<T> {
95105
this.throwIfInitialized();
106+
if (
107+
this.cursorOptions.timeoutMS != null &&
108+
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
109+
(Object.hasOwn(stage, '$out') || Object.hasOwn(stage, '$merge'))
110+
) {
111+
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
112+
}
96113
this.pipeline.push(stage);
97114
return this as unknown as AggregationCursor<T>;
98115
}

test/unit/cursor/aggregation_cursor.test.ts

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import { expect } from 'chai';
22

3-
import { type AggregationCursor, MongoClient } from '../../mongodb';
3+
import {
4+
type AggregationCursor,
5+
CursorTimeoutMode,
6+
MongoAPIError,
7+
MongoClient
8+
} from '../../mongodb';
49

510
describe('class AggregationCursor', () => {
611
let client: MongoClient;
@@ -126,6 +131,38 @@ describe('class AggregationCursor', () => {
126131
});
127132

128133
context('when addStage, bespoke stage methods, or array is used to construct pipeline', () => {
134+
context('when CSOT is enabled', () => {
135+
let aggregationCursor: AggregationCursor;
136+
before(function () {
137+
aggregationCursor = client
138+
.db('test')
139+
.collection('test')
140+
.aggregate([], { timeoutMS: 100, timeoutMode: CursorTimeoutMode.ITERATION });
141+
});
142+
143+
context('when a $out stage is add with .addStage()', () => {
144+
it('throws a MongoAPIError', function () {
145+
expect(() => {
146+
aggregationCursor.addStage({ $out: 'test' });
147+
}).to.throw(MongoAPIError);
148+
});
149+
});
150+
context('when a $merge stage is add with .addStage()', () => {
151+
it('throws a MongoAPIError', function () {
152+
expect(() => {
153+
aggregationCursor.addStage({ $merge: {} });
154+
}).to.throw(MongoAPIError);
155+
});
156+
});
157+
context('when a $out stage is add with .out()', () => {
158+
it('throws a MongoAPIError', function () {
159+
expect(() => {
160+
aggregationCursor.out('test');
161+
}).to.throw(MongoAPIError);
162+
});
163+
});
164+
});
165+
129166
it('sets deeply identical aggregations pipelines', () => {
130167
const collection = client.db().collection('test');
131168

@@ -157,4 +194,29 @@ describe('class AggregationCursor', () => {
157194
expect(builderGenericStageCursor.pipeline).to.deep.equal(expectedPipeline);
158195
});
159196
});
197+
198+
describe('constructor()', () => {
199+
context('when CSOT is enabled', () => {
200+
let client: MongoClient;
201+
before(function () {
202+
client = new MongoClient('mongodb://iLoveJavascript', { timeoutMS: 100 });
203+
});
204+
context('when timeoutMode=ITERATION and a $out stage is provided', function () {
205+
expect(() => {
206+
client
207+
.db('test')
208+
.collection('test')
209+
.aggregate([{ $out: 'test' }], { timeoutMode: CursorTimeoutMode.ITERATION });
210+
}).to.throw(MongoAPIError);
211+
});
212+
context('when timeoutMode=ITERATION and a $merge stage is provided', function () {
213+
expect(() => {
214+
client
215+
.db('test')
216+
.collection('test')
217+
.aggregate([{ $merge: 'test' }], { timeoutMode: CursorTimeoutMode.ITERATION });
218+
}).to.throw(MongoAPIError);
219+
});
220+
});
221+
});
160222
});

0 commit comments

Comments
 (0)