@@ -595,6 +595,8 @@ func TestPool(t *testing.T) {
595
595
err = p .disconnect (context .Background ())
596
596
noerr (t , err )
597
597
})
598
+ // Test that if a checkOut() times out, it returns a WaitQueueTimeout error that wraps a
599
+ // context.DeadlineExceeded error.
598
600
t .Run ("wait queue timeout error" , func (t * testing.T ) {
599
601
t .Parallel ()
600
602
@@ -623,19 +625,72 @@ func TestPool(t *testing.T) {
623
625
assert .NotNilf (t , err , "expected a WaitQueueTimeout error" )
624
626
625
627
// Assert that error received is WaitQueueTimeoutError with context deadline exceeded.
626
- assert .IsType (t , WaitQueueTimeoutError {}, err , "expected a WaitQueueTimeoutError" )
628
+ assert .IsTypef (t , WaitQueueTimeoutError {}, err , "expected a WaitQueueTimeoutError" )
627
629
if err , ok := err .(WaitQueueTimeoutError ); ok {
628
630
assert .Equalf (t , context .DeadlineExceeded , err .Unwrap (), "expected wrapped error to be a context.Timeout" )
629
631
}
630
632
631
633
err = p .disconnect (context .Background ())
632
634
noerr (t , err )
633
635
})
636
+ // Test that an indefinitely blocked checkOut() doesn't cause the wait queue to overflow
637
+ // if there are many other checkOut() calls that time out. This tests a scenario where a
638
+ // wantConnQueue may grow unbounded while a checkOut() is blocked, even if all subsequent
639
+ // checkOut() calls time out (due to the behavior of wantConnQueue.cleanFront()).
640
+ t .Run ("wait queue doesn't overflow" , func (t * testing.T ) {
641
+ t .Parallel ()
642
+
643
+ cleanup := make (chan struct {})
644
+ defer close (cleanup )
645
+ addr := bootstrapConnections (t , 1 , func (nc net.Conn ) {
646
+ <- cleanup
647
+ _ = nc .Close ()
648
+ })
649
+
650
+ p := newPool (poolConfig {
651
+ Address : address .Address (addr .String ()),
652
+ MaxPoolSize : 1 ,
653
+ })
654
+ err := p .connect ()
655
+ noerr (t , err )
656
+
657
+ // Check out the 1 connection that the pool will create.
658
+ c , err := p .checkOut (context .Background ())
659
+ noerr (t , err )
660
+
661
+ // Start a goroutine that tries to check out another connection with no timeout. Expect
662
+ // this goroutine to block (wait in the wait queue) until the checked-out connection is
663
+ // checked-in. Assert that there is no error once checkOut() finally does return.
664
+ var wg sync.WaitGroup
665
+ wg .Add (1 )
666
+ go func () {
667
+ defer wg .Done ()
668
+
669
+ _ , err := p .checkOut (context .Background ())
670
+ noerr (t , err )
671
+ }()
672
+
673
+ // Run lots of check-out attempts with a low timeout and assert that each one fails with
674
+ // a WaitQueueTimeout error. Expect no other errors or panics.
675
+ for i := 0 ; i < 50000 ; i ++ {
676
+ ctx , cancel := context .WithTimeout (context .Background (), 1 * time .Microsecond )
677
+ _ , err := p .checkOut (ctx )
678
+ cancel ()
679
+ assert .NotNilf (t , err , "expected a WaitQueueTimeout error" )
680
+ assert .IsTypef (t , WaitQueueTimeoutError {}, err , "expected a WaitQueueTimeoutError" )
681
+ }
682
+
683
+ // Check-in the connection we checked out earlier and wait for the checkOut() goroutine
684
+ // to resume.
685
+ err = p .checkIn (c )
686
+ noerr (t , err )
687
+ wg .Wait ()
688
+ })
634
689
})
635
690
t .Run ("checkIn" , func (t * testing.T ) {
636
691
t .Parallel ()
637
692
638
- t .Run ("does not return to pool twice" , func (t * testing.T ) {
693
+ t .Run ("cannot return same connection to pool twice" , func (t * testing.T ) {
639
694
t .Parallel ()
640
695
641
696
cleanup := make (chan struct {})
0 commit comments