1
- import { promisify } from 'util' ;
2
-
3
1
import type { Document } from '../bson' ;
4
2
import { type AutoEncrypter } from '../client-side-encryption/auto_encrypter' ;
5
3
import { type CommandOptions , Connection , type DestroyOptions } from '../cmap/connection' ;
@@ -26,6 +24,7 @@ import {
26
24
isNetworkErrorBeforeHandshake ,
27
25
isNodeShuttingDownError ,
28
26
isSDAMUnrecoverableError ,
27
+ MONGODB_ERROR_CODES ,
29
28
MongoError ,
30
29
MongoErrorLabel ,
31
30
MongoInvalidArgumentError ,
@@ -48,6 +47,7 @@ import {
48
47
makeStateMachine ,
49
48
maxWireVersion ,
50
49
type MongoDBNamespace ,
50
+ promiseWithResolvers ,
51
51
supportsRetryableWrites
52
52
} from '../utils' ;
53
53
import {
@@ -115,7 +115,6 @@ export class Server extends TypedEventEmitter<ServerEvents> {
115
115
pool : ConnectionPool ;
116
116
serverApi ?: ServerApi ;
117
117
hello ?: Document ;
118
- commandAsync : ( ns : MongoDBNamespace , cmd : Document , options : CommandOptions ) => Promise < Document > ;
119
118
monitor : Monitor | null ;
120
119
121
120
/** @event */
@@ -139,16 +138,6 @@ export class Server extends TypedEventEmitter<ServerEvents> {
139
138
constructor ( topology : Topology , description : ServerDescription , options : ServerOptions ) {
140
139
super ( ) ;
141
140
142
- this . commandAsync = promisify (
143
- (
144
- ns : MongoDBNamespace ,
145
- cmd : Document ,
146
- options : CommandOptions ,
147
- // callback type defines Document result because result is never nullish when it succeeds, otherwise promise rejects
148
- callback : ( error : Error | undefined , result : Document ) => void
149
- ) => this . command ( ns , cmd , options , callback as any )
150
- ) ;
151
-
152
141
this . serverApi = options . serverApi ;
153
142
154
143
const poolOptions = { hostAddress : description . hostAddress , ...options } ;
@@ -380,6 +369,92 @@ export class Server extends TypedEventEmitter<ServerEvents> {
380
369
) ;
381
370
}
382
371
372
+ async commandAsync (
373
+ ns : MongoDBNamespace ,
374
+ cmd : Document ,
375
+ options : CommandOptions
376
+ ) : Promise < Document > {
377
+ if ( ns . db == null || typeof ns === 'string' ) {
378
+ throw new MongoInvalidArgumentError ( 'Namespace must not be a string' ) ;
379
+ }
380
+
381
+ if ( this . s . state === STATE_CLOSING || this . s . state === STATE_CLOSED ) {
382
+ throw new MongoServerClosedError ( ) ;
383
+ }
384
+
385
+ // Clone the options
386
+ const finalOptions = Object . assign ( { } , options , { wireProtocolCommand : false } ) ;
387
+
388
+ // There are cases where we need to flag the read preference not to get sent in
389
+ // the command, such as pre-5.0 servers attempting to perform an aggregate write
390
+ // with a non-primary read preference. In this case the effective read preference
391
+ // (primary) is not the same as the provided and must be removed completely.
392
+ if ( finalOptions . omitReadPreference ) {
393
+ delete finalOptions . readPreference ;
394
+ }
395
+
396
+ const session = finalOptions . session ;
397
+ let conn = session ?. pinnedConnection ;
398
+
399
+ // NOTE: This is a hack! We can't retrieve the connections used for executing an operation
400
+ // (and prevent them from being checked back in) at the point of operation execution.
401
+ // This should be considered as part of the work for NODE-2882
402
+ // NOTE:
403
+ // When incrementing operation count, it's important that we increment it before we
404
+ // attempt to check out a connection from the pool. This ensures that operations that
405
+ // are waiting for a connection are included in the operation count. Load balanced
406
+ // mode will only ever have a single server, so the operation count doesn't matter.
407
+ // Incrementing the operation count above the logic to handle load balanced mode would
408
+ // require special logic to decrement it again, or would double increment (the load
409
+ // balanced code makes a recursive call). Instead, we increment the count after this
410
+ // check.
411
+
412
+ if ( this . loadBalanced && session && conn == null && isPinnableCommand ( cmd , session ) ) {
413
+ const { promise : checkedOutPromise , resolve, reject } = promiseWithResolvers < Connection > ( ) ;
414
+
415
+ this . pool . checkOut ( ( err , conn ) => {
416
+ if ( err || conn == null ) {
417
+ reject ( err ) ;
418
+ return ;
419
+ }
420
+ resolve ( conn ) ;
421
+ } ) ;
422
+ const checkedOut = await checkedOutPromise ;
423
+
424
+ session . pin ( checkedOut ) ;
425
+ return this . commandAsync ( ns , cmd , finalOptions ) ;
426
+ }
427
+ this . incrementOperationCount ( ) ;
428
+
429
+ // FIXME: Fix this
430
+ if ( ! conn ) {
431
+ const { promise : connPromise , resolve, reject } = promiseWithResolvers < Connection > ( ) ;
432
+ this . pool . checkOut ( ( err , conn ) => {
433
+ // don't callback with `err` here, we might want to act upon it inside `fn`
434
+ if ( err || conn == null ) {
435
+ reject ( err ) ;
436
+ return ;
437
+ }
438
+
439
+ resolve ( conn ) ;
440
+ } ) ;
441
+
442
+ conn = await connPromise ;
443
+ }
444
+
445
+ try {
446
+ return await conn . command ( ns , cmd , finalOptions ) ;
447
+ } catch ( e ) {
448
+ if ( e instanceof MongoError && e . code === MONGODB_ERROR_CODES . Reauthenticate ) {
449
+ conn = await this . pool . reauthenticateAsync ( conn ) ;
450
+ } else {
451
+ throw e ;
452
+ }
453
+ }
454
+
455
+ return conn . command ( ns , cmd , finalOptions ) ;
456
+ }
457
+
383
458
/**
384
459
* Handle SDAM error
385
460
* @internal
0 commit comments