Skip to content

Commit 157e037

Browse files
committed
Add Retryable Writes
GODRIVER-53 Change-Id: I3613b72b45c311997e399635180e4fec9705ae8e
1 parent cf1cba7 commit 157e037

24 files changed

+593
-188
lines changed

core/command/command.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,108 @@ import (
1717
"github.com/mongodb/mongo-go-driver/core/writeconcern"
1818
)
1919

20+
// DecodeError attempts to decode the wiremessage as an error
21+
func DecodeError(wm wiremessage.WireMessage) error {
22+
var rdr bson.Reader
23+
switch msg := wm.(type) {
24+
case wiremessage.Msg:
25+
for _, section := range msg.Sections {
26+
switch converted := section.(type) {
27+
case wiremessage.SectionBody:
28+
rdr = converted.Document
29+
}
30+
}
31+
case wiremessage.Reply:
32+
if msg.ResponseFlags&wiremessage.QueryFailure != wiremessage.QueryFailure {
33+
return nil
34+
}
35+
rdr = msg.Documents[0]
36+
}
37+
38+
_, err := rdr.Validate()
39+
if err != nil {
40+
return nil
41+
}
42+
43+
extractedError := extractError(rdr)
44+
45+
// If parsed successfully return the error
46+
if _, ok := extractedError.(Error); ok {
47+
return err
48+
}
49+
50+
return nil
51+
}
52+
53+
// helper method to extract an error from a reader if there is one; first returned item is the
54+
// error if it exists, the second holds parsing errors
55+
func extractError(rdr bson.Reader) error {
56+
var errmsg, codeName string
57+
var code int32
58+
var labels []string
59+
itr, err := rdr.Iterator()
60+
if err != nil {
61+
return err
62+
}
63+
64+
for itr.Next() {
65+
elem := itr.Element()
66+
switch elem.Key() {
67+
case "ok":
68+
switch elem.Value().Type() {
69+
case bson.TypeInt32:
70+
if elem.Value().Int32() == 1 {
71+
return nil
72+
}
73+
case bson.TypeInt64:
74+
if elem.Value().Int64() == 1 {
75+
return nil
76+
}
77+
case bson.TypeDouble:
78+
if elem.Value().Double() == 1 {
79+
return nil
80+
}
81+
}
82+
case "errmsg":
83+
if str, okay := elem.Value().StringValueOK(); okay {
84+
errmsg = str
85+
}
86+
case "codeName":
87+
if str, okay := elem.Value().StringValueOK(); okay {
88+
codeName = str
89+
}
90+
case "code":
91+
if c, okay := elem.Value().Int32OK(); okay {
92+
code = c
93+
}
94+
case "errorLabels":
95+
if arr, okay := elem.Value().MutableArrayOK(); okay {
96+
iter, err := arr.Iterator()
97+
if err != nil {
98+
continue
99+
}
100+
for iter.Next() {
101+
if str, ok := iter.Value().StringValueOK(); ok {
102+
labels = append(labels, str)
103+
}
104+
}
105+
106+
}
107+
}
108+
}
109+
110+
if errmsg == "" {
111+
errmsg = "command failed"
112+
}
113+
114+
return Error{
115+
Code: code,
116+
Message: errmsg,
117+
Name: codeName,
118+
Labels: labels,
119+
}
120+
}
121+
20122
func responseClusterTime(response bson.Reader) *bson.Document {
21123
clusterTime, err := response.Lookup("$clusterTime")
22124
if err != nil {

core/command/errors.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"strings"
1414

1515
"github.com/mongodb/mongo-go-driver/bson"
16+
"github.com/mongodb/mongo-go-driver/core/result"
1617
)
1718

1819
var (
@@ -116,6 +117,20 @@ func (e Error) Retryable() bool {
116117
return false
117118
}
118119

120+
// IsWriteConcernErrorRetryable returns true if the write concern error is retryable.
121+
func IsWriteConcernErrorRetryable(wce *result.WriteConcernError) bool {
122+
for _, code := range retryableCodes {
123+
if int32(wce.Code) == code {
124+
return true
125+
}
126+
}
127+
if strings.Contains(wce.ErrMsg, "not master") || strings.Contains(wce.ErrMsg, "node is recovering") {
128+
return true
129+
}
130+
131+
return false
132+
}
133+
119134
// IsNotFound indicates if the error is from a namespace not being found.
120135
func IsNotFound(err error) bool {
121136
e, ok := err.(Error)

core/command/insert.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type Insert struct {
3636
WriteConcern *writeconcern.WriteConcern
3737
Session *session.Client
3838

39+
batches []*Write
3940
result result.Insert
4041
err error
4142
continueOnError bool
@@ -89,13 +90,13 @@ splitInserts:
8990

9091
// Encode will encode this command into a wire message for the given server description.
9192
func (i *Insert) Encode(desc description.SelectedServer) ([]wiremessage.WireMessage, error) {
92-
cmds, err := i.encode(desc)
93+
err := i.encode(desc)
9394
if err != nil {
9495
return nil, err
9596
}
9697

97-
wms := make([]wiremessage.WireMessage, len(cmds))
98-
for _, cmd := range cmds {
98+
wms := make([]wiremessage.WireMessage, len(i.batches))
99+
for _, cmd := range i.batches {
99100
wm, err := cmd.Encode(desc)
100101
if err != nil {
101102
return nil, err
@@ -143,23 +144,21 @@ func (i *Insert) encodeBatch(docs []*bson.Document, desc description.SelectedSer
143144
}, nil
144145
}
145146

146-
func (i *Insert) encode(desc description.SelectedServer) ([]*Write, error) {
147-
out := []*Write{}
147+
func (i *Insert) encode(desc description.SelectedServer) error {
148148
batches, err := i.split(int(desc.MaxBatchCount), int(desc.MaxDocumentSize))
149149
if err != nil {
150-
return nil, err
150+
return err
151151
}
152152

153153
for _, docs := range batches {
154154
cmd, err := i.encodeBatch(docs, desc)
155155
if err != nil {
156-
return nil, err
156+
return err
157157
}
158158

159-
out = append(out, cmd)
159+
i.batches = append(i.batches, cmd)
160160
}
161-
162-
return out, nil
161+
return nil
163162
}
164163

165164
// Decode will decode the wire message using the provided server description. Errors during decoding
@@ -193,14 +192,25 @@ func (i *Insert) Err() error { return i.err }
193192
// RoundTrip handles the execution of this command using the provided wiremessage.ReadWriter.
194193
func (i *Insert) RoundTrip(ctx context.Context, desc description.SelectedServer, rw wiremessage.ReadWriter) (result.Insert, error) {
195194
res := result.Insert{}
196-
cmds, err := i.encode(desc)
197-
if err != nil {
198-
return res, err
195+
if i.batches == nil {
196+
err := i.encode(desc)
197+
if err != nil {
198+
return res, err
199+
}
199200
}
200201

201-
for _, cmd := range cmds {
202+
// hold onto txnNumber, reset it when loop exits to ensure reuse of same
203+
// transaction number if retry is needed
204+
var txnNumber int64
205+
if i.Session != nil && i.Session.RetryWrite {
206+
txnNumber = i.Session.TxnNumber
207+
}
208+
for j, cmd := range i.batches {
202209
rdr, err := cmd.RoundTrip(ctx, desc, rw)
203210
if err != nil {
211+
if i.Session != nil && i.Session.RetryWrite {
212+
i.Session.TxnNumber = txnNumber + int64(j)
213+
}
204214
return res, err
205215
}
206216

@@ -213,13 +223,29 @@ func (i *Insert) RoundTrip(ctx context.Context, desc description.SelectedServer,
213223

214224
if r.WriteConcernError != nil {
215225
res.WriteConcernError = r.WriteConcernError
226+
if i.Session != nil && i.Session.RetryWrite {
227+
i.Session.TxnNumber = txnNumber
228+
return res, nil // report writeconcernerror for retry
229+
}
216230
}
217231

218232
res.N += r.N
219233

220234
if !i.continueOnError && len(res.WriteErrors) > 0 {
221235
return res, nil
222236
}
237+
238+
// Increment txnNumber for each batch
239+
if i.Session != nil && i.Session.RetryWrite {
240+
i.Session.IncrementTxnNumber()
241+
i.batches = i.batches[1:] // if batch encoded successfully, remove it from the slice
242+
}
243+
}
244+
245+
if i.Session != nil && i.Session.RetryWrite {
246+
// if retryable write succeeded, transaction number will be incremented one extra time,
247+
// so we decrement it here
248+
i.Session.TxnNumber--
223249
}
224250

225251
return res, nil

core/command/opmsg.go

Lines changed: 2 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -42,73 +42,9 @@ func decodeCommandOpMsg(msg wiremessage.Msg) (bson.Reader, error) {
4242
return nil, NewCommandResponseError("malformed OP_MSG: invalid document", err)
4343
}
4444

45-
ok := false
46-
var errmsg, codeName string
47-
var code int32
48-
var labels []string
49-
itr, err := rdr.Iterator()
45+
err = extractError(rdr)
5046
if err != nil {
51-
return nil, NewCommandResponseError("malformed OP_MSG: cannot iterate document", err)
52-
}
53-
54-
for itr.Next() {
55-
elem := itr.Element()
56-
switch elem.Key() {
57-
case "ok":
58-
switch elem.Value().Type() {
59-
case bson.TypeInt32:
60-
if elem.Value().Int32() == 1 {
61-
ok = true
62-
}
63-
case bson.TypeInt64:
64-
if elem.Value().Int64() == 1 {
65-
ok = true
66-
}
67-
case bson.TypeDouble:
68-
if elem.Value().Double() == 1 {
69-
ok = true
70-
}
71-
}
72-
case "errmsg":
73-
if str, okay := elem.Value().StringValueOK(); okay {
74-
errmsg = str
75-
}
76-
case "codeName":
77-
if str, okay := elem.Value().StringValueOK(); okay {
78-
codeName = str
79-
}
80-
case "code":
81-
if c, okay := elem.Value().Int32OK(); okay {
82-
code = c
83-
}
84-
case "errorLabels":
85-
if arr, okay := elem.Value().MutableArrayOK(); okay {
86-
iter, err := arr.Iterator()
87-
if err != nil {
88-
continue
89-
}
90-
for iter.Next() {
91-
if str, ok := iter.Value().StringValueOK(); ok {
92-
labels = append(labels, str)
93-
}
94-
}
95-
96-
}
97-
}
98-
}
99-
100-
if !ok {
101-
if errmsg == "" {
102-
errmsg = "command failed"
103-
}
104-
105-
return nil, Error{
106-
Code: code,
107-
Message: errmsg,
108-
Name: codeName,
109-
Labels: labels,
110-
}
47+
return nil, err
11148
}
112-
11349
return rdr, nil
11450
}

core/command/opreply.go

Lines changed: 2 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -35,56 +35,9 @@ func decodeCommandOpReply(reply wiremessage.Reply) (bson.Reader, error) {
3535
}
3636
}
3737

38-
ok := false
39-
var errmsg, codeName string
40-
var code int32
41-
itr, err := rdr.Iterator()
38+
err = extractError(rdr)
4239
if err != nil {
43-
return nil, NewCommandResponseError("malformed OP_REPLY: cannot iterate document", err)
40+
return nil, err
4441
}
45-
for itr.Next() {
46-
elem := itr.Element()
47-
switch elem.Key() {
48-
case "ok":
49-
switch elem.Value().Type() {
50-
case bson.TypeInt32:
51-
if elem.Value().Int32() == 1 {
52-
ok = true
53-
}
54-
case bson.TypeInt64:
55-
if elem.Value().Int64() == 1 {
56-
ok = true
57-
}
58-
case bson.TypeDouble:
59-
if elem.Value().Double() == 1 {
60-
ok = true
61-
}
62-
}
63-
case "errmsg":
64-
if str, okay := elem.Value().StringValueOK(); okay {
65-
errmsg = str
66-
}
67-
case "codeName":
68-
if str, okay := elem.Value().StringValueOK(); okay {
69-
codeName = str
70-
}
71-
case "code":
72-
if c, okay := elem.Value().Int32OK(); okay {
73-
code = c
74-
}
75-
}
76-
}
77-
78-
if !ok {
79-
if errmsg == "" {
80-
errmsg = "command failed"
81-
}
82-
return nil, Error{
83-
Code: code,
84-
Message: errmsg,
85-
Name: codeName,
86-
}
87-
}
88-
8942
return rdr, nil
9043
}

0 commit comments

Comments
 (0)