1
1
import { type Readable , Transform , type TransformCallback } from 'stream' ;
2
- import { setTimeout } from 'timers' ;
2
+ import { clearTimeout , setTimeout } from 'timers' ;
3
3
import { promisify } from 'util' ;
4
4
5
5
import type { BSONSerializeOptions , Document , ObjectId } from '../bson' ;
@@ -62,19 +62,6 @@ import { type CompressorName, decompressResponse } from './wire_protocol/compres
62
62
import { onData } from './wire_protocol/on_data' ;
63
63
import { getReadPreference , isSharded } from './wire_protocol/shared' ;
64
64
65
- /** @internal */
66
- const kGeneration = Symbol ( 'generation' ) ;
67
- /** @internal */
68
- const kLastUseTime = Symbol ( 'lastUseTime' ) ;
69
- /** @internal */
70
- const kClusterTime = Symbol ( 'clusterTime' ) ;
71
- /** @internal */
72
- const kDescription = Symbol ( 'description' ) ;
73
- /** @internal */
74
- const kHello = Symbol ( 'hello' ) ;
75
- /** @internal */
76
- const kAutoEncrypter = Symbol ( 'autoEncrypter' ) ;
77
-
78
65
/** @internal */
79
66
export interface CommandOptions extends BSONSerializeOptions {
80
67
secondaryOk ?: boolean ;
@@ -154,15 +141,6 @@ export function hasSessionSupport(conn: Connection): boolean {
154
141
return description . logicalSessionTimeoutMinutes != null ;
155
142
}
156
143
157
- function supportsOpMsg ( conn : Connection ) {
158
- const description = conn . description ;
159
- if ( description == null ) {
160
- return false ;
161
- }
162
-
163
- return maxWireVersion ( conn ) >= 6 && ! description . __nodejs_mock_server__ ;
164
- }
165
-
166
144
function streamIdentifier ( stream : Stream , options : ConnectionOptions ) : string {
167
145
if ( options . proxyHost ) {
168
146
// If proxy options are specified, the properties of `stream` itself
@@ -178,37 +156,26 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
178
156
return uuidV4 ( ) . toString ( 'hex' ) ;
179
157
}
180
158
181
- /** in-progress connection layer */
182
-
183
159
/** @internal */
184
160
export class Connection extends TypedEventEmitter < ConnectionEvents > {
185
- id : number | '<monitor>' ;
186
- address : string ;
187
- socketTimeoutMS : number ;
188
- monitorCommands : boolean ;
189
- lastHelloMS ?: number ;
190
- serverApi ?: ServerApi ;
191
- helloOk ?: boolean ;
192
- /** @internal */
193
- authContext ?: AuthContext ;
194
-
195
- delayedTimeoutId : NodeJS . Timeout | null = null ;
196
- /** @internal */
197
- [ kDescription ] : StreamDescription ;
198
- /** @internal */
199
- [ kGeneration ] : number ;
200
- /** @internal */
201
- [ kLastUseTime ] : number ;
202
-
161
+ public id : number | '<monitor>' ;
162
+ public address : string ;
163
+ public lastHelloMS ?: number ;
164
+ public serverApi ?: ServerApi ;
165
+ public helloOk ?: boolean ;
166
+ public authContext ?: AuthContext ;
167
+ public delayedTimeoutId : NodeJS . Timeout | null = null ;
168
+ public generation : number ;
169
+ public readonly description : Readonly < StreamDescription > ;
170
+
171
+ private lastUseTime : number ;
172
+ private socketTimeoutMS : number ;
173
+ private monitorCommands : boolean ;
203
174
private socket : Stream ;
204
175
private controller : AbortController ;
205
176
private messageStream : Readable ;
206
177
private socketWrite : ( buffer : Uint8Array ) => Promise < void > ;
207
-
208
- /** @internal */
209
- [ kHello ] : Document | null ;
210
- /** @internal */
211
- [ kClusterTime ] : Document | null ;
178
+ private clusterTime : Document | null = null ;
212
179
213
180
/** @event */
214
181
static readonly COMMAND_STARTED = COMMAND_STARTED ;
@@ -235,12 +202,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
235
202
this . socketTimeoutMS = options . socketTimeoutMS ?? 0 ;
236
203
this . monitorCommands = options . monitorCommands ;
237
204
this . serverApi = options . serverApi ;
238
- this [ kHello ] = null ;
239
- this [ kClusterTime ] = null ;
240
205
241
- this [ kDescription ] = new StreamDescription ( this . address , options ) ;
242
- this [ kGeneration ] = options . generation ;
243
- this [ kLastUseTime ] = now ( ) ;
206
+ this . description = new StreamDescription ( this . address , options ) ;
207
+ this . generation = options . generation ;
208
+ this . lastUseTime = now ( ) ;
244
209
245
210
this . socket = stream ;
246
211
this . controller = new AbortController ( ) ;
@@ -259,89 +224,62 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
259
224
}
260
225
261
226
/** Indicates that the connection (including underlying TCP socket) has been closed. */
262
- get closed ( ) : boolean {
227
+ public get closed ( ) : boolean {
263
228
return this . controller . signal . aborted ;
264
229
}
265
230
266
- get description ( ) : StreamDescription {
267
- return this [ kDescription ] ;
268
- }
269
-
270
- get hello ( ) : Document | null {
271
- return this [ kHello ] ;
272
- }
273
-
274
231
// the `connect` method stores the result of the handshake hello on the connection
275
- set hello ( response : Document | null ) {
276
- this [ kDescription ] . receiveResponse ( response ) ;
277
- this [ kDescription ] = Object . freeze ( this [ kDescription ] ) ;
278
-
279
- // TODO: remove this, and only use the `StreamDescription` in the future
280
- this [ kHello ] = response ;
232
+ public set hello ( response : Document | null ) {
233
+ this . description . receiveResponse ( response ) ;
234
+ Object . freeze ( this . description ) ;
281
235
}
282
236
283
- get serviceId ( ) : ObjectId | undefined {
237
+ public get serviceId ( ) : ObjectId | undefined {
284
238
return this . hello ?. serviceId ;
285
239
}
286
240
287
- get loadBalanced ( ) : boolean {
241
+ public get loadBalanced ( ) : boolean {
288
242
return this . description . loadBalanced ;
289
243
}
290
244
291
- get generation ( ) : number {
292
- return this [ kGeneration ] || 0 ;
293
- }
294
-
295
- set generation ( generation : number ) {
296
- this [ kGeneration ] = generation ;
297
- }
298
-
299
- get idleTime ( ) : number {
300
- return calculateDurationInMs ( this [ kLastUseTime ] ) ;
301
- }
302
-
303
- get clusterTime ( ) : Document | null {
304
- return this [ kClusterTime ] ;
245
+ public get idleTime ( ) : number {
246
+ return calculateDurationInMs ( this . lastUseTime ) ;
305
247
}
306
248
307
- get stream ( ) : Stream {
308
- return this . socket ;
309
- }
310
-
311
- get hasSessionSupport ( ) : boolean {
249
+ private get hasSessionSupport ( ) : boolean {
312
250
return this . description . logicalSessionTimeoutMinutes != null ;
313
251
}
314
252
315
- get supportsOpMsg ( ) : boolean {
253
+ private get supportsOpMsg ( ) : boolean {
316
254
return (
317
255
this . description != null &&
318
- maxWireVersion ( this as any as Connection ) >= 6 &&
256
+ maxWireVersion ( this ) >= 6 &&
319
257
! this . description . __nodejs_mock_server__
320
258
) ;
321
259
}
322
260
323
- markAvailable ( ) : void {
324
- this [ kLastUseTime ] = now ( ) ;
261
+ public markAvailable ( ) : void {
262
+ this . lastUseTime = now ( ) ;
325
263
}
326
264
327
- onError ( error ?: Error ) {
265
+ public onError ( error ?: Error ) {
328
266
this . cleanup ( error ) ;
329
267
}
330
268
331
- onClose ( ) {
269
+ private onClose ( ) {
332
270
const message = `connection ${ this . id } to ${ this . address } closed` ;
333
271
this . cleanup ( new MongoNetworkError ( message ) ) ;
334
272
}
335
273
336
- onTimeout ( ) {
274
+ private onTimeout ( ) {
337
275
this . delayedTimeoutId = setTimeout ( ( ) => {
338
276
const message = `connection ${ this . id } to ${ this . address } timed out` ;
339
277
const beforeHandshake = this . hello == null ;
340
278
this . cleanup ( new MongoNetworkTimeoutError ( message , { beforeHandshake } ) ) ;
341
279
} , 1 ) . unref ( ) ; // No need for this timer to hold the event loop open
342
280
}
343
281
344
- destroy ( options : DestroyOptions , callback ?: Callback ) : void {
282
+ public destroy ( options : DestroyOptions , callback ?: Callback ) : void {
345
283
if ( this . closed ) {
346
284
if ( typeof callback === 'function' ) process . nextTick ( callback ) ;
347
285
return ;
@@ -476,7 +414,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
476
414
}
477
415
478
416
if ( document . $clusterTime ) {
479
- this [ kClusterTime ] = document . $clusterTime ;
417
+ this . clusterTime = document . $clusterTime ;
480
418
this . emit ( Connection . CLUSTER_TIME_RECEIVED , document . $clusterTime ) ;
481
419
}
482
420
}
@@ -495,7 +433,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
495
433
}
496
434
}
497
435
498
- async * sendCommand ( ns : MongoDBNamespace , command : Document , options : CommandOptions = { } ) {
436
+ private async * sendCommand (
437
+ ns : MongoDBNamespace ,
438
+ command : Document ,
439
+ options : CommandOptions = { }
440
+ ) {
499
441
const message = this . prepareCommand ( ns . db , command , options ) ;
500
442
501
443
let started = 0 ;
@@ -555,7 +497,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
555
497
}
556
498
}
557
499
558
- async command (
500
+ public async command (
559
501
ns : MongoDBNamespace ,
560
502
command : Document ,
561
503
options : CommandOptions = { }
@@ -567,7 +509,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
567
509
throw new MongoUnexpectedServerResponseError ( 'Unable to get response from server' ) ;
568
510
}
569
511
570
- exhaustCommand (
512
+ public exhaustCommand (
571
513
ns : MongoDBNamespace ,
572
514
command : Document ,
573
515
options : CommandOptions ,
@@ -590,7 +532,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
590
532
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
591
533
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
592
534
*/
593
- async writeCommand (
535
+ private async writeCommand (
594
536
command : WriteProtocolMessageType ,
595
537
options : { agreedCompressor ?: CompressorName ; zlibCompressionLevel ?: number }
596
538
) : Promise < void > {
@@ -616,7 +558,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
616
558
*
617
559
* Note that `for-await` loops call `return` automatically when the loop is exited.
618
560
*/
619
- async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpQueryResponse > {
561
+ private async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpQueryResponse > {
620
562
for await ( const message of onData ( this . messageStream , { signal : this . controller . signal } ) ) {
621
563
const response = await decompressResponse ( message ) ;
622
564
yield response ;
@@ -638,6 +580,7 @@ export class SizedMessageTransform extends Transform {
638
580
this . bufferPool = new BufferPool ( ) ;
639
581
this . connection = connection ;
640
582
}
583
+
641
584
override _transform ( chunk : Buffer , encoding : unknown , callback : TransformCallback ) : void {
642
585
if ( this . connection . delayedTimeoutId != null ) {
643
586
clearTimeout ( this . connection . delayedTimeoutId ) ;
@@ -667,11 +610,11 @@ export class SizedMessageTransform extends Transform {
667
610
/** @internal */
668
611
export class CryptoConnection extends Connection {
669
612
/** @internal */
670
- [ kAutoEncrypter ] ?: AutoEncrypter ;
613
+ autoEncrypter ?: AutoEncrypter ;
671
614
672
615
constructor ( stream : Stream , options : ConnectionOptions ) {
673
616
super ( stream , options ) ;
674
- this [ kAutoEncrypter ] = options . autoEncrypter ;
617
+ this . autoEncrypter = options . autoEncrypter ;
675
618
}
676
619
677
620
/** @internal @override */
@@ -680,7 +623,7 @@ export class CryptoConnection extends Connection {
680
623
cmd : Document ,
681
624
options : CommandOptions
682
625
) : Promise < Document > {
683
- const autoEncrypter = this [ kAutoEncrypter ] ;
626
+ const { autoEncrypter } = this ;
684
627
if ( ! autoEncrypter ) {
685
628
throw new MongoMissingDependencyError ( 'No AutoEncrypter available for encryption' ) ;
686
629
}
0 commit comments