Skip to content

Commit a8b3540

Browse files
committed
fix(NODE-5993): connection's aborted promise leak
1 parent 4ac9675 commit a8b3540

File tree

4 files changed

+150
-55
lines changed

4 files changed

+150
-55
lines changed

src/cmap/connection.ts

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { type Readable, Transform, type TransformCallback } from 'stream';
22
import { clearTimeout, setTimeout } from 'timers';
3-
import { promisify } from 'util';
43

54
import type { BSONSerializeOptions, Document, ObjectId } from '../bson';
65
import type { AutoEncrypter } from '../client-side-encryption/auto_encrypter';
@@ -180,18 +179,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
180179
* Once connection is established, command logging can log events (if enabled)
181180
*/
182181
public established: boolean;
182+
/** Indicates that the connection (including underlying TCP socket) has been closed. */
183+
public closed = false;
183184

184185
private lastUseTime: number;
185186
private clusterTime: Document | null = null;
187+
private error: Error | null = null;
188+
private dataEvents: AsyncGenerator<Buffer, void, void> | null = null;
186189

187190
private readonly socketTimeoutMS: number;
188191
private readonly monitorCommands: boolean;
189192
private readonly socket: Stream;
190-
private readonly controller: AbortController;
191-
private readonly signal: AbortSignal;
192193
private readonly messageStream: Readable;
193-
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>;
194-
private readonly aborted: Promise<never>;
195194

196195
/** @event */
197196
static readonly COMMAND_STARTED = COMMAND_STARTED;
@@ -211,6 +210,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
211210
constructor(stream: Stream, options: ConnectionOptions) {
212211
super();
213212

213+
this.socket = stream;
214214
this.id = options.id;
215215
this.address = streamIdentifier(stream, options);
216216
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
@@ -223,39 +223,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
223223
this.generation = options.generation;
224224
this.lastUseTime = now();
225225

226-
this.socket = stream;
227-
228-
// TODO: Remove signal from connection layer
229-
this.controller = new AbortController();
230-
const { signal } = this.controller;
231-
this.signal = signal;
232-
const { promise: aborted, reject } = promiseWithResolvers<never>();
233-
aborted.then(undefined, () => null); // Prevent unhandled rejection
234-
this.signal.addEventListener(
235-
'abort',
236-
function onAbort() {
237-
reject(signal.reason);
238-
},
239-
{ once: true }
240-
);
241-
this.aborted = aborted;
242-
243226
this.messageStream = this.socket
244227
.on('error', this.onError.bind(this))
245228
.pipe(new SizedMessageTransform({ connection: this }))
246229
.on('error', this.onError.bind(this));
247230
this.socket.on('close', this.onClose.bind(this));
248231
this.socket.on('timeout', this.onTimeout.bind(this));
249-
250-
const socketWrite = promisify(this.socket.write.bind(this.socket));
251-
this.socketWrite = async buffer => {
252-
return Promise.race([socketWrite(buffer), this.aborted]);
253-
};
254-
}
255-
256-
/** Indicates that the connection (including underlying TCP socket) has been closed. */
257-
public get closed(): boolean {
258-
return this.signal.aborted;
259232
}
260233

261234
public get hello() {
@@ -355,7 +328,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
355328
}
356329

357330
this.socket.destroy();
358-
this.controller.abort(error);
331+
if (error) {
332+
this.error = error;
333+
this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection
334+
}
335+
this.closed = true;
359336
this.emit(Connection.CLOSE);
360337
}
361338

@@ -596,7 +573,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
596573
}
597574

598575
private throwIfAborted() {
599-
this.signal.throwIfAborted();
576+
if (this.error) throw this.error;
600577
}
601578

602579
/**
@@ -619,7 +596,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
619596

620597
const buffer = Buffer.concat(await finalCommand.toBin());
621598

622-
return this.socketWrite(buffer);
599+
if (this.socket.write(buffer)) return;
600+
601+
const { promise: drained, resolve, reject } = promiseWithResolvers<void>();
602+
const onDrain = () => resolve();
603+
const onError = (error: Error) => reject(error);
604+
605+
this.socket.once('drain', onDrain).once('error', onError);
606+
try {
607+
return await drained;
608+
} finally {
609+
this.socket.off('drain', onDrain).off('error', onError);
610+
}
623611
}
624612

625613
/**
@@ -632,13 +620,19 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
632620
* Note that `for-await` loops call `return` automatically when the loop is exited.
633621
*/
634622
private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
635-
for await (const message of onData(this.messageStream, { signal: this.signal })) {
636-
const response = await decompressResponse(message);
637-
yield response;
623+
try {
624+
this.dataEvents = this.dataEvents = onData(this.messageStream);
625+
for await (const message of this.dataEvents) {
626+
const response = await decompressResponse(message);
627+
yield response;
638628

639-
if (!response.moreToCome) {
640-
return;
629+
if (!response.moreToCome) {
630+
return;
631+
}
641632
}
633+
} finally {
634+
this.dataEvents = null;
635+
this.throwIfAborted();
642636
}
643637
}
644638
}

src/cmap/wire_protocol/on_data.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ type PendingPromises = Omit<
1818
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
1919
* It will reject upon an error event or if the provided signal is aborted.
2020
*/
21-
export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) {
22-
const signal = options.signal;
23-
21+
export function onData(emitter: EventEmitter) {
2422
// Setup pending events and pending promise lists
2523
/**
2624
* When the caller has not yet called .next(), we store the
@@ -89,19 +87,8 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
8987
emitter.on('data', eventHandler);
9088
emitter.on('error', errorHandler);
9189

92-
if (signal.aborted) {
93-
// If the signal is aborted, set up the first .next() call to be a rejection
94-
queueMicrotask(abortListener);
95-
} else {
96-
signal.addEventListener('abort', abortListener, { once: true });
97-
}
98-
9990
return iterator;
10091

101-
function abortListener() {
102-
errorHandler(signal.reason);
103-
}
104-
10592
function eventHandler(value: Buffer) {
10693
const promise = unconsumedPromises.shift();
10794
if (promise != null) promise.resolve({ value, done: false });
@@ -119,7 +106,6 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
119106
// Adding event handlers
120107
emitter.off('data', eventHandler);
121108
emitter.off('error', errorHandler);
122-
signal.removeEventListener('abort', abortListener);
123109
finished = true;
124110
const doneResult = { value: undefined, done: finished } as const;
125111

test/integration/connection-monitoring-and-pooling/connection.test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import type * as net from 'node:net';
2+
13
import { expect } from 'chai';
4+
import { type EventEmitter, once } from 'events';
5+
import * as sinon from 'sinon';
26

37
import {
8+
Binary,
49
connect,
510
Connection,
611
type ConnectionOptions,
712
HostAddress,
13+
isHello,
814
LEGACY_HELLO_COMMAND,
915
makeClientMetadata,
1016
MongoClient,
@@ -14,7 +20,9 @@ import {
1420
ServerHeartbeatStartedEvent,
1521
Topology
1622
} from '../../mongodb';
23+
import * as mock from '../../tools/mongodb-mock/index';
1724
import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration';
25+
import { getSymbolFrom, sleep } from '../../tools/utils';
1826
import { assert as test, setupDatabase } from '../shared';
1927

2028
const commonConnectOptions = {
@@ -197,6 +205,80 @@ describe('Connection', function () {
197205
client.connect();
198206
});
199207

208+
context(
209+
'when a large message is written to the socket',
210+
{ requires: { topology: 'single' } },
211+
() => {
212+
let client, mockServer: import('../../tools/mongodb-mock/src/server').MockServer;
213+
214+
beforeEach(async function () {
215+
mockServer = await mock.createServer();
216+
217+
mockServer
218+
.addMessageHandler('insert', req => {
219+
setTimeout(() => {
220+
req.reply({ ok: 1 });
221+
}, 800);
222+
})
223+
.addMessageHandler('hello', req => {
224+
req.reply(Object.assign({}, mock.HELLO));
225+
})
226+
.addMessageHandler(LEGACY_HELLO_COMMAND, req => {
227+
req.reply(Object.assign({}, mock.HELLO));
228+
});
229+
230+
client = new MongoClient(`mongodb://${mockServer.uri()}`, {
231+
minPoolSize: 1,
232+
maxPoolSize: 1
233+
});
234+
});
235+
236+
afterEach(async function () {
237+
await client.close();
238+
mockServer.destroy();
239+
sinon.restore();
240+
});
241+
242+
it('waits for an async drain event because the write was buffered', async () => {
243+
const connectionReady = once(client, 'connectionReady');
244+
await client.connect();
245+
await connectionReady;
246+
247+
// Get the only connection
248+
const pool = [...client.topology.s.servers.values()][0].pool;
249+
const connection = pool[getSymbolFrom(pool, 'connections')].first();
250+
const socket: EventEmitter = connection.socket;
251+
252+
// Spy on the socket event listeners
253+
const addedListeners: string[] = [];
254+
const removedListeners: string[] = [];
255+
socket
256+
.on('removeListener', name => removedListeners.push(name))
257+
.on('newListener', name => addedListeners.push(name));
258+
259+
// Make server sockets block
260+
for (const s of mockServer.sockets) s.pause();
261+
262+
const insert = client
263+
.db('test')
264+
.collection('test')
265+
// Anything above 16Kb should work I think (10mb to be extra sure)
266+
.insertOne({ a: new Binary(Buffer.alloc(10 * (2 ** 10) ** 2), 127) });
267+
268+
// Sleep a bit and unblock server sockets
269+
await sleep(10);
270+
for (const s of mockServer.sockets) s.resume();
271+
272+
// Let the operation finish
273+
await insert;
274+
275+
// Ensure that we used the drain event for this write
276+
expect(addedListeners).to.deep.equal(['drain', 'error']);
277+
expect(removedListeners).to.deep.equal(['drain', 'error']);
278+
});
279+
}
280+
);
281+
200282
context('when connecting with a username and password', () => {
201283
let utilClient: MongoClient;
202284
let client: MongoClient;

test/integration/node-specific/resource_clean_up.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import * as v8 from 'node:v8';
2+
13
import { expect } from 'chai';
24

5+
import { sleep } from '../../tools/utils';
36
import { runScript } from './resource_tracking_script_builder';
47

58
/**
@@ -86,4 +89,34 @@ describe('Driver Resources', () => {
8689
});
8790
});
8891
});
92+
93+
context('when 100s of operations are executed and complete', () => {
94+
beforeEach(function () {
95+
if (this.currentTest && typeof v8.queryObjects !== 'function') {
96+
this.currentTest.skipReason = 'Test requires v8.queryObjects API to count Promises';
97+
this.currentTest?.skip();
98+
}
99+
});
100+
101+
let client;
102+
beforeEach(async function () {
103+
client = this.configuration.newClient();
104+
});
105+
106+
afterEach(async function () {
107+
await client.close();
108+
});
109+
110+
it('does not leave behind additional promises', async () => {
111+
const test = client.db('test').collection('test');
112+
const promiseCountBefore = v8.queryObjects(Promise, { format: 'count' });
113+
for (let i = 0; i < 100; i++) {
114+
await test.findOne();
115+
}
116+
await sleep(10);
117+
const promiseCountAfter = v8.queryObjects(Promise, { format: 'count' });
118+
119+
expect(promiseCountAfter).to.be.within(promiseCountBefore - 5, promiseCountBefore + 5);
120+
});
121+
});
89122
});

0 commit comments

Comments
 (0)