Skip to content

Commit e20a58c

Browse files
committed
Replace pool.inUseConnections with server.operationCount and respond to PR feedback.
1 parent 5b56d7c commit e20a58c

File tree

6 files changed

+55
-26
lines changed

6 files changed

+55
-26
lines changed

mongo/integration/server_selection_prose_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (C) MongoDB, Inc. 2017-present.
1+
// Copyright (C) MongoDB, Inc. 2022-present.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License"); you may
44
// not use this file except in compliance with the License. You may obtain
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"sync"
1212
"testing"
13+
"time"
1314

1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
@@ -50,6 +51,13 @@ func TestServerSelectionProse(t *testing.T) {
5051
AppName: "loadBalancingTest",
5152
},
5253
})
54+
// The automatic failpoint clearing may not clear failpoints set on specific hosts, so
55+
// manually clear the failpoint we set on the specific mongos when the test is done.
56+
defer func() {
57+
mt.ResetClient(options.Client().
58+
SetHosts([]string{failpointHost}))
59+
mt.ClearFailPoints()
60+
}()
5361

5462
// Reset the client with exactly 2 mongos hosts.
5563
tpm := newTestPoolMonitor()
@@ -107,6 +115,11 @@ func TestServerSelectionProse(t *testing.T) {
107115
SetHosts(hosts[:2]).
108116
SetPoolMonitor(tpm.PoolMonitor))
109117

118+
// Sleep for 100ms to allow all server state discovery to complete. We need both servers to
119+
// be selectable when we start running the test or the distribution of selected servers will
120+
// be skewed. Unfortunately there's not currently another signal we can block on.
121+
time.Sleep(100 * time.Millisecond)
122+
110123
// Start 25 goroutines that each run 10 findOne operations. Run 25 goroutines instead of the
111124
// 10 that the prose test specifies to reduce intermittent test failures caused by the
112125
// random selections not being perfectly even over small numbers of operations.

x/mongo/driver/topology/connection.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,10 @@ type Connection struct {
600600
refCount int
601601
cleanupPoolFn func()
602602

603+
// cleanupServerFn resets the server state when a connection is returned to the connection pool
604+
// via Close() or expired via Expire().
605+
cleanupServerFn func()
606+
603607
mu sync.RWMutex
604608
}
605609

@@ -701,6 +705,10 @@ func (c *Connection) cleanupReferences() error {
701705
c.cleanupPoolFn()
702706
c.cleanupPoolFn = nil
703707
}
708+
if c.cleanupServerFn != nil {
709+
c.cleanupServerFn()
710+
c.cleanupServerFn = nil
711+
}
704712
c.connection = nil
705713
return err
706714
}

x/mongo/driver/topology/pool.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ type pool struct {
8585
nextID uint64 // nextID is the next pool ID for a new connection.
8686
pinnedCursorConnections uint64
8787
pinnedTransactionConnections uint64
88-
inUseConnections int64
8988

9089
address address.Address
9190
minSize uint64
@@ -452,8 +451,6 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
452451
ConnectionID: w.conn.poolID,
453452
})
454453
}
455-
456-
atomic.AddInt64(&p.inUseConnections, 1)
457454
return w.conn, nil
458455
}
459456

@@ -483,8 +480,6 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
483480
ConnectionID: w.conn.poolID,
484481
})
485482
}
486-
487-
atomic.AddInt64(&p.inUseConnections, 1)
488483
return w.conn, nil
489484
case <-ctx.Done():
490485
if p.monitor != nil {
@@ -588,13 +583,6 @@ func (p *pool) checkIn(conn *connection) error {
588583
})
589584
}
590585

591-
// Decrement the number of in-use connections once we know we have a non-nil connection that
592-
// came from this pool. Do this in checkIn() instead of checkInNoEvent() because the latter is
593-
// called by createConnections() and is not necessarily a check-in of an in-use connection. Use
594-
// an int64 instead of a uint64 to mitigate the impact of any possible bugs that could cause the
595-
// uint64 to underflow, which would effectively make the server unselectable.
596-
atomic.AddInt64(&p.inUseConnections, -1)
597-
598586
return p.checkInNoEvent(conn)
599587
}
600588

@@ -777,10 +765,6 @@ func (p *pool) availableConnectionCount() int {
777765
return len(p.idleConns)
778766
}
779767

780-
func (p *pool) inUseConnectionCount() int64 {
781-
return atomic.LoadInt64(&p.inUseConnections)
782-
}
783-
784768
// createConnections creates connections for wantConn requests on the newConnWait queue.
785769
func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
786770
defer wg.Done()

x/mongo/driver/topology/server.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,13 @@ func (ss *SelectedServer) Description() description.SelectedServer {
7676

7777
// Server is a single server within a topology.
7878
type Server struct {
79-
// state must be accessed using the atomic package and should be at the beginning of
80-
// the struct.
79+
// The following integer fields must be accessed using the atomic package and should be at the
80+
// beginning of the struct.
8181
// - atomic bug: https://pkg.go.dev/sync/atomic#pkg-note-BUG
8282
// - suggested layout: https://go101.org/article/memory-layout.html
83-
state int64
83+
84+
state int64
85+
operationCount int64
8486

8587
cfg *serverConfig
8688
address address.Address
@@ -261,12 +263,28 @@ func (s *Server) Connection(ctx context.Context) (driver.Connection, error) {
261263
return nil, ErrServerClosed
262264
}
263265

264-
connImpl, err := s.pool.checkOut(ctx)
266+
// Increment the operation count before calling checkOut to make sure that all connection
267+
// requests are included in the operation count, including those in the wait queue. If we got an
268+
// error insted of a connection, immediately decrement the operation count.
269+
atomic.AddInt64(&s.operationCount, 1)
270+
conn, err := s.pool.checkOut(ctx)
265271
if err != nil {
272+
atomic.AddInt64(&s.operationCount, -1)
266273
return nil, err
267274
}
268275

269-
return &Connection{connection: connImpl}, nil
276+
return &Connection{
277+
connection: conn,
278+
cleanupServerFn: func() {
279+
// Decrement the operation count whenever the caller is done with the connection. Note
280+
// that cleanupServerFn() is not called while the connection is pinned to a cursor or
281+
// transaction, so the operation count is not decremented until the cursor is closed or
282+
// the transaction is committed or aborted. Use an int64 instead of a uint64 to mitigate
283+
// the impact of any possible bugs that could cause the uint64 to underflow, which would
284+
// make the server much less selectable.
285+
atomic.AddInt64(&s.operationCount, -1)
286+
},
287+
}, nil
270288
}
271289

272290
// ProcessHandshakeError implements SDAM error handling for errors that occur before a connection
@@ -808,6 +826,11 @@ func (s *Server) MinRTT() time.Duration {
808826
return s.rttMonitor.getMinRTT()
809827
}
810828

829+
// OperationCount returns the current number of in-progress operations for this server.
830+
func (s *Server) OperationCount() int64 {
831+
return atomic.LoadInt64(&s.operationCount)
832+
}
833+
811834
// String implements the Stringer interface.
812835
func (s *Server) String() string {
813836
desc := s.Description()

x/mongo/driver/topology/topology.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func (t *Topology) SelectServer(ctx context.Context, ss description.ServerSelect
448448
// Of the two randomly selected suitable servers, pick the one with fewer in-use connections.
449449
// We use in-use connections as an analog for in-progress operations because they are almost
450450
// always the same value for a given server.
451-
if server1.pool.inUseConnectionCount() < server2.pool.inUseConnectionCount() {
451+
if server1.OperationCount() < server2.OperationCount() {
452452
return server1, nil
453453
}
454454
return server2, nil
@@ -686,7 +686,6 @@ func (t *Topology) processSRVResults(parsedHosts []string) bool {
686686
t.subLock.Unlock()
687687

688688
return true
689-
690689
}
691690

692691
// apply updates the Topology and its underlying FSM based on the provided server description and returns the server

x/mongo/driver/topology/topology_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -722,15 +722,15 @@ func runInWindowTest(t *testing.T, directory string, filename string) {
722722
// For each server state in the test's "mocked_topology_state", set the connection pool's
723723
// in-use connections count to the test operation count value.
724724
for _, state := range test.MockedTopologyState {
725-
servers[state.Address].pool.inUseConnections = state.OperationCount
725+
servers[state.Address].operationCount = state.OperationCount
726726
}
727727

728728
// Create a new Topology, set the state to "connected", store a topology description
729729
// containing all server descriptions created from the test server descriptions, and copy
730730
// all *Server instances to the Topology's servers list.
731731
topology, err := New()
732732
require.NoError(t, err, "error creating new Topology")
733-
topology.connectionstate = connected
733+
topology.state = topologyConnected
734734
topology.desc.Store(description.Topology{
735735
Kind: topologyKindFromString(t, test.TopologyDescription.Type),
736736
Servers: descriptions,
@@ -775,6 +775,8 @@ func runInWindowTest(t *testing.T, directory string, filename string) {
775775
}
776776

777777
// Otherwise, check if the expected frequency is within the given tolerance range.
778+
// TODO(GODRIVER-2179): Use assert.Deltaf() when we migrate all test code to the
779+
// "testify/assert" or an API-compatible library for assertions.
778780
low := expected - test.Outcome.Tolerance
779781
high := expected + test.Outcome.Tolerance
780782
assert.True(

0 commit comments

Comments
 (0)