Skip to content

GODRIVER-1726 Fix SDAM error handling for write concern errors #489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 101 additions & 6 deletions mongo/integration/sdam_error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package integration
import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand All @@ -23,16 +24,18 @@ import (

func TestSDAMErrorHandling(t *testing.T) {
mt := mtest.New(t, noClientOpts)
clientOpts := options.Client().
ApplyURI(mt.ConnString()).
SetRetryWrites(false).
SetPoolMonitor(poolMonitor).
SetWriteConcern(mtest.MajorityWc)
baseClientOpts := func() *options.ClientOptions {
return options.Client().
ApplyURI(mt.ConnString()).
SetRetryWrites(false).
SetPoolMonitor(poolMonitor).
SetWriteConcern(mtest.MajorityWc)
}
baseMtOpts := func() *mtest.Options {
mtOpts := mtest.NewOptions().
Topologies(mtest.ReplicaSet). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
MinServerVersion("4.0"). // 4.0+ is required to use failpoints on replica sets.
ClientOptions(clientOpts)
ClientOptions(baseClientOpts())

if mt.TopologyKind() == mtest.Sharded {
// Pin to a single mongos because the tests use failpoints.
Expand Down Expand Up @@ -201,5 +204,97 @@ func TestSDAMErrorHandling(t *testing.T) {
assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
})
})
mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
// Integration tests for the SDAM error handling code path for errors in server response documents. These
// errors can be part of the top-level document in ok:0 responses or in a nested writeConcernError document.

// On 4.4, some state change errors include a topologyVersion field. Because we're triggering these errors
// via failCommand, the topologyVersion does not actually change as it would in an actual state change.
// This causes the SDAM error handling code path to think we've already handled this state change and
// ignore the error because it's stale. To avoid this altogether, we cap the test to <= 4.2.
serverErrorsMtOpts := baseMtOpts().
MinServerVersion("4.0"). // failCommand support
MaxServerVersion("4.2").
ClientOptions(baseClientOpts().SetRetryWrites(false))

testCases := []struct {
name string
errorCode int32

// For shutdown errors, the pool is always cleared. For non-shutdown errors, the pool is only cleared
// for pre-4.2 servers.
isShutdownError bool
}{
// "node is recovering" errors
{"InterruptedAtShutdown", 11600, true},
{"InterruptedDueToReplStateChange, not shutdown", 11602, false},
{"NotMasterOrSecondary", 13436, false},
{"PrimarySteppedDown", 189, false},
{"ShutdownInProgress", 91, true},

// "not master" errors
{"NotMaster", 10107, false},
{"NotMasterNoSlaveOk", 13435, false},
}
for _, tc := range testCases {
mt.RunOpts(fmt.Sprintf("command error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
clearPoolChan()

// Cause the next insert to fail with an ok:0 response.
fp := mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
ErrorCode: tc.errorCode,
},
}
mt.SetFailPoint(fp)

runServerErrorsTest(mt, tc.isShutdownError)
})
mt.RunOpts(fmt.Sprintf("write concern error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
clearPoolChan()

// Cause the next insert to fail with a write concern error.
fp := mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
WriteConcernError: &mtest.WriteConcernErrorData{
Code: tc.errorCode,
},
},
}
mt.SetFailPoint(fp)

runServerErrorsTest(mt, tc.isShutdownError)
})
}
})
})
}

func runServerErrorsTest(mt *mtest.T, isShutdownError bool) {
mt.Helper()

_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")

// The pool should always be cleared for shutdown errors, regardless of server version.
if isShutdownError {
assert.True(mt, isPoolCleared(), "expected pool to be cleared, but was not")
return
}

// For non-shutdown errors, the pool is only cleared if the error is from a pre-4.2 server.
wantCleared := mtest.CompareServerVersions(mt.ServerVersion(), "4.2") < 0
gotCleared := isPoolCleared()
assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %v; pool was cleared: %v",
wantCleared, gotCleared)
}
18 changes: 17 additions & 1 deletion x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,22 @@ func (s *Server) RequestImmediateCheck() {
}
}

// getWriteConcernErrorForProcessing extracts a driver.WriteConcernError from the provided error. This function returns
// (error, true) if the error is a WriteConcernError and the falls under the requirements for SDAM error
// handling and (nil, false) otherwise.
func getWriteConcernErrorForProcessing(err error) (*driver.WriteConcernError, bool) {
writeCmdErr, ok := err.(driver.WriteCommandError)
if !ok {
return nil, false
}

wcerr := writeCmdErr.WriteConcernError
if wcerr != nil && (wcerr.NodeIsRecovering() || wcerr.NotMaster()) {
return wcerr, true
}
return nil, false
}

// ProcessError handles SDAM error handling and implements driver.ErrorProcessor.
func (s *Server) ProcessError(err error, conn driver.Connection) {
// ignore nil error
Expand Down Expand Up @@ -365,7 +381,7 @@ func (s *Server) ProcessError(err error, conn driver.Connection) {
}
return
}
if wcerr, ok := err.(driver.WriteConcernError); ok && (wcerr.NodeIsRecovering() || wcerr.NotMaster()) {
if wcerr, ok := getWriteConcernErrorForProcessing(err); ok {
// ignore stale error
if description.CompareTopologyVersion(desc.TopologyVersion, wcerr.TopologyVersion) >= 0 {
return
Expand Down
16 changes: 9 additions & 7 deletions x/mongo/driver/topology/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,15 @@ func TestServer(t *testing.T) {
s.connectionstate = connected
s.pool.connected = connected

wce := driver.WriteConcernError{
Name: "",
Code: 10107,
Message: "not master",
Details: []byte{},
Labels: []string{},
TopologyVersion: nil,
wce := driver.WriteCommandError{
WriteConcernError: &driver.WriteConcernError{
Name: "",
Code: 10107,
Message: "not master",
Details: []byte{},
Labels: []string{},
TopologyVersion: nil,
},
}
s.ProcessError(wce, initConnection{})

Expand Down