Skip to content

Commit ea4f951

Browse files
committed
Respond to PR feedback.
1 parent c195560 commit ea4f951

File tree

2 files changed

+68
-5
lines changed

2 files changed

+68
-5
lines changed

x/mongo/driver/topology/pool.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,10 +319,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
319319
}
320320
}()
321321

322-
// Get in the queue for an idle connection. If queueForIdleConn returns true, it was able to
322+
// Get in the queue for an idle connection. If getOrQueueForIdleConn returns true, it was able to
323323
// immediately deliver an idle connection to the wantConn, so we can return the connection or
324324
// error from the wantConn without waiting for "ready".
325-
if delivered := p.queueForIdleConn(w); delivered {
325+
if delivered := p.getOrQueueForIdleConn(w); delivered {
326326
if w.err != nil {
327327
if p.monitor != nil {
328328
p.monitor.Event(&event.PoolEvent{
@@ -528,7 +528,11 @@ func (p *pool) clear(serviceID *primitive.ObjectID) {
528528
p.generation.clear(serviceID)
529529
}
530530

531-
func (p *pool) queueForIdleConn(w *wantConn) bool {
531+
// getOrQueueForIdleConn attempts to deliver an idle connection to the given wantConn. If there is
532+
// an idle connection in the idle connections stack, it pops an idle connection, delivers it to the
533+
// wantConn, and returns true. If there are no idle connections in the idle connections stack, it
534+
// adds the wantConn to the idleConnWait queue and returns false.
535+
func (p *pool) getOrQueueForIdleConn(w *wantConn) bool {
532536
p.idleMu.Lock()
533537
defer p.idleMu.Unlock()
534538

@@ -836,6 +840,10 @@ func (w *wantConn) tryDeliver(conn *connection, err error) bool {
836840
// has been delivered already, cancel returns it with p.checkInNoEvent(). Note that the caller must
837841
// not hold any locks on the pool while calling cancel.
838842
func (w *wantConn) cancel(p *pool, err error) {
843+
if err == nil {
844+
panic("x/mongo/driver/topology: internal error: misuse of cancel")
845+
}
846+
839847
w.mu.Lock()
840848
if w.conn == nil && w.err == nil {
841849
close(w.ready) // catch misbehavior in future delivery

x/mongo/driver/topology/pool_test.go

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,8 @@ func TestPool(t *testing.T) {
595595
err = p.disconnect(context.Background())
596596
noerr(t, err)
597597
})
598+
// Test that if a checkOut() times out, it returns a WaitQueueTimeout error that wraps a
599+
// context.DeadlineExceeded error.
598600
t.Run("wait queue timeout error", func(t *testing.T) {
599601
t.Parallel()
600602

@@ -623,19 +625,72 @@ func TestPool(t *testing.T) {
623625
assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
624626

625627
// Assert that error received is WaitQueueTimeoutError with context deadline exceeded.
626-
assert.IsType(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
628+
assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
627629
if err, ok := err.(WaitQueueTimeoutError); ok {
628630
assert.Equalf(t, context.DeadlineExceeded, err.Unwrap(), "expected wrapped error to be a context.Timeout")
629631
}
630632

631633
err = p.disconnect(context.Background())
632634
noerr(t, err)
633635
})
636+
// Test that an indefinitely blocked checkOut() doesn't cause the wait queue to overflow
637+
// if there are many other checkOut() calls that time out. This tests a scenario where a
638+
// wantConnQueue may grow unbounded while a checkOut() is blocked, even if all subsequent
639+
// checkOut() calls time out (due to the behavior of wantConnQueue.cleanFront()).
640+
t.Run("wait queue doesn't overflow", func(t *testing.T) {
641+
t.Parallel()
642+
643+
cleanup := make(chan struct{})
644+
defer close(cleanup)
645+
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
646+
<-cleanup
647+
_ = nc.Close()
648+
})
649+
650+
p := newPool(poolConfig{
651+
Address: address.Address(addr.String()),
652+
MaxPoolSize: 1,
653+
})
654+
err := p.connect()
655+
noerr(t, err)
656+
657+
// Check out the 1 connection that the pool will create.
658+
c, err := p.checkOut(context.Background())
659+
noerr(t, err)
660+
661+
// Start a goroutine that tries to check out another connection with no timeout. Expect
662+
// this goroutine to block (wait in the wait queue) until the checked-out connection is
663+
// checked-in. Assert that there is no error once checkOut() finally does return.
664+
var wg sync.WaitGroup
665+
wg.Add(1)
666+
go func() {
667+
defer wg.Done()
668+
669+
_, err := p.checkOut(context.Background())
670+
noerr(t, err)
671+
}()
672+
673+
// Run lots of check-out attempts with a low timeout and assert that each one fails with
674+
// a WaitQueueTimeout error. Expect no other errors or panics.
675+
for i := 0; i < 50000; i++ {
676+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
677+
_, err := p.checkOut(ctx)
678+
cancel()
679+
assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
680+
assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
681+
}
682+
683+
// Check-in the connection we checked out earlier and wait for the checkOut() goroutine
684+
// to resume.
685+
err = p.checkIn(c)
686+
noerr(t, err)
687+
wg.Wait()
688+
})
634689
})
635690
t.Run("checkIn", func(t *testing.T) {
636691
t.Parallel()
637692

638-
t.Run("does not return to pool twice", func(t *testing.T) {
693+
t.Run("cannot return same connection to pool twice", func(t *testing.T) {
639694
t.Parallel()
640695

641696
cleanup := make(chan struct{})

0 commit comments

Comments
 (0)