Skip to content

Commit 3b40d9f

Browse files
authored
Merge branch 'main' into NODE-5915/refactor-topology-close-to-be-synchronous
2 parents 6aa97e0 + 69de253 commit 3b40d9f

File tree

14 files changed

+416
-78
lines changed

14 files changed

+416
-78
lines changed

src/cmap/connect.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import {
2525
type ConnectionOptions,
2626
CryptoConnection
2727
} from './connection';
28-
import type { ClientMetadata } from './handshake/client_metadata';
2928
import {
3029
MAX_SUPPORTED_SERVER_VERSION,
3130
MAX_SUPPORTED_WIRE_VERSION,
@@ -183,7 +182,7 @@ export interface HandshakeDocument extends Document {
183182
ismaster?: boolean;
184183
hello?: boolean;
185184
helloOk?: boolean;
186-
client: ClientMetadata;
185+
client: Document;
187186
compression: string[];
188187
saslSupportedMechs?: string;
189188
loadBalanced?: boolean;
@@ -200,11 +199,12 @@ export async function prepareHandshakeDocument(
200199
const options = authContext.options;
201200
const compressors = options.compressors ? options.compressors : [];
202201
const { serverApi } = authContext.connection;
202+
const clientMetadata: Document = await options.extendedMetadata;
203203

204204
const handshakeDoc: HandshakeDocument = {
205205
[serverApi?.version || options.loadBalanced === true ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
206206
helloOk: true,
207-
client: options.metadata,
207+
client: clientMetadata,
208208
compression: compressors
209209
};
210210

src/cmap/connection.ts

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { type Readable, Transform, type TransformCallback } from 'stream';
22
import { clearTimeout, setTimeout } from 'timers';
3-
import { promisify } from 'util';
43

54
import type { BSONSerializeOptions, Document, ObjectId } from '../bson';
65
import type { AutoEncrypter } from '../client-side-encryption/auto_encrypter';
@@ -37,7 +36,7 @@ import {
3736
maxWireVersion,
3837
type MongoDBNamespace,
3938
now,
40-
promiseWithResolvers,
39+
once,
4140
uuidV4
4241
} from '../utils';
4342
import type { WriteConcern } from '../write_concern';
@@ -119,6 +118,8 @@ export interface ConnectionOptions
119118
cancellationToken?: CancellationToken;
120119
metadata: ClientMetadata;
121120
/** @internal */
121+
extendedMetadata: Promise<Document>;
122+
/** @internal */
122123
mongoLogger?: MongoLogger | undefined;
123124
}
124125

@@ -180,18 +181,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
180181
* Once connection is established, command logging can log events (if enabled)
181182
*/
182183
public established: boolean;
184+
/** Indicates that the connection (including underlying TCP socket) has been closed. */
185+
public closed = false;
183186

184187
private lastUseTime: number;
185188
private clusterTime: Document | null = null;
189+
private error: Error | null = null;
190+
private dataEvents: AsyncGenerator<Buffer, void, void> | null = null;
186191

187192
private readonly socketTimeoutMS: number;
188193
private readonly monitorCommands: boolean;
189194
private readonly socket: Stream;
190-
private readonly controller: AbortController;
191-
private readonly signal: AbortSignal;
192195
private readonly messageStream: Readable;
193-
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>;
194-
private readonly aborted: Promise<never>;
195196

196197
/** @event */
197198
static readonly COMMAND_STARTED = COMMAND_STARTED;
@@ -211,6 +212,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
211212
constructor(stream: Stream, options: ConnectionOptions) {
212213
super();
213214

215+
this.socket = stream;
214216
this.id = options.id;
215217
this.address = streamIdentifier(stream, options);
216218
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
@@ -223,39 +225,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
223225
this.generation = options.generation;
224226
this.lastUseTime = now();
225227

226-
this.socket = stream;
227-
228-
// TODO: Remove signal from connection layer
229-
this.controller = new AbortController();
230-
const { signal } = this.controller;
231-
this.signal = signal;
232-
const { promise: aborted, reject } = promiseWithResolvers<never>();
233-
aborted.then(undefined, () => null); // Prevent unhandled rejection
234-
this.signal.addEventListener(
235-
'abort',
236-
function onAbort() {
237-
reject(signal.reason);
238-
},
239-
{ once: true }
240-
);
241-
this.aborted = aborted;
242-
243228
this.messageStream = this.socket
244229
.on('error', this.onError.bind(this))
245230
.pipe(new SizedMessageTransform({ connection: this }))
246231
.on('error', this.onError.bind(this));
247232
this.socket.on('close', this.onClose.bind(this));
248233
this.socket.on('timeout', this.onTimeout.bind(this));
249-
250-
const socketWrite = promisify(this.socket.write.bind(this.socket));
251-
this.socketWrite = async buffer => {
252-
return Promise.race([socketWrite(buffer), this.aborted]);
253-
};
254-
}
255-
256-
/** Indicates that the connection (including underlying TCP socket) has been closed. */
257-
public get closed(): boolean {
258-
return this.signal.aborted;
259234
}
260235

261236
public get hello() {
@@ -306,7 +281,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
306281
this.lastUseTime = now();
307282
}
308283

309-
public onError(error?: Error) {
284+
public onError(error: Error) {
310285
this.cleanup(error);
311286
}
312287

@@ -345,13 +320,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
345320
*
346321
* This method does nothing if the connection is already closed.
347322
*/
348-
private cleanup(error?: Error): void {
323+
private cleanup(error: Error): void {
349324
if (this.closed) {
350325
return;
351326
}
352327

353328
this.socket.destroy();
354-
this.controller.abort(error);
329+
this.error = error;
330+
this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection
331+
this.closed = true;
355332
this.emit(Connection.CLOSE);
356333
}
357334

@@ -592,7 +569,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
592569
}
593570

594571
private throwIfAborted() {
595-
this.signal.throwIfAborted();
572+
if (this.error) throw this.error;
596573
}
597574

598575
/**
@@ -615,7 +592,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
615592

616593
const buffer = Buffer.concat(await finalCommand.toBin());
617594

618-
return this.socketWrite(buffer);
595+
if (this.socket.write(buffer)) return;
596+
return once(this.socket, 'drain');
619597
}
620598

621599
/**
@@ -628,13 +606,19 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
628606
* Note that `for-await` loops call `return` automatically when the loop is exited.
629607
*/
630608
private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
631-
for await (const message of onData(this.messageStream, { signal: this.signal })) {
632-
const response = await decompressResponse(message);
633-
yield response;
609+
try {
610+
this.dataEvents = onData(this.messageStream);
611+
for await (const message of this.dataEvents) {
612+
const response = await decompressResponse(message);
613+
yield response;
634614

635-
if (!response.moreToCome) {
636-
return;
615+
if (!response.moreToCome) {
616+
return;
617+
}
637618
}
619+
} finally {
620+
this.dataEvents = null;
621+
this.throwIfAborted();
638622
}
639623
}
640624
}

src/cmap/connection_pool.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
232232
maxIdleTimeMS: options.maxIdleTimeMS ?? 0,
233233
waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,
234234
minPoolSizeCheckFrequencyMS: options.minPoolSizeCheckFrequencyMS ?? 100,
235-
autoEncrypter: options.autoEncrypter,
236-
metadata: options.metadata
235+
autoEncrypter: options.autoEncrypter
237236
});
238237

239238
if (this.options.minPoolSize > this.options.maxPoolSize) {

src/cmap/handshake/client_metadata.ts

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import { promises as fs } from 'fs';
12
import * as os from 'os';
23
import * as process from 'process';
34

4-
import { BSON, Int32 } from '../../bson';
5+
import { BSON, type Document, Int32 } from '../../bson';
56
import { MongoInvalidArgumentError } from '../../error';
67
import type { MongoOptions } from '../../mongo_client';
78

@@ -71,13 +72,13 @@ export class LimitedSizeDocument {
7172
return true;
7273
}
7374

74-
toObject(): ClientMetadata {
75+
toObject(): Document {
7576
return BSON.deserialize(BSON.serialize(this.document), {
7677
promoteLongs: false,
7778
promoteBuffers: false,
7879
promoteValues: false,
7980
useBigInt64: false
80-
}) as ClientMetadata;
81+
});
8182
}
8283
}
8384

@@ -152,8 +153,57 @@ export function makeClientMetadata(options: MakeClientMetadataOptions): ClientMe
152153
}
153154
}
154155
}
156+
return metadataDocument.toObject() as ClientMetadata;
157+
}
158+
159+
let dockerPromise: Promise<boolean>;
160+
/** @internal */
161+
async function getContainerMetadata() {
162+
const containerMetadata: Record<string, any> = {};
163+
dockerPromise ??= fs.access('/.dockerenv').then(
164+
() => true,
165+
() => false
166+
);
167+
const isDocker = await dockerPromise;
168+
169+
const { KUBERNETES_SERVICE_HOST = '' } = process.env;
170+
const isKubernetes = KUBERNETES_SERVICE_HOST.length > 0 ? true : false;
171+
172+
if (isDocker) containerMetadata.runtime = 'docker';
173+
if (isKubernetes) containerMetadata.orchestrator = 'kubernetes';
174+
175+
return containerMetadata;
176+
}
177+
178+
/**
179+
* @internal
180+
* Re-add each metadata value.
181+
* Attempt to add new env container metadata, but keep old data if it does not fit.
182+
*/
183+
export async function addContainerMetadata(originalMetadata: ClientMetadata) {
184+
const containerMetadata = await getContainerMetadata();
185+
if (Object.keys(containerMetadata).length === 0) return originalMetadata;
186+
187+
const extendedMetadata = new LimitedSizeDocument(512);
188+
189+
const extendedEnvMetadata = { ...originalMetadata?.env, container: containerMetadata };
190+
191+
for (const [key, val] of Object.entries(originalMetadata)) {
192+
if (key !== 'env') {
193+
extendedMetadata.ifItFitsItSits(key, val);
194+
} else {
195+
if (!extendedMetadata.ifItFitsItSits('env', extendedEnvMetadata)) {
196+
// add in old data if newer / extended metadata does not fit
197+
extendedMetadata.ifItFitsItSits('env', val);
198+
}
199+
}
200+
}
201+
202+
if (!('env' in originalMetadata)) {
203+
extendedMetadata.ifItFitsItSits('env', extendedEnvMetadata);
204+
}
155205

156-
return metadataDocument.toObject();
206+
return extendedMetadata.toObject();
157207
}
158208

159209
/**

src/cmap/wire_protocol/on_data.ts

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@ type PendingPromises = Omit<
1616
* https://nodejs.org/api/events.html#eventsonemitter-eventname-options
1717
*
1818
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
19-
* It will reject upon an error event or if the provided signal is aborted.
19+
* It will reject upon an error event.
2020
*/
21-
export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) {
22-
const signal = options.signal;
23-
21+
export function onData(emitter: EventEmitter) {
2422
// Setup pending events and pending promise lists
2523
/**
2624
* When the caller has not yet called .next(), we store the
@@ -89,19 +87,8 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
8987
emitter.on('data', eventHandler);
9088
emitter.on('error', errorHandler);
9189

92-
if (signal.aborted) {
93-
// If the signal is aborted, set up the first .next() call to be a rejection
94-
queueMicrotask(abortListener);
95-
} else {
96-
signal.addEventListener('abort', abortListener, { once: true });
97-
}
98-
9990
return iterator;
10091

101-
function abortListener() {
102-
errorHandler(signal.reason);
103-
}
104-
10592
function eventHandler(value: Buffer) {
10693
const promise = unconsumedPromises.shift();
10794
if (promise != null) promise.resolve({ value, done: false });
@@ -119,7 +106,6 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
119106
// Adding event handlers
120107
emitter.off('data', eventHandler);
121108
emitter.off('error', errorHandler);
122-
signal.removeEventListener('abort', abortListener);
123109
finished = true;
124110
const doneResult = { value: undefined, done: finished } as const;
125111

src/connection_string.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { URLSearchParams } from 'url';
55
import type { Document } from './bson';
66
import { MongoCredentials } from './cmap/auth/mongo_credentials';
77
import { AUTH_MECHS_AUTH_SRC_EXTERNAL, AuthMechanism } from './cmap/auth/providers';
8-
import { makeClientMetadata } from './cmap/handshake/client_metadata';
8+
import { addContainerMetadata, makeClientMetadata } from './cmap/handshake/client_metadata';
99
import { Compressor, type CompressorName } from './cmap/wire_protocol/compression';
1010
import { Encrypter } from './encrypter';
1111
import {
@@ -552,6 +552,10 @@ export function parseOptions(
552552

553553
mongoOptions.metadata = makeClientMetadata(mongoOptions);
554554

555+
mongoOptions.extendedMetadata = addContainerMetadata(mongoOptions.metadata).catch(() => {
556+
/* rejections will be handled later */
557+
});
558+
555559
return mongoOptions;
556560
}
557561

src/mongo_client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,8 @@ export interface MongoOptions
820820
dbName: string;
821821
metadata: ClientMetadata;
822822
/** @internal */
823+
extendedMetadata: Promise<Document>;
824+
/** @internal */
823825
autoEncrypter?: AutoEncrypter;
824826
proxyHost?: string;
825827
proxyPort?: number;

src/sdam/topology.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
158158
directConnection: boolean;
159159
loadBalanced: boolean;
160160
metadata: ClientMetadata;
161+
extendedMetadata: Promise<Document>;
161162
serverMonitoringMode: ServerMonitoringMode;
162163
/** MongoDB server API version */
163164
serverApi?: ServerApi;

0 commit comments

Comments
 (0)