Skip to content

Commit 02572e2

Browse files
authored
GODRIVER-2065 Don't send the operation if the Context deadline is before (now + minimum RTT). (#761)
1 parent 28ac11a commit 02572e2

File tree

15 files changed

+600
-135
lines changed

15 files changed

+600
-135
lines changed

internal/testutil/assert/assert.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ func NotNil(t testing.TB, obj interface{}, msg string, args ...interface{}) {
8181
}
8282
}
8383

84-
// Soon runs the provided callback for a maximum of timeoutMS milliseconds. It returns the callback error
85-
// if the callback returned and ErrCallbackTimedOut if the timeout expired.
84+
// Soon runs the provided callback for a maximum of timeoutMS milliseconds.
8685
func Soon(t testing.TB, callback func(), timeout time.Duration) {
8786
t.Helper()
8887

mongo/change_stream_deployment.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mongo
88

99
import (
1010
"context"
11+
"time"
1112

1213
"go.mongodb.org/mongo-driver/mongo/description"
1314
"go.mongodb.org/mongo-driver/x/mongo/driver"
@@ -35,6 +36,10 @@ func (c *changeStreamDeployment) Connection(context.Context) (driver.Connection,
3536
return c.conn, nil
3637
}
3738

39+
func (c *changeStreamDeployment) MinRTT() time.Duration {
40+
return c.server.MinRTT()
41+
}
42+
3843
func (c *changeStreamDeployment) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult {
3944
ep, ok := c.server.(driver.ErrorProcessor)
4045
if !ok {

mongo/integration/client_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package integration
99
import (
1010
"context"
1111
"fmt"
12+
"net"
1213
"os"
1314
"reflect"
1415
"strings"
@@ -53,6 +54,46 @@ func (e *negateCodec) DecodeValue(ectx bsoncodec.DecodeContext, vr bsonrw.ValueR
5354
return nil
5455
}
5556

57+
var _ options.ContextDialer = &slowConnDialer{}
58+
59+
// A slowConnDialer dials connections that delay network round trips by the given delay duration.
60+
type slowConnDialer struct {
61+
dialer *net.Dialer
62+
delay time.Duration
63+
}
64+
65+
func newSlowConnDialer(delay time.Duration) *slowConnDialer {
66+
return &slowConnDialer{
67+
dialer: &net.Dialer{},
68+
delay: delay,
69+
}
70+
}
71+
72+
func (scd *slowConnDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
73+
conn, err := scd.dialer.DialContext(ctx, network, address)
74+
if err != nil {
75+
return nil, err
76+
}
77+
return &slowConn{
78+
Conn: conn,
79+
delay: scd.delay,
80+
}, nil
81+
}
82+
83+
var _ net.Conn = &slowConn{}
84+
85+
// slowConn is a net.Conn that delays all calls to Read() by the given delay duration. All other
86+
// net.Conn functions behave identically to the embedded net.Conn.
87+
type slowConn struct {
88+
net.Conn
89+
delay time.Duration
90+
}
91+
92+
func (sc *slowConn) Read(b []byte) (n int, err error) {
93+
time.Sleep(sc.delay)
94+
return sc.Conn.Read(b)
95+
}
96+
5697
func TestClient(t *testing.T) {
5798
mt := mtest.New(t, noClientOpts)
5899
defer mt.Close()
@@ -467,6 +508,95 @@ func TestClient(t *testing.T) {
467508
err := mt.Client.Ping(context.Background(), readpref.Primary())
468509
assert.Nil(t, err, "unexpected error calling Ping: %v", err)
469510
})
511+
512+
mt.Run("minimum RTT is monitored", func(mt *mtest.T) {
513+
if testing.Short() {
514+
t.Skip("skipping integration test in short mode")
515+
}
516+
517+
// Reset the client with a dialer that delays all network round trips by 300ms and set the
518+
// heartbeat interval to 100ms to reduce the time it takes to collect RTT samples.
519+
mt.ResetClient(options.Client().
520+
SetDialer(newSlowConnDialer(300 * time.Millisecond)).
521+
SetHeartbeatInterval(100 * time.Millisecond))
522+
523+
// Assert that the minimum RTT is eventually >250ms.
524+
topo := getTopologyFromClient(mt.Client)
525+
assert.Soon(mt, func() {
526+
for {
527+
time.Sleep(100 * time.Millisecond)
528+
529+
// Wait for all of the servers' minimum RTTs to be >250ms.
530+
done := true
531+
for _, desc := range topo.Description().Servers {
532+
server, err := topo.FindServer(desc)
533+
assert.Nil(mt, err, "FindServer error: %v", err)
534+
if server.MinRTT() <= 250*time.Millisecond {
535+
done = false
536+
}
537+
}
538+
if done {
539+
return
540+
}
541+
}
542+
}, 10*time.Second)
543+
})
544+
545+
// Test that if the minimum RTT is greater than the remaining timeout for an operation, the
546+
// operation is not sent to the server and no connections are closed.
547+
mt.Run("minimum RTT used to prevent sending requests", func(mt *mtest.T) {
548+
if testing.Short() {
549+
t.Skip("skipping integration test in short mode")
550+
}
551+
552+
// Assert that we can call Ping with a 250ms timeout.
553+
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
554+
defer cancel()
555+
err := mt.Client.Ping(ctx, nil)
556+
assert.Nil(mt, err, "Ping error: %v", err)
557+
558+
// Reset the client with a dialer that delays all network round trips by 300ms and set the
559+
// heartbeat interval to 100ms to reduce the time it takes to collect RTT samples.
560+
tpm := newTestPoolMonitor()
561+
mt.ResetClient(options.Client().
562+
SetPoolMonitor(tpm.PoolMonitor).
563+
SetDialer(newSlowConnDialer(300 * time.Millisecond)).
564+
SetHeartbeatInterval(100 * time.Millisecond))
565+
566+
// Assert that the minimum RTT is eventually >250ms.
567+
topo := getTopologyFromClient(mt.Client)
568+
assert.Soon(mt, func() {
569+
for {
570+
time.Sleep(100 * time.Millisecond)
571+
572+
// Wait for all of the servers' minimum RTTs to be >250ms.
573+
done := true
574+
for _, desc := range topo.Description().Servers {
575+
server, err := topo.FindServer(desc)
576+
assert.Nil(mt, err, "FindServer error: %v", err)
577+
if server.MinRTT() <= 250*time.Millisecond {
578+
done = false
579+
}
580+
}
581+
if done {
582+
return
583+
}
584+
}
585+
}, 10*time.Second)
586+
587+
// Once we've waited for the minimum RTT for the single server to be >250ms, run a bunch of
588+
// Ping operations with a timeout of 250ms and expect that they return errors.
589+
for i := 0; i < 10; i++ {
590+
ctx, cancel = context.WithTimeout(context.Background(), 250*time.Millisecond)
591+
err := mt.Client.Ping(ctx, nil)
592+
cancel()
593+
assert.NotNil(mt, err, "expected Ping to return an error")
594+
}
595+
596+
// Assert that the Ping timeouts result in no connections being closed.
597+
closed := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.ConnectionClosed }))
598+
assert.Equal(t, 0, closed, "expected no connections to be closed")
599+
})
470600
}
471601

472602
func TestClientStress(t *testing.T) {

mongo/integration/mtest/opmsg_deployment.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mtest
88

99
import (
1010
"context"
11+
"time"
1112

1213
"github.com/pkg/errors"
1314
"go.mongodb.org/mongo-driver/bson"
@@ -133,6 +134,11 @@ func (md *mockDeployment) Connection(context.Context) (driver.Connection, error)
133134
return md.conn, nil
134135
}
135136

137+
// MinRTT always returns 0. It implements the driver.Server interface.
138+
func (md *mockDeployment) MinRTT() time.Duration {
139+
return 0
140+
}
141+
136142
// Connect is a no-op method which implements the driver.Connector interface.
137143
func (md *mockDeployment) Connect() error {
138144
return nil

mongo/integration/mtest/proxy_dialer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,8 @@ func (p *proxyDialer) Messages() []*ProxyMessage {
131131
p.Lock()
132132
defer p.Unlock()
133133

134-
copiedMessages := make([]*ProxyMessage, 0, len(p.messages))
135-
for _, msg := range p.messages {
136-
copiedMessages = append(copiedMessages, msg)
137-
}
134+
copiedMessages := make([]*ProxyMessage, len(p.messages))
135+
copy(copiedMessages, p.messages)
138136
return copiedMessages
139137
}
140138

mongo/integration/sessions_test.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"go.mongodb.org/mongo-driver/bson"
16+
"go.mongodb.org/mongo-driver/internal"
1617
"go.mongodb.org/mongo-driver/internal/testutil/assert"
1718
"go.mongodb.org/mongo-driver/mongo"
1819
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
@@ -130,22 +131,33 @@ func TestSessions(t *testing.T) {
130131
mt.RunOpts("cluster time is updated from handshakes", clusterTimeHandshakeMtOpts, func(mt *mtest.T) {
131132
err := mt.Client.Ping(mtest.Background, mtest.PrimaryRp)
132133
assert.Nil(mt, err, "Ping error: %v", err)
133-
msgPairs := mt.GetProxiedMessages()
134-
assert.True(mt, len(msgPairs) > 2, "expected more than two messages, got %d", len(msgPairs))
135-
136-
for idx, pair := range mt.GetProxiedMessages() {
137-
// Get the $clusterTime value sent to the server. The first three messages are the handshakes for the
138-
// heartbeat, RTT, and application connections. These should not contain $clusterTime because they happen on
139-
// connections that don't know the server's wire version and therefore don't know if the server supports
140-
// $clusterTime.
141-
_, err = pair.Sent.Command.LookupErr("$clusterTime")
142-
if idx <= 2 {
143-
assert.NotNil(mt, err, "expected no $clusterTime field in command %s", pair.Sent.Command)
144-
continue
145-
}
146134

147-
// All messages after the first two should contain $clusterTime.
148-
assert.Nil(mt, err, "expected $clusterTime field in command %s", pair.Sent.Command)
135+
// Assert that all sent commands (including handshake commands) include a "$clusterTime" in
136+
// the command document.
137+
for _, pair := range mt.GetProxiedMessages() {
138+
_, err := pair.Sent.Command.LookupErr("$clusterTime")
139+
hasClusterTime := err == nil
140+
141+
switch pair.CommandName {
142+
// If the command is either legacy hello or "hello" (used as the first message in any
143+
// handshake or the checks from the heartbeat or RTT monitors), expect that there is no
144+
// "$clusterTime" because the connection doesn't know the server's wire version yet so
145+
// it doesn't know if the connection supports "$clusterTime".
146+
case internal.LegacyHello, "hello":
147+
assert.False(
148+
mt,
149+
hasClusterTime,
150+
"expected no $clusterTime field in command %s",
151+
pair.Sent.Command)
152+
// If the command is anything else (including other handshake commands), assert that the
153+
// command includes "$clusterTime".
154+
default:
155+
assert.True(
156+
mt,
157+
hasClusterTime,
158+
"expected $clusterTime field in in Ping command %s",
159+
pair.Sent.Command)
160+
}
149161
}
150162
})
151163

x/mongo/driver/batch_cursor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"strings"
8+
"time"
89

910
"go.mongodb.org/mongo-driver/bson/bsontype"
1011
"go.mongodb.org/mongo-driver/event"
@@ -426,6 +427,11 @@ func (lbcd *loadBalancedCursorDeployment) Connection(_ context.Context) (Connect
426427
return lbcd.conn, nil
427428
}
428429

430+
// MinRTT always returns 0. It implements the driver.Server interface.
431+
func (lbcd *loadBalancedCursorDeployment) MinRTT() time.Duration {
432+
return 0
433+
}
434+
429435
func (lbcd *loadBalancedCursorDeployment) ProcessError(err error, conn Connection) ProcessErrorResult {
430436
return lbcd.errorProcessor.ProcessError(err, conn)
431437
}

x/mongo/driver/driver.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package driver // import "go.mongodb.org/mongo-driver/x/mongo/driver"
22

33
import (
44
"context"
5+
"time"
56

67
"go.mongodb.org/mongo-driver/mongo/address"
78
"go.mongodb.org/mongo-driver/mongo/description"
@@ -43,6 +44,9 @@ type Subscriber interface {
4344
// retrieving and returning of connections.
4445
type Server interface {
4546
Connection(context.Context) (Connection, error)
47+
48+
// MinRTT returns the minimum round-trip time to the server observed over the window period.
49+
MinRTT() time.Duration
4650
}
4751

4852
// Connection represents a connection to a MongoDB server.
@@ -194,6 +198,11 @@ func (ssd SingleConnectionDeployment) Connection(context.Context) (Connection, e
194198
return ssd.C, nil
195199
}
196200

201+
// MinRTT always returns 0. It implements the driver.Server interface.
202+
func (ssd SingleConnectionDeployment) MinRTT() time.Duration {
203+
return 0
204+
}
205+
197206
// TODO(GODRIVER-617): We can likely use 1 type for both the Type and the RetryMode by using
198207
// 2 bits for the mode and 1 bit for the type. Although in the practical sense, we might not want to
199208
// do that since the type of retryability is tied to the operation itself and isn't going change,

x/mongo/driver/drivertest/channel_netconn.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,12 @@ func (c *ChannelNetConn) SetWriteDeadline(_ time.Time) error {
7676

7777
// GetWrittenMessage gets the last wire message written to the connection
7878
func (c *ChannelNetConn) GetWrittenMessage() []byte {
79-
var wm []byte
8079
select {
81-
case wm = <-c.Written:
80+
case wm := <-c.Written:
81+
return wm
82+
default:
83+
return nil
8284
}
83-
return wm
8485
}
8586

8687
// AddResponse adds a response to the connection.

0 commit comments

Comments
 (0)