Skip to content

Commit 2f83360

Browse files
committed
fix: allow event loop to process during wait queue processing
Running `processWaitQueue` on the next tick allows the event loop to process while the connection pool is processing large numbers of wait queue members. This also uncovered a few issues with timing in our tests, and in some cases our top-level API: - `commitTransaction` / `abortTransaction` use `maybePromise` now - `endSession` must wait for all the machinery behind the scenes to check out a connection and write a message before considering its job finished - internal calls to `kill` a cursor now await the the process of fully sending that command, even if they ignore the response NODE-2803
1 parent c16ec43 commit 2f83360

File tree

6 files changed

+72
-50
lines changed

6 files changed

+72
-50
lines changed

src/cmap/connection_pool.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ export class ConnectionPool extends EventEmitter {
283283
return;
284284
}
285285

286-
// add this request to the wait queue
287286
const waitQueueMember: WaitQueueMember = { callback };
288287
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
289288
if (waitQueueTimeoutMS) {
@@ -299,11 +298,8 @@ export class ConnectionPool extends EventEmitter {
299298
}, waitQueueTimeoutMS);
300299
}
301300

302-
// place the member at the end of the wait queue
303301
this[kWaitQueue].push(waitQueueMember);
304-
305-
// process the wait queue
306-
processWaitQueue(this);
302+
setImmediate(() => processWaitQueue(this));
307303
}
308304

309305
/**
@@ -316,7 +312,6 @@ export class ConnectionPool extends EventEmitter {
316312
const stale = connectionIsStale(this, connection);
317313
const willDestroy = !!(poolClosed || stale || connection.closed);
318314

319-
// Properly adjust state of connection
320315
if (!willDestroy) {
321316
connection.markAvailable();
322317
this[kConnections].push(connection);
@@ -329,7 +324,7 @@ export class ConnectionPool extends EventEmitter {
329324
destroyConnection(this, connection, reason);
330325
}
331326

332-
processWaitQueue(this);
327+
setImmediate(() => processWaitQueue(this));
333328
}
334329

335330
/**
@@ -503,7 +498,7 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
503498

504499
// otherwise add it to the pool for later acquisition, and try to process the wait queue
505500
pool[kConnections].push(connection);
506-
processWaitQueue(pool);
501+
setImmediate(() => processWaitQueue(pool));
507502
});
508503
}
509504

src/cursor/core_cursor.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -701,9 +701,10 @@ function nextFunction(self: CoreCursor, callback: Callback) {
701701

702702
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
703703
// Ensure we kill the cursor on the server
704-
self.kill();
705-
// Set cursor in dead and notified state
706-
return setCursorDeadAndNotified(self, callback);
704+
return self.kill(() =>
705+
// Set cursor in dead and notified state
706+
setCursorDeadAndNotified(self, callback)
707+
);
707708
} else if (
708709
self.cursorState.cursorIndex === self.cursorState.documents.length &&
709710
!Long.ZERO.equals(cursorId)
@@ -775,9 +776,12 @@ function nextFunction(self: CoreCursor, callback: Callback) {
775776
} else {
776777
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
777778
// Ensure we kill the cursor on the server
778-
self.kill();
779-
// Set cursor in dead and notified state
780-
return setCursorDeadAndNotified(self, callback);
779+
self.kill(() =>
780+
// Set cursor in dead and notified state
781+
setCursorDeadAndNotified(self, callback)
782+
);
783+
784+
return;
781785
}
782786

783787
// Increment the current cursor limit
@@ -789,11 +793,12 @@ function nextFunction(self: CoreCursor, callback: Callback) {
789793
// Doc overflow
790794
if (!doc || doc.$err) {
791795
// Ensure we kill the cursor on the server
792-
self.kill();
793-
// Set cursor in dead and notified state
794-
return setCursorDeadAndNotified(self, () =>
795-
callback(new MongoError(doc ? doc.$err : undefined))
796+
self.kill(() =>
797+
// Set cursor in dead and notified state
798+
setCursorDeadAndNotified(self, () => callback(new MongoError(doc ? doc.$err : undefined)))
796799
);
800+
801+
return;
797802
}
798803

799804
// Transform the doc with passed in transformation method if provided

src/sessions.ts

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ export interface ClientSessionOptions {
5252
/** @public */
5353
export type WithTransactionCallback = (session: ClientSession) => Promise<any> | void;
5454

55+
const kServerSession = Symbol('serverSession');
56+
5557
/**
5658
* A class representing a client session on the server
5759
*
@@ -62,7 +64,6 @@ class ClientSession extends EventEmitter {
6264
topology: Topology;
6365
sessionPool: ServerSessionPool;
6466
hasEnded: boolean;
65-
serverSession?: ServerSession;
6667
clientOptions?: MongoClientOptions;
6768
supports: { causalConsistency: boolean };
6869
clusterTime?: ClusterTime;
@@ -71,6 +72,7 @@ class ClientSession extends EventEmitter {
7172
owner: symbol | CoreCursor;
7273
defaultTransactionOptions: TransactionOptions;
7374
transaction: Transaction;
75+
[kServerSession]?: ServerSession;
7476

7577
/**
7678
* Create a client session.
@@ -102,8 +104,8 @@ class ClientSession extends EventEmitter {
102104
this.topology = topology;
103105
this.sessionPool = sessionPool;
104106
this.hasEnded = false;
105-
this.serverSession = sessionPool.acquire();
106107
this.clientOptions = clientOptions;
108+
this[kServerSession] = undefined;
107109

108110
this.supports = {
109111
causalConsistency:
@@ -124,41 +126,61 @@ class ClientSession extends EventEmitter {
124126
return this.serverSession?.id;
125127
}
126128

129+
get serverSession(): ServerSession {
130+
if (this[kServerSession] == null) {
131+
this[kServerSession] = this.sessionPool.acquire();
132+
}
133+
134+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
135+
return this[kServerSession]!;
136+
}
137+
127138
/**
128139
* Ends this session on the server
129140
*
130141
* @param options - Optional settings. Currently reserved for future use
131142
* @param callback - Optional callback for completion of this operation
132143
*/
133-
endSession(): void;
144+
endSession(): Promise<void>;
134145
endSession(callback: Callback<void>): void;
146+
endSession(options: Record<string, unknown>): Promise<void>;
135147
endSession(options: Record<string, unknown>, callback: Callback<void>): void;
136-
endSession(options?: Record<string, unknown> | Callback<void>, callback?: Callback<void>): void {
137-
if (typeof options === 'function') (callback = options as Callback), (options = {});
148+
endSession(
149+
options?: Record<string, unknown> | Callback<void>,
150+
callback?: Callback<void>
151+
): void | Promise<void> {
152+
if (typeof options === 'function') (callback = options), (options = {});
138153
options = options || {};
139154

140-
if (this.hasEnded) {
141-
if (typeof callback === 'function') callback();
142-
return;
143-
}
155+
return maybePromise(callback, done => {
156+
if (this.hasEnded) {
157+
return done();
158+
}
144159

145-
if (this.serverSession && this.inTransaction()) {
146-
this.abortTransaction(); // pass in callback?
147-
}
160+
const completeEndSession = () => {
161+
// release the server session back to the pool
162+
this.sessionPool.release(this.serverSession);
163+
this[kServerSession] = undefined;
148164

149-
// mark the session as ended, and emit a signal
150-
this.hasEnded = true;
151-
this.emit('ended', this);
165+
// mark the session as ended, and emit a signal
166+
this.hasEnded = true;
167+
this.emit('ended', this);
152168

153-
// release the server session back to the pool
154-
if (this.serverSession) {
155-
this.sessionPool.release(this.serverSession);
156-
}
169+
// spec indicates that we should ignore all errors for `endSessions`
170+
done();
171+
};
157172

158-
this.serverSession = undefined;
173+
if (this.serverSession && this.inTransaction()) {
174+
this.abortTransaction(err => {
175+
if (err) return done(err);
176+
completeEndSession();
177+
});
178+
179+
return;
180+
}
159181

160-
// spec indicates that we should ignore all errors for `endSessions`
161-
if (typeof callback === 'function') callback();
182+
completeEndSession();
183+
});
162184
}
163185

164186
/**

test/functional/cursor.test.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3999,9 +3999,9 @@ describe('Cursor', function () {
39993999
const cursor = collection.find();
40004000
const promise = cursor.forEach();
40014001
expect(promise).to.exist.and.to.be.an.instanceof(Promise);
4002-
promise.catch(() => {});
4003-
4004-
cursor.close(() => client.close(() => done()));
4002+
promise.then(() => {
4003+
cursor.close(() => client.close(() => done()));
4004+
});
40054005
});
40064006
});
40074007

test/functional/spec-runner/index.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,12 @@ function runTestSuiteTest(configuration, spec, context) {
344344
throw err;
345345
})
346346
.then(() => {
347-
if (session0) session0.endSession();
348-
if (session1) session1.endSession();
349-
350-
return validateExpectations(context.commandEvents, spec, savedSessionData);
351-
});
347+
const promises = [];
348+
if (session0) promises.push(session0.endSession());
349+
if (session1) promises.push(session1.endSession());
350+
return Promise.all(promises);
351+
})
352+
.then(() => validateExpectations(context.commandEvents, spec, savedSessionData));
352353
});
353354
}
354355

test/unit/cmap/connection_pool.test.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ describe('Connection Pool', function () {
139139
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
140140
pool.checkIn(conn);
141141

142-
expect(pool).property('waitQueueSize').to.equal(0);
143-
142+
setImmediate(() => expect(pool).property('waitQueueSize').to.equal(0));
144143
done();
145144
});
146145
});

0 commit comments

Comments
 (0)