Skip to content

Commit 3d75312

Browse files
committed
Replace pool.inUseConnections with server.operationCount and respond to PR feedback.
1 parent d212fa8 commit 3d75312

File tree

6 files changed

+53
-24
lines changed

6 files changed

+53
-24
lines changed

mongo/integration/server_selection_prose_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (C) MongoDB, Inc. 2017-present.
1+
// Copyright (C) MongoDB, Inc. 2022-present.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License"); you may
44
// not use this file except in compliance with the License. You may obtain
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"sync"
1212
"testing"
13+
"time"
1314

1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
@@ -50,6 +51,13 @@ func TestServerSelectionProse(t *testing.T) {
5051
AppName: "loadBalancingTest",
5152
},
5253
})
54+
// The automatic failpoint clearing may not clear failpoints set on specific hosts, so
55+
// manually clear the failpoint we set on the specific mongos when the test is done.
56+
defer func() {
57+
mt.ResetClient(options.Client().
58+
SetHosts([]string{failpointHost}))
59+
mt.ClearFailPoints()
60+
}()
5361

5462
// Reset the client with exactly 2 mongos hosts.
5563
tpm := newTestPoolMonitor()
@@ -107,6 +115,11 @@ func TestServerSelectionProse(t *testing.T) {
107115
SetHosts(hosts[:2]).
108116
SetPoolMonitor(tpm.PoolMonitor))
109117

118+
// Sleep for 100ms to allow all server state discovery to complete. We need both servers to
119+
// be selectable when we start running the test or the distribution of selected servers will
120+
// be skewed. Unfortunately there's not currently another signal we can block on.
121+
time.Sleep(100 * time.Millisecond)
122+
110123
// Start 25 goroutines that each run 10 findOne operations. Run 25 goroutines instead of the
111124
// 10 that the prose test specifies to reduce intermittent test failures caused by the
112125
// random selections not being perfectly even over small numbers of operations.

x/mongo/driver/topology/connection.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,10 @@ type Connection struct {
593593
refCount int
594594
cleanupPoolFn func()
595595

596+
// cleanupServerFn resets the server state when a connection is returned to the connection pool
597+
// via Close() or expired via Expire().
598+
cleanupServerFn func()
599+
596600
mu sync.RWMutex
597601
}
598602

@@ -694,6 +698,10 @@ func (c *Connection) cleanupReferences() error {
694698
c.cleanupPoolFn()
695699
c.cleanupPoolFn = nil
696700
}
701+
if c.cleanupServerFn != nil {
702+
c.cleanupServerFn()
703+
c.cleanupServerFn = nil
704+
}
697705
c.connection = nil
698706
return err
699707
}

x/mongo/driver/topology/pool.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ type pool struct {
8585
nextID uint64 // nextID is the next pool ID for a new connection.
8686
pinnedCursorConnections uint64
8787
pinnedTransactionConnections uint64
88-
inUseConnections int64
8988

9089
address address.Address
9190
minSize uint64
@@ -452,8 +451,6 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
452451
ConnectionID: w.conn.poolID,
453452
})
454453
}
455-
456-
atomic.AddInt64(&p.inUseConnections, 1)
457454
return w.conn, nil
458455
}
459456

@@ -483,8 +480,6 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
483480
ConnectionID: w.conn.poolID,
484481
})
485482
}
486-
487-
atomic.AddInt64(&p.inUseConnections, 1)
488483
return w.conn, nil
489484
case <-ctx.Done():
490485
if p.monitor != nil {
@@ -588,13 +583,6 @@ func (p *pool) checkIn(conn *connection) error {
588583
})
589584
}
590585

591-
// Decrement the number of in-use connections once we know we have a non-nil connection that
592-
// came from this pool. Do this in checkIn() instead of checkInNoEvent() because the latter is
593-
// called by createConnections() and is not necessarily a check-in of an in-use connection. Use
594-
// an int64 instead of a uint64 to mitigate the impact of any possible bugs that could cause the
595-
// uint64 to underflow, which would effectively make the server unselectable.
596-
atomic.AddInt64(&p.inUseConnections, -1)
597-
598586
return p.checkInNoEvent(conn)
599587
}
600588

@@ -777,10 +765,6 @@ func (p *pool) availableConnectionCount() int {
777765
return len(p.idleConns)
778766
}
779767

780-
func (p *pool) inUseConnectionCount() int64 {
781-
return atomic.LoadInt64(&p.inUseConnections)
782-
}
783-
784768
// createConnections creates connections for wantConn requests on the newConnWait queue.
785769
func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
786770
defer wg.Done()

x/mongo/driver/topology/server.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,13 @@ func connectionStateString(state int64) string {
8282

8383
// Server is a single server within a topology.
8484
type Server struct {
85-
// connectionstate must be accessed using the atomic package and should be at the beginning of
86-
// the struct.
85+
// The following integer fields must be accessed using the atomic package and should be at the
86+
// beginning of the struct.
8787
// - atomic bug: https://pkg.go.dev/sync/atomic#pkg-note-BUG
8888
// - suggested layout: https://go101.org/article/memory-layout.html
89+
8990
connectionstate int64
91+
operationCount int64
9092

9193
cfg *serverConfig
9294
address address.Address
@@ -267,12 +269,28 @@ func (s *Server) Connection(ctx context.Context) (driver.Connection, error) {
267269
return nil, ErrServerClosed
268270
}
269271

270-
connImpl, err := s.pool.checkOut(ctx)
272+
// Increment the operation count before calling checkOut to make sure that all connection
273+
// requests are included in the operation count, including those in the wait queue. If we got an
274+
// error insted of a connection, immediately decrement the operation count.
275+
atomic.AddInt64(&s.operationCount, 1)
276+
conn, err := s.pool.checkOut(ctx)
271277
if err != nil {
278+
atomic.AddInt64(&s.operationCount, -1)
272279
return nil, err
273280
}
274281

275-
return &Connection{connection: connImpl}, nil
282+
return &Connection{
283+
connection: conn,
284+
cleanupServerFn: func() {
285+
// Decrement the operation count whenever the caller is done with the connection. Note
286+
// that cleanupServerFn() is not called while the connection is pinned to a cursor or
287+
// transaction, so the operation count is not decremented until the cursor is closed or
288+
// the transaction is committed or aborted. Use an int64 instead of a uint64 to mitigate
289+
// the impact of any possible bugs that could cause the uint64 to underflow, which would
290+
// make the server much less selectable.
291+
atomic.AddInt64(&s.operationCount, -1)
292+
},
293+
}, nil
276294
}
277295

278296
// ProcessHandshakeError implements SDAM error handling for errors that occur before a connection
@@ -814,6 +832,11 @@ func (s *Server) MinRTT() time.Duration {
814832
return s.rttMonitor.getMinRTT()
815833
}
816834

835+
// OperationCount returns the current number of in-progress operations for this server.
836+
func (s *Server) OperationCount() int64 {
837+
return atomic.LoadInt64(&s.operationCount)
838+
}
839+
817840
// String implements the Stringer interface.
818841
func (s *Server) String() string {
819842
desc := s.Description()

x/mongo/driver/topology/topology.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ func (t *Topology) SelectServer(ctx context.Context, ss description.ServerSelect
440440
// Of the two randomly selected suitable servers, pick the one with fewer in-use connections.
441441
// We use in-use connections as an analog for in-progress operations because they are almost
442442
// always the same value for a given server.
443-
if server1.pool.inUseConnectionCount() < server2.pool.inUseConnectionCount() {
443+
if server1.OperationCount() < server2.OperationCount() {
444444
return server1, nil
445445
}
446446
return server2, nil
@@ -678,7 +678,6 @@ func (t *Topology) processSRVResults(parsedHosts []string) bool {
678678
t.subLock.Unlock()
679679

680680
return true
681-
682681
}
683682

684683
// apply updates the Topology and its underlying FSM based on the provided server description and returns the server

x/mongo/driver/topology/topology_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ func runInWindowTest(t *testing.T, directory string, filename string) {
722722
// For each server state in the test's "mocked_topology_state", set the connection pool's
723723
// in-use connections count to the test operation count value.
724724
for _, state := range test.MockedTopologyState {
725-
servers[state.Address].pool.inUseConnections = state.OperationCount
725+
servers[state.Address].operationCount = state.OperationCount
726726
}
727727

728728
// Create a new Topology, set the state to "connected", store a topology description
@@ -775,6 +775,8 @@ func runInWindowTest(t *testing.T, directory string, filename string) {
775775
}
776776

777777
// Otherwise, check if the expected frequency is within the given tolerance range.
778+
// TODO(GODRIVER-2179): Use assert.Deltaf() when we migrate all test code to the
779+
// "testify/assert" or an API-compatible library for assertions.
778780
low := expected - test.Outcome.Tolerance
779781
high := expected + test.Outcome.Tolerance
780782
assert.True(

0 commit comments

Comments
 (0)