Skip to content

GODRIVER-2109 Prevent a data race between connecting and checking out a connection from the resourcePool. #726

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions mongo/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"context"
"fmt"
"os"
"reflect"
Expand Down Expand Up @@ -457,6 +458,13 @@ func TestClient(t *testing.T) {
mode := modeVal.StringValue()
assert.Equal(mt, mode, "primaryPreferred", "expected read preference mode primaryPreferred, got %v", mode)
})

// Test that using a client with minPoolSize set doesn't cause a data race.
mtOpts = mtest.NewOptions().ClientOptions(options.Client().SetMinPoolSize(5))
mt.RunOpts("minPoolSize", mtOpts, func(mt *mtest.T) {
err := mt.Client.Ping(context.Background(), readpref.Primary())
assert.Nil(t, err, "unexpected error calling Ping: %v", err)
})
}

type proxyMessage struct {
Expand Down
2 changes: 1 addition & 1 deletion x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (p *pool) connectionInitFunc() interface{} {
return nil
}

go c.connect(context.Background())
c.connect(context.Background())

return c
}
Expand Down
31 changes: 25 additions & 6 deletions x/mongo/driver/topology/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ func (rp *resourcePool) initialize() {
// add will add a new rpe to the pool, requires that the resource pool is locked
func (rp *resourcePool) add(e *resourcePoolElement) {
if e == nil {
e = &resourcePoolElement{
value: rp.initFn(),
}
return
}

e.next = rp.start
Expand Down Expand Up @@ -206,9 +204,30 @@ func (rp *resourcePool) Maintain() {
}
}

for rp.totalSize < rp.minSize {
rp.add(nil)
rp.totalSize++
// Determine the number of resources we need to satisfy minSize and start that many goroutines
// that each try to get a new resource and insert it into the pool.
n := rp.minSize - rp.totalSize
for i := uint64(0); i < n; i++ {
go func() {
// If we couldn't increment the total pool size becasue it's already at max size, return
// without adding any resources to the pool.
if !rp.incrementTotal() {
return
}

r := rp.initFn()
// If we didn't get a resource back from initFn(), decrement the total pool size and
// return.
if r == nil {
rp.decrementTotal()
return
}

// If we got a resource back from initFn(), try to insert it into the pool. Use Put()
// to handle the case where we got a resource from initFn() but the resource is expired
// (e.g. a connection that fails handshake).
rp.Put(r)
}()
}

// reset the timer for the background cleanup routine
Expand Down