1
1
import { ReadPreference } from '../read_preference' ;
2
- import { MongoError , isRetryableError , AnyError } from '../error' ;
2
+ import { MongoError , isRetryableError } from '../error' ;
3
3
import { Aspect , AbstractOperation } from './operation' ;
4
4
import { maxWireVersion , maybePromise , Callback } from '../utils' ;
5
5
import { ServerType } from '../sdam/common' ;
@@ -76,31 +76,28 @@ export function executeOperation<
76
76
77
77
// The driver sessions spec mandates that we implicitly create sessions for operations
78
78
// that are not explicitly provided with a session.
79
- let session : ClientSession ;
79
+ let session = operation . session ;
80
80
let owner : symbol ;
81
81
if ( topology . hasSessionSupport ( ) ) {
82
- if ( operation . session == null ) {
82
+ if ( session == null ) {
83
83
owner = Symbol ( ) ;
84
84
session = topology . startSession ( { owner, explicit : false } ) ;
85
- operation . session = session ;
86
85
} else if ( operation . session . hasEnded ) {
87
86
throw new MongoError ( 'Use of expired sessions is not permitted' ) ;
88
87
}
89
88
}
90
89
91
90
return maybePromise ( callback , cb => {
92
- function executeCallback ( err ?: AnyError , result ?: TResult ) {
93
- if ( session && session . owner === owner ) {
94
- return session . endSession ( err2 => cb ( err2 || err , result ) ) ;
95
- }
96
-
97
- cb ( err , result ) ;
98
- }
99
-
100
91
try {
101
- executeWithServerSelection ( topology , operation , executeCallback ) ;
92
+ executeWithServerSelection ( topology , session , operation , ( err , result ) => {
93
+ if ( session && session . owner && session . owner === owner ) {
94
+ return session . endSession ( err2 => cb ( err2 || err , result ) ) ;
95
+ }
96
+
97
+ cb ( err , result ) ;
98
+ } ) ;
102
99
} catch ( e ) {
103
- if ( session && session . owner === owner ) {
100
+ if ( session && session . owner && session . owner === owner ) {
104
101
session . endSession ( ) ;
105
102
}
106
103
@@ -113,8 +110,12 @@ function supportsRetryableReads(server: Server) {
113
110
return maxWireVersion ( server ) >= 6 ;
114
111
}
115
112
116
- function executeWithServerSelection ( topology : Topology , operation : any , callback : Callback ) {
117
- const session = operation . session ;
113
+ function executeWithServerSelection (
114
+ topology : Topology ,
115
+ session : ClientSession ,
116
+ operation : any ,
117
+ callback : Callback
118
+ ) {
118
119
const readPreference = operation . readPreference || ReadPreference . primary ;
119
120
const inTransaction = session && session . inTransaction ( ) ;
120
121
@@ -196,17 +197,15 @@ function executeWithServerSelection(topology: Topology, operation: any, callback
196
197
return ;
197
198
}
198
199
199
- if ( operation . hasAspect ( Aspect . RETRYABLE ) ) {
200
+ if ( session && operation . hasAspect ( Aspect . RETRYABLE ) ) {
200
201
const willRetryRead =
201
202
topology . s . options . retryReads !== false &&
202
- operation . session &&
203
203
! inTransaction &&
204
204
supportsRetryableReads ( server ) &&
205
205
operation . canRetryRead ;
206
206
207
207
const willRetryWrite =
208
208
topology . s . options . retryWrites === true &&
209
- operation . session &&
210
209
! inTransaction &&
211
210
supportsRetryableWrites ( server ) &&
212
211
operation . canRetryWrite ;
@@ -217,7 +216,7 @@ function executeWithServerSelection(topology: Topology, operation: any, callback
217
216
if ( ( hasReadAspect && willRetryRead ) || ( hasWriteAspect && willRetryWrite ) ) {
218
217
if ( hasWriteAspect && willRetryWrite ) {
219
218
operation . options . willRetryWrite = true ;
220
- operation . session . incrementTransactionNumber ( ) ;
219
+ session . incrementTransactionNumber ( ) ;
221
220
}
222
221
223
222
operation . execute ( server , session , callbackWithRetry ) ;
0 commit comments