@@ -217,7 +217,7 @@ func (s *Server) Disconnect(ctx context.Context) error {
217
217
// been cancelled.
218
218
s .globalCtxCancel ()
219
219
s .cancelCheck ()
220
- s .done <- struct {}{}
220
+ close ( s .done )
221
221
222
222
s .rttMonitor .disconnect ()
223
223
err := s .pool .disconnect (ctx )
@@ -427,40 +427,33 @@ func (s *Server) update() {
427
427
}
428
428
}
429
429
430
- var disconnectedWhileWaiting bool
431
430
waitUntilNextCheck := func () {
432
431
// Wait until heartbeatFrequency elapses, an application operation requests an immediate check, or the server
433
432
// is disconnecting.
434
433
select {
435
434
case <- heartbeatTicker .C :
436
435
case <- checkNow :
437
436
case <- done :
438
- disconnectedWhileWaiting = true
437
+ // Return because the next update iteration will check the done channel again and clean up.
439
438
return
440
439
}
441
440
442
441
// Ensure we only return if minHeartbeatFrequency has elapsed or the server is disconnecting.
443
442
select {
444
443
case <- rateLimiter .C :
445
444
case <- done :
446
- disconnectedWhileWaiting = true
447
445
return
448
446
}
449
447
}
450
448
451
449
for {
452
- // Check if the server is disconnecting. If the disconnect happened between the next check and now, we'll read
453
- // a value from the done channel. If it happened during waitUntilNextCheck, the value has already been read
454
- // from the channel, so we check the disconnectedWhileWaiting flag.
455
- var disconnecting bool
450
+ // Check if the server is disconnecting. Even if waitForNextCheck has already read from the done channel, we
451
+ // can safely read from it again because Disconnect closes the channel.
456
452
select {
457
453
case <- done :
458
- disconnecting = true
459
- default :
460
- }
461
- if disconnecting || disconnectedWhileWaiting {
462
454
closeServer ()
463
455
return
456
+ default :
464
457
}
465
458
466
459
previousDescription := s .Description ()
@@ -479,6 +472,10 @@ func (s *Server) update() {
479
472
}
480
473
481
474
s .updateDescription (desc )
475
+ if desc .LastError != nil {
476
+ // Clear the pool once the description has been updated to Unknown.
477
+ s .pool .clear ()
478
+ }
482
479
483
480
// If the server supports streaming or we're already streaming, we want to move to streaming the next response
484
481
// without waiting. If the server has transitioned to Unknown from a network error, we want to do another
@@ -674,9 +671,9 @@ func (s *Server) check() (description.Server, error) {
674
671
return emptyDescription , errCheckCancelled
675
672
}
676
673
677
- // An error occurred. We clear the pool for all errors and return an Unknown description.
674
+ // An error occurred. We reset the RTT monitor for all errors and return an Unknown description. The pool must
675
+ // also be cleared, but only after the description has already been updated, so that is handled by the caller.
678
676
s .rttMonitor .reset ()
679
- s .pool .clear ()
680
677
return description .NewServerFromError (s .address , err ), nil
681
678
}
682
679
0 commit comments