Skip to content

GODRIVER-2109 Prevent a data race between connecting and checking out a connection from the resourcePool. #728

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions mongo/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"context"
"fmt"
"os"
"reflect"
Expand Down Expand Up @@ -457,6 +458,13 @@ func TestClient(t *testing.T) {
mode := modeVal.StringValue()
assert.Equal(mt, mode, "primaryPreferred", "expected read preference mode primaryPreferred, got %v", mode)
})

// Test that using a client with minPoolSize set doesn't cause a data race.
mtOpts = mtest.NewOptions().ClientOptions(options.Client().SetMinPoolSize(5))
mt.RunOpts("minPoolSize", mtOpts, func(mt *mtest.T) {
err := mt.Client.Ping(context.Background(), readpref.Primary())
assert.Nil(t, err, "unexpected error calling Ping: %v", err)
})
}

type proxyMessage struct {
Expand Down
3 changes: 3 additions & 0 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type connection struct {
idleDeadline atomic.Value // Stores a time.Time
readTimeout time.Duration
writeTimeout time.Duration
descMu sync.RWMutex // Guards desc. TODO: Remove with or after GODRIVER-2038.
desc description.Server
helloRTT time.Duration
compressor wiremessage.CompressorID
Expand Down Expand Up @@ -228,7 +229,9 @@ func (c *connection) connect(ctx context.Context) {
if err == nil {
// We only need to retain the Description field as the connection's description. The authentication-related
// fields in handshakeInfo are tracked by the handshaker if necessary.
c.descMu.Lock()
c.desc = handshakeInfo.Description
c.descMu.Unlock()
c.helloRTT = time.Since(handshakeStartTime)

// If the application has indicated that the cluster is load balanced, ensure the server has included serviceId
Expand Down
14 changes: 12 additions & 2 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,14 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) (*pool, error) {

// stale checks if a given connection's generation is below the generation of the pool
func (p *pool) stale(c *connection) bool {
return c == nil || p.generation.stale(c.desc.ServiceID, c.generation)
if c == nil {
return true
}

c.descMu.RLock()
serviceID := c.desc.ServiceID
c.descMu.RUnlock()
return p.generation.stale(serviceID, c.generation)
}

// connect puts the pool into the connected state, allowing it to be used and will allow items to begin being processed from the wait queue
Expand Down Expand Up @@ -532,7 +539,10 @@ func (p *pool) removeConnection(c *connection, reason string) error {
// Only update the generation numbers map if the connection has retrieved its generation number. Otherwise, we'd
// decrement the count for the generation even though it had never been incremented.
if c.hasGenerationNumber() {
p.generation.removeConnection(c.desc.ServiceID)
c.descMu.RLock()
serviceID := c.desc.ServiceID
c.descMu.RUnlock()
p.generation.removeConnection(serviceID)
}

if publishEvent && p.monitor != nil {
Expand Down