@@ -23,6 +23,7 @@ import { HostNameResolver } from '../channel'
23
23
import SingleConnectionProvider from './connection-provider-single'
24
24
import PooledConnectionProvider from './connection-provider-pooled'
25
25
import { LeastConnectedLoadBalancingStrategy } from '../load-balancing'
26
+ import { controller } from '../lang'
26
27
import {
27
28
createChannelConnection ,
28
29
ConnectionErrorHandler ,
@@ -75,6 +76,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
75
76
)
76
77
} )
77
78
79
+ this . _updateRoutingTableTimeoutConfig = {
80
+ timeout : this . _config . updateRoutingTableTimeout ,
81
+ reason : ( ) => newError (
82
+ `Routing table update timed out in ${ this . _config . updateRoutingTableTimeout } ms.`
83
+ )
84
+ }
85
+
78
86
this . _routingContext = { ...routingContext , address : address . toString ( ) }
79
87
this . _seedRouter = address
80
88
this . _rediscovery = new Rediscovery ( this . _routingContext )
@@ -143,53 +151,66 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
143
151
this . _handleAuthorizationExpired ( error , address , context . database )
144
152
)
145
153
146
- const routingTable = await this . _freshRoutingTable ( {
147
- accessMode,
148
- database : context . database ,
149
- bookmarks : bookmarks ,
150
- impersonatedUser,
151
- onDatabaseNameResolved : ( databaseName ) => {
152
- context . database = context . database || databaseName
153
- if ( onDatabaseNameResolved ) {
154
- onDatabaseNameResolved ( databaseName )
154
+ const refreshRoutingTableJob = {
155
+ run : async ( _ , cancelationToken ) => {
156
+ const routingTable = await this . _freshRoutingTable ( {
157
+ accessMode,
158
+ database : context . database ,
159
+ bookmarks : bookmarks ,
160
+ impersonatedUser,
161
+ onDatabaseNameResolved : ( databaseName ) => {
162
+ context . database = context . database || databaseName
163
+ if ( onDatabaseNameResolved ) {
164
+ onDatabaseNameResolved ( databaseName )
165
+ }
166
+ } ,
167
+ cancelationToken
168
+ } )
169
+
170
+ // select a target server based on specified access mode
171
+ if ( accessMode === READ ) {
172
+ address = this . _loadBalancingStrategy . selectReader ( routingTable . readers )
173
+ name = 'read'
174
+ } else if ( accessMode === WRITE ) {
175
+ address = this . _loadBalancingStrategy . selectWriter ( routingTable . writers )
176
+ name = 'write'
177
+ } else {
178
+ throw newError ( 'Illegal mode ' + accessMode )
155
179
}
156
- }
157
- } )
158
180
159
- // select a target server based on specified access mode
160
- if ( accessMode === READ ) {
161
- address = this . _loadBalancingStrategy . selectReader ( routingTable . readers )
162
- name = 'read'
163
- } else if ( accessMode === WRITE ) {
164
- address = this . _loadBalancingStrategy . selectWriter ( routingTable . writers )
165
- name = 'write'
166
- } else {
167
- throw newError ( 'Illegal mode ' + accessMode )
168
- }
169
-
170
- // we couldn't select a target server
171
- if ( ! address ) {
172
- throw newError (
173
- `Failed to obtain connection towards ${ name } server. Known routing table is: ${ routingTable } ` ,
174
- SESSION_EXPIRED
175
- )
181
+ // we couldn't select a target server
182
+ if ( ! address ) {
183
+ throw newError (
184
+ `Failed to obtain connection towards ${ name } server. Known routing table is: ${ routingTable } ` ,
185
+ SESSION_EXPIRED
186
+ )
187
+ }
188
+ return { routingTable, address }
189
+ }
176
190
}
177
191
178
- try {
179
- const connection = await this . _acquireConnectionToServer (
180
- address ,
181
- name ,
182
- routingTable
183
- )
192
+ const acquireConnectionJob = {
193
+ run : async ( { routingTable, address } ) => {
194
+ try {
195
+ const connection = await this . _acquireConnectionToServer (
196
+ address ,
197
+ name ,
198
+ routingTable
199
+ )
184
200
185
- return new DelegateConnection ( connection , databaseSpecificErrorHandler )
186
- } catch ( error ) {
187
- const transformed = databaseSpecificErrorHandler . handleAndTransformError (
188
- error ,
189
- address
190
- )
191
- throw transformed
201
+ return new DelegateConnection ( connection , databaseSpecificErrorHandler )
202
+ } catch ( error ) {
203
+ const transformed = databaseSpecificErrorHandler . handleAndTransformError (
204
+ error ,
205
+ address
206
+ )
207
+ throw transformed
208
+ }
209
+ } ,
210
+ onTimeout : connection => connection . _release ( )
192
211
}
212
+
213
+ return controller . runWithTimeout ( this . _sessionConnectionTimeoutConfig , refreshRoutingTableJob , acquireConnectionJob )
193
214
}
194
215
195
216
async _hasProtocolVersion ( versionPredicate ) {
@@ -301,22 +322,28 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
301
322
return this . _connectionPool . acquire ( address )
302
323
}
303
324
304
- _freshRoutingTable ( { accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = { } ) {
305
- const currentRoutingTable = this . _routingTableRegistry . get (
306
- database ,
307
- ( ) => new RoutingTable ( { database } )
308
- )
325
+ _freshRoutingTable ( { accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken = new controller . CancelationToken ( ( ) => false ) } = { } ) {
326
+ const refreshRoutingTableJob = {
327
+ run : ( _ , refreshCancelationToken ) => {
328
+ const combinedCancelationToken = refreshCancelationToken . combine ( cancelationToken )
329
+ const currentRoutingTable = this . _routingTableRegistry . get (
330
+ database ,
331
+ ( ) => new RoutingTable ( { database } )
332
+ )
309
333
310
- if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
311
- return currentRoutingTable
334
+ if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
335
+ return currentRoutingTable
336
+ }
337
+ this . _log . info (
338
+ `Routing table is stale for database: "${ database } " and access mode: "${ accessMode } ": ${ currentRoutingTable } `
339
+ )
340
+ return this . _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved , combinedCancelationToken )
341
+ }
312
342
}
313
- this . _log . info (
314
- `Routing table is stale for database: "${ database } " and access mode: "${ accessMode } ": ${ currentRoutingTable } `
315
- )
316
- return this . _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved )
343
+ return controller . runWithTimeout ( this . _updateRoutingTableTimeoutConfig , refreshRoutingTableJob )
317
344
}
318
345
319
- _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved ) {
346
+ _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved , cancelationToken ) {
320
347
const knownRouters = currentRoutingTable . routers
321
348
322
349
if ( this . _useSeedRouter ) {
@@ -325,15 +352,17 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
325
352
currentRoutingTable ,
326
353
bookmarks ,
327
354
impersonatedUser ,
328
- onDatabaseNameResolved
355
+ onDatabaseNameResolved ,
356
+ cancelationToken
329
357
)
330
358
}
331
359
return this . _fetchRoutingTableFromKnownRoutersFallbackToSeedRouter (
332
360
knownRouters ,
333
361
currentRoutingTable ,
334
362
bookmarks ,
335
363
impersonatedUser ,
336
- onDatabaseNameResolved
364
+ onDatabaseNameResolved ,
365
+ cancelationToken
337
366
)
338
367
}
339
368
@@ -342,7 +371,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
342
371
currentRoutingTable ,
343
372
bookmarks ,
344
373
impersonatedUser ,
345
- onDatabaseNameResolved
374
+ onDatabaseNameResolved ,
375
+ cancelationToken
346
376
) {
347
377
// we start with seed router, no routers were probed before
348
378
const seenRouters = [ ]
@@ -351,7 +381,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
351
381
this . _seedRouter ,
352
382
currentRoutingTable ,
353
383
bookmarks ,
354
- impersonatedUser
384
+ impersonatedUser ,
385
+ cancelationToken
355
386
)
356
387
357
388
if ( newRoutingTable ) {
@@ -362,7 +393,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
362
393
knownRouters ,
363
394
currentRoutingTable ,
364
395
bookmarks ,
365
- impersonatedUser
396
+ impersonatedUser ,
397
+ cancelationToken
366
398
)
367
399
newRoutingTable = newRoutingTable2
368
400
error = error2 || error
@@ -381,13 +413,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
381
413
currentRoutingTable ,
382
414
bookmarks ,
383
415
impersonatedUser ,
384
- onDatabaseNameResolved
416
+ onDatabaseNameResolved ,
417
+ cancelationToken
385
418
) {
386
419
let [ newRoutingTable , error ] = await this . _fetchRoutingTableUsingKnownRouters (
387
420
knownRouters ,
388
421
currentRoutingTable ,
389
422
bookmarks ,
390
- impersonatedUser
423
+ impersonatedUser ,
424
+ cancelationToken
391
425
)
392
426
393
427
if ( ! newRoutingTable ) {
@@ -397,7 +431,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
397
431
this . _seedRouter ,
398
432
currentRoutingTable ,
399
433
bookmarks ,
400
- impersonatedUser
434
+ impersonatedUser ,
435
+ cancelationToken
401
436
)
402
437
}
403
438
@@ -413,13 +448,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
413
448
knownRouters ,
414
449
currentRoutingTable ,
415
450
bookmarks ,
416
- impersonatedUser
451
+ impersonatedUser ,
452
+ cancelationToken
417
453
) {
418
454
const [ newRoutingTable , error ] = await this . _fetchRoutingTable (
419
455
knownRouters ,
420
456
currentRoutingTable ,
421
457
bookmarks ,
422
- impersonatedUser
458
+ impersonatedUser ,
459
+ cancelationToken
423
460
)
424
461
425
462
if ( newRoutingTable ) {
@@ -444,16 +481,19 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
444
481
seedRouter ,
445
482
routingTable ,
446
483
bookmarks ,
447
- impersonatedUser
484
+ impersonatedUser ,
485
+ cancelationToken
448
486
) {
449
487
const resolvedAddresses = await this . _resolveSeedRouter ( seedRouter )
450
488
489
+ cancelationToken . throwIfCancellationRequested ( )
490
+
451
491
// filter out all addresses that we've already tried
452
492
const newAddresses = resolvedAddresses . filter (
453
493
address => seenRouters . indexOf ( address ) < 0
454
494
)
455
495
456
- return await this . _fetchRoutingTable ( newAddresses , routingTable , bookmarks , impersonatedUser )
496
+ return await this . _fetchRoutingTable ( newAddresses , routingTable , bookmarks , impersonatedUser , cancelationToken )
457
497
}
458
498
459
499
async _resolveSeedRouter ( seedRouter ) {
@@ -465,7 +505,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
465
505
return [ ] . concat . apply ( [ ] , dnsResolvedAddresses )
466
506
}
467
507
468
- async _fetchRoutingTable ( routerAddresses , routingTable , bookmarks , impersonatedUser ) {
508
+ async _fetchRoutingTable ( routerAddresses , routingTable , bookmarks , impersonatedUser , cancelationToken ) {
469
509
return routerAddresses . reduce (
470
510
async ( refreshedTablePromise , currentRouter , currentIndex ) => {
471
511
const [ newRoutingTable ] = await refreshedTablePromise
@@ -499,11 +539,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
499
539
impersonatedUser
500
540
) , null ]
501
541
} catch ( error ) {
542
+ cancelationToken . throwIfCancellationRequested ( )
502
543
return this . _handleRediscoveryError ( error , currentRouter )
503
544
} finally {
504
- session . close ( )
545
+ await session . close ( )
505
546
}
506
547
} else {
548
+ cancelationToken . throwIfCancellationRequested ( )
507
549
// unable to acquire connection and create session towards the current router
508
550
// return null to signal that the next router should be tried
509
551
return [ null , error ]
0 commit comments