Skip to content

Commit cb3c039

Browse files
author
Thomas Reggi
committed
Merge branch 'master' into NODE-2458/cursor-sort
2 parents 27434d7 + 054838f commit cb3c039

File tree

9 files changed

+121
-31
lines changed

9 files changed

+121
-31
lines changed

src/change_stream.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -262,11 +262,6 @@ export class ChangeStream extends EventEmitter {
262262
});
263263
}
264264

265-
/** @internal */
266-
triggerError(error: AnyError): void {
267-
processError(this, error);
268-
}
269-
270265
/** @internal */
271266
get cursorStream(): CursorStream | undefined {
272267
return this[kCursorStream];
@@ -568,12 +563,19 @@ function processNewChange(
568563
change: ChangeStreamDocument,
569564
callback?: Callback
570565
) {
571-
if (changeStream.closed) return callback && callback(CHANGESTREAM_CLOSED_ERROR);
566+
if (changeStream.closed) {
567+
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
568+
return;
569+
}
572570

573571
// a null change means the cursor has been notified, implicitly closing the change stream
574-
if (change == null) return closeWithError(changeStream, CHANGESTREAM_CLOSED_ERROR, callback);
572+
if (change == null) {
573+
return closeWithError(changeStream, CHANGESTREAM_CLOSED_ERROR, callback);
574+
}
575575

576-
if (change && !change._id) return closeWithError(changeStream, NO_RESUME_TOKEN_ERROR, callback);
576+
if (change && !change._id) {
577+
return closeWithError(changeStream, NO_RESUME_TOKEN_ERROR, callback);
578+
}
577579

578580
// cache the resume token
579581
changeStream.cursor?.cacheResumeToken(change._id);

src/cmap/connect.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ function makeConnection(
277277
: typeof options.connectTimeoutMS === 'number'
278278
? options.connectTimeoutMS
279279
: 30000;
280-
const socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
280+
const socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 0;
281281
const rejectUnauthorized =
282282
typeof options.rejectUnauthorized === 'boolean' ? options.rejectUnauthorized : true;
283283

src/cmap/connection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ export class Connection extends EventEmitter {
9999
super(options);
100100
this.id = options.id;
101101
this.address = streamIdentifier(stream);
102-
this.socketTimeout = options.socketTimeout ?? 360000;
102+
this.socketTimeout = options.socketTimeout ?? 0;
103103
this.monitorCommands = options.monitorCommands ?? options.monitorCommands;
104104
this.closed = false;
105105
this.destroyed = false;

src/cursor/aggregation_cursor.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { MongoError } from '../error';
22
import { Cursor, CursorOptions } from './cursor';
33
import { CursorState } from './core_cursor';
4-
import { deprecate } from 'util';
54
import type { AggregateOperation, AggregateOptions } from '../operations/aggregate';
65
import type { Document } from '../bson';
76
import type { Sort } from '../sort';
@@ -110,8 +109,8 @@ export class AggregationCursor extends Cursor<AggregateOperation, AggregationCur
110109

111110
// deprecated methods
112111
/** @deprecated Add a geoNear stage to the aggregation pipeline */
113-
geoNear = deprecate(($geoNear: Document) => {
112+
geoNear($geoNear: Document): this {
114113
this.operation.addToPipeline({ $geoNear });
115114
return this;
116-
}, 'The `$geoNear` stage is deprecated in MongoDB 4.0, and removed in version 4.2.');
115+
}
117116
}

src/cursor/cursor.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,77 @@ export class CursorStream extends Readable {
126126
}
127127
}
128128

129+
const kCursor = Symbol('cursor');
130+
131+
export class CursorStream extends Readable {
132+
[kCursor]: Cursor;
133+
options: CursorStreamOptions;
134+
135+
/** @event */
136+
static readonly CLOSE = 'close' as const;
137+
/** @event */
138+
static readonly DATA = 'data' as const;
139+
/** @event */
140+
static readonly END = 'end' as const;
141+
/** @event */
142+
static readonly FINISH = 'finish' as const;
143+
/** @event */
144+
static readonly ERROR = 'error' as const;
145+
/** @event */
146+
static readonly PAUSE = 'pause' as const;
147+
/** @event */
148+
static readonly READABLE = 'readable' as const;
149+
/** @event */
150+
static readonly RESUME = 'resume' as const;
151+
152+
constructor(cursor: Cursor, options?: CursorStreamOptions) {
153+
super({ objectMode: true });
154+
this[kCursor] = cursor;
155+
this.options = options || {};
156+
}
157+
158+
destroy(err?: AnyError): void {
159+
this.pause();
160+
this[kCursor].close();
161+
super.destroy(err);
162+
}
163+
164+
/** @internal */
165+
_read(): void {
166+
const cursor = this[kCursor];
167+
if ((cursor.s && cursor.s.state === CursorState.CLOSED) || cursor.isDead()) {
168+
this.push(null);
169+
return;
170+
}
171+
172+
// Get the next item
173+
cursor._next((err, result) => {
174+
if (err) {
175+
if (cursor.s && cursor.s.state === CursorState.CLOSED) return;
176+
if (!cursor.isDead()) this.emit(CursorStream.ERROR, err);
177+
cursor.close(() => this.emit(CursorStream.END));
178+
return;
179+
}
180+
181+
// If we provided a transformation method
182+
if (typeof this.options.transform === 'function' && result != null) {
183+
this.push(this.options.transform(result));
184+
return;
185+
}
186+
187+
// Return the result
188+
this.push(result);
189+
190+
if (result === null && cursor.isDead()) {
191+
this.once(CursorStream.END, () => {
192+
cursor.close();
193+
this.emit(CursorStream.FINISH);
194+
});
195+
}
196+
});
197+
}
198+
}
199+
129200
/**
130201
* **CURSORS Cannot directly be instantiated**
131202
* The `Cursor` class is an internal class that embodies a cursor on MongoDB

src/mongo_client.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type { AuthMechanism } from './cmap/auth/defaultAuthProviders';
1717
import type { Topology } from './sdam/topology';
1818
import type { ClientSession, ClientSessionOptions } from './sessions';
1919
import type { OperationParent } from './operations/command';
20+
import type { TagSet } from './sdam/server_description';
2021

2122
/** @public */
2223
export enum LogLevel {
@@ -97,7 +98,7 @@ export interface MongoURIOptions extends Pick<WriteConcernOptions, 'journal' | '
9798
/** Specifies, in seconds, how stale a secondary can be before the client stops using it for read operations. */
9899
maxStalenessSeconds?: number;
99100
/** Specifies the tags document as a comma-separated list of colon-separated key-value pairs. */
100-
readPreferenceTags?: string;
101+
readPreferenceTags?: TagSet[];
101102
/** Specify the database name associated with the user’s credentials. */
102103
authSource?: string;
103104
/** Specify the authentication mechanism that MongoDB will use to authenticate the connection. */
@@ -220,6 +221,7 @@ export interface MongoClientPrivate {
220221
sessions: Set<ClientSession>;
221222
readConcern?: ReadConcern;
222223
writeConcern?: WriteConcern;
224+
readPreference: ReadPreference;
223225
namespace: MongoDBNamespace;
224226
logger: Logger;
225227
}
@@ -281,6 +283,7 @@ export class MongoClient extends EventEmitter implements OperationParent {
281283
sessions: new Set(),
282284
readConcern: ReadConcern.fromOptions(options),
283285
writeConcern: WriteConcern.fromOptions(options),
286+
readPreference: ReadPreference.fromOptions(options) || ReadPreference.primary,
284287
namespace: new MongoDBNamespace('admin'),
285288
logger: options?.logger ?? new Logger('MongoClient')
286289
};
@@ -295,7 +298,7 @@ export class MongoClient extends EventEmitter implements OperationParent {
295298
}
296299

297300
get readPreference(): ReadPreference {
298-
return ReadPreference.primary;
301+
return this.s.readPreference;
299302
}
300303

301304
get logger(): Logger {

src/operations/connect.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ export function connect(
211211
const finalOptions = createUnifiedOptions(urlOptions, options);
212212

213213
// Check if we have connection and socket timeout set
214-
if (finalOptions.socketTimeoutMS == null) finalOptions.socketTimeoutMS = 360000;
214+
if (finalOptions.socketTimeoutMS == null) finalOptions.socketTimeoutMS = 0;
215215
if (finalOptions.connectTimeoutMS == null) finalOptions.connectTimeoutMS = 10000;
216216
if (finalOptions.retryWrites == null) finalOptions.retryWrites = true;
217217
if (finalOptions.useRecoveryToken == null) finalOptions.useRecoveryToken = true;
@@ -582,7 +582,7 @@ function translateOptions(options: any) {
582582
}
583583

584584
// Set the socket and connection timeouts
585-
if (options.socketTimeoutMS == null) options.socketTimeoutMS = 360000;
585+
if (options.socketTimeoutMS == null) options.socketTimeoutMS = 0;
586586
if (options.connectTimeoutMS == null) options.connectTimeoutMS = 10000;
587587

588588
const translations = {

test/functional/change_stream.test.js

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,16 @@ function withChangeStream(dbName, collectionName, callback) {
2929

3030
return withClient((client, done) => {
3131
const db = client.db(dbName);
32-
db.dropCollection(collectionName)
33-
.catch(() => {})
34-
.then(() =>
35-
db.createCollection(collectionName, { w: 'majority' }, (err, collection) => {
36-
if (err) return done(err);
37-
withCursor(
38-
collection.watch(),
39-
(cursor, done) => callback(collection, cursor, done),
40-
err => collection.drop(dropErr => done(err || dropErr))
41-
);
42-
})
43-
)
44-
.catch(done);
32+
db.dropCollection(collectionName, () => {
33+
db.createCollection(collectionName, { w: 'majority' }, (err, collection) => {
34+
if (err) return done(err);
35+
withCursor(
36+
collection.watch(),
37+
(cursor, done) => callback(collection, cursor, done),
38+
err => collection.drop(dropErr => done(err || dropErr))
39+
);
40+
});
41+
});
4542
});
4643
}
4744

@@ -66,7 +63,18 @@ function triggerResumableError(changeStream, delay, onClose) {
6663
});
6764

6865
function triggerError() {
69-
changeStream.triggerError(new MongoNetworkError('fake error'));
66+
const cursorStream = changeStream.cursorStream;
67+
if (cursorStream) {
68+
cursorStream.emit('error', new MongoNetworkError('error triggered from test'));
69+
return;
70+
}
71+
72+
const nextStub = sinon.stub(changeStream.cursor, 'next').callsFake(function (callback) {
73+
callback(new MongoNetworkError('error triggered from test'));
74+
nextStub.restore();
75+
});
76+
77+
changeStream.next(() => {});
7078
}
7179

7280
if (delay != null) {

test/functional/mongo_client.test.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
var f = require('util').format;
44
var test = require('./shared').assert;
55
var setupDatabase = require('./shared').setupDatabase;
6+
const { ReadPreference } = require('../../src');
67
const { Db } = require('../../src/db');
78
const expect = require('chai').expect;
89

@@ -301,4 +302,10 @@ describe('MongoClient', function () {
301302
}
302303
});
303304
});
305+
306+
it('should cache a resolved readPreference from options', function () {
307+
const client = this.configuration.newClient({}, { readPreference: ReadPreference.SECONDARY });
308+
expect(client.readPreference).to.be.instanceOf(ReadPreference);
309+
expect(client.readPreference).to.have.property('mode', ReadPreference.SECONDARY);
310+
});
304311
});

0 commit comments

Comments
 (0)