Skip to content

Commit cf1cba7

Browse files
committed
Merge branch 'pr/81'
Change-Id: I36542fdacf1a5f2df21fd15aac1c9e08026453b2
2 parents 0ce3b48 + 680933b commit cf1cba7

File tree

89 files changed

+26241
-691
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+26241
-691
lines changed

core/command/abort_transaction.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package command
2+
3+
import (
4+
"context"
5+
6+
"github.com/mongodb/mongo-go-driver/bson"
7+
"github.com/mongodb/mongo-go-driver/core/description"
8+
"github.com/mongodb/mongo-go-driver/core/result"
9+
"github.com/mongodb/mongo-go-driver/core/session"
10+
"github.com/mongodb/mongo-go-driver/core/wiremessage"
11+
)
12+
13+
// AbortTransaction represents the abortTransaction() command
14+
type AbortTransaction struct {
15+
Session *session.Client
16+
err error
17+
result result.TransactionResult
18+
}
19+
20+
// Encode will encode this command into a wiremessage for the given server description.
21+
func (at *AbortTransaction) Encode(desc description.SelectedServer) (wiremessage.WireMessage, error) {
22+
cmd := at.encode(desc)
23+
return cmd.Encode(desc)
24+
}
25+
26+
func (at *AbortTransaction) encode(desc description.SelectedServer) *Write {
27+
cmd := bson.NewDocument(bson.EC.Int32("abortTransaction", 1))
28+
return &Write{
29+
DB: "admin",
30+
Command: cmd,
31+
Session: at.Session,
32+
WriteConcern: at.Session.CurrentWc,
33+
}
34+
}
35+
36+
// Decode will decode the wire message using the provided server description. Errors during decoding are deferred until
37+
// either the Result or Err methods are called.
38+
func (at *AbortTransaction) Decode(desc description.SelectedServer, wm wiremessage.WireMessage) *AbortTransaction {
39+
rdr, err := (&Write{}).Decode(desc, wm).Result()
40+
if err != nil {
41+
at.err = err
42+
return at
43+
}
44+
45+
return at.decode(desc, rdr)
46+
}
47+
48+
func (at *AbortTransaction) decode(desc description.SelectedServer, rdr bson.Reader) *AbortTransaction {
49+
at.err = bson.Unmarshal(rdr, &at.result)
50+
if at.err == nil && at.result.WriteConcernError != nil {
51+
at.err = Error{
52+
Code: int32(at.result.WriteConcernError.Code),
53+
Message: at.result.WriteConcernError.ErrMsg,
54+
}
55+
}
56+
return at
57+
}
58+
59+
// Result returns the result of a decoded wire message and server description.
60+
func (at *AbortTransaction) Result() (result.TransactionResult, error) {
61+
if at.err != nil {
62+
return result.TransactionResult{}, at.err
63+
}
64+
65+
return at.result, nil
66+
}
67+
68+
// Err returns the error set on this command
69+
func (at *AbortTransaction) Err() error {
70+
return at.err
71+
}
72+
73+
// RoundTrip handles the execution of this command using the provided wiremessage.ReadWriter
74+
func (at *AbortTransaction) RoundTrip(ctx context.Context, desc description.SelectedServer, rw wiremessage.ReadWriter) (result.TransactionResult, error) {
75+
cmd := at.encode(desc)
76+
rdr, err := cmd.RoundTrip(ctx, desc, rw)
77+
if err != nil {
78+
return result.TransactionResult{}, err
79+
}
80+
81+
return at.decode(desc, rdr).Result()
82+
}

core/command/aggregate.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,14 @@ func (a *Aggregate) decode(desc description.SelectedServer, cb CursorBuilder, rd
144144
opts = append(opts, curOpt)
145145
}
146146

147-
a.result, a.err = cb.BuildCursor(rdr, a.Session, a.Clock, opts...)
147+
labels, err := getErrorLabels(&rdr)
148+
a.err = err
149+
150+
res, err := cb.BuildCursor(rdr, a.Session, a.Clock, opts...)
151+
a.result = res
152+
if err != nil {
153+
a.err = Error{Message: err.Error(), Labels: labels}
154+
}
148155
return a
149156
}
150157

core/command/command.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ func marshalCommand(cmd *bson.Document) (bson.Reader, error) {
7373
return cmd.MarshalBSON()
7474
}
7575

76-
// add a session ID to a BSON doc representing a command
77-
func addSessionID(cmd *bson.Document, desc description.SelectedServer, client *session.Client) error {
76+
// adds session related fields to a BSON doc representing a command
77+
func addSessionFields(cmd *bson.Document, desc description.SelectedServer, client *session.Client) error {
7878
if client == nil || !description.SessionsSupported(desc.WireVersion) || desc.SessionTimeoutMinutes == 0 {
7979
return nil
8080
}
@@ -88,9 +88,27 @@ func addSessionID(cmd *bson.Document, desc description.SelectedServer, client *s
8888
}
8989

9090
cmd.Append(bson.EC.SubDocument("lsid", client.SessionID))
91+
92+
if client.TransactionRunning() ||
93+
client.RetryingCommit {
94+
addTransaction(cmd, client)
95+
}
96+
97+
client.ApplyCommand() // advance the state machine based on a command executing
98+
9199
return nil
92100
}
93101

102+
// if in a transaction, add the transaction fields
103+
func addTransaction(cmd *bson.Document, client *session.Client) {
104+
cmd.Append(bson.EC.Int64("txnNumber", client.TxnNumber))
105+
if client.TransactionStarting() {
106+
// When starting transaction, always transition to the next state, even on error
107+
cmd.Append(bson.EC.Boolean("startTransaction", true))
108+
}
109+
cmd.Append(bson.EC.Boolean("autocommit", false))
110+
}
111+
94112
func addClusterTime(cmd *bson.Document, desc description.SelectedServer, sess *session.Client, clock *session.ClusterClock) error {
95113
if (clock == nil && sess == nil) || !description.SessionsSupported(desc.WireVersion) {
96114
return nil
@@ -122,6 +140,16 @@ func addClusterTime(cmd *bson.Document, desc description.SelectedServer, sess *s
122140

123141
// add a read concern to a BSON doc representing a command
124142
func addReadConcern(cmd *bson.Document, desc description.SelectedServer, rc *readconcern.ReadConcern, sess *session.Client) error {
143+
// Starting transaction's read concern overrides all others
144+
if sess != nil && sess.TransactionStarting() && sess.CurrentRc != nil {
145+
rc = sess.CurrentRc
146+
}
147+
148+
// start transaction must append afterclustertime IF causally consistent and operation time exists
149+
if rc == nil && sess != nil && sess.TransactionStarting() && sess.Consistent && sess.OperationTime != nil {
150+
rc = readconcern.New()
151+
}
152+
125153
if rc == nil {
126154
return nil
127155
}
@@ -168,6 +196,25 @@ func addWriteConcern(cmd *bson.Document, wc *writeconcern.WriteConcern) error {
168196
return nil
169197
}
170198

199+
// Get the error labels from a command response
200+
func getErrorLabels(rdr *bson.Reader) ([]string, error) {
201+
var labels []string
202+
labelsElem, err := rdr.Lookup("errorLabels")
203+
if err != bson.ErrElementNotFound {
204+
return nil, err
205+
}
206+
if labelsElem != nil {
207+
labelsIt, err := labelsElem.Value().ReaderArray().Iterator()
208+
if err != nil {
209+
return nil, err
210+
}
211+
for labelsIt.Next() {
212+
labels = append(labels, labelsIt.Element().Value().StringValue())
213+
}
214+
}
215+
return labels, nil
216+
}
217+
171218
// Remove command arguments for insert, update, and delete commands from the BSON document so they can be encoded
172219
// as a Section 1 payload in OP_MSG
173220
func opmsgRemoveArray(cmdDoc *bson.Document) (*bson.Array, string) {

core/command/commit_transaction.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package command
2+
3+
import (
4+
"context"
5+
6+
"github.com/mongodb/mongo-go-driver/bson"
7+
"github.com/mongodb/mongo-go-driver/core/description"
8+
"github.com/mongodb/mongo-go-driver/core/result"
9+
"github.com/mongodb/mongo-go-driver/core/session"
10+
"github.com/mongodb/mongo-go-driver/core/wiremessage"
11+
)
12+
13+
// CommitTransaction represents the commitTransaction() command
14+
type CommitTransaction struct {
15+
Session *session.Client
16+
err error
17+
result result.TransactionResult
18+
}
19+
20+
// Encode will encode this command into a wiremessage for the given server description.
21+
func (ct *CommitTransaction) Encode(desc description.SelectedServer) (wiremessage.WireMessage, error) {
22+
cmd := ct.encode(desc)
23+
return cmd.Encode(desc)
24+
}
25+
26+
func (ct *CommitTransaction) encode(desc description.SelectedServer) *Write {
27+
cmd := bson.NewDocument(bson.EC.Int32("commitTransaction", 1))
28+
return &Write{
29+
DB: "admin",
30+
Command: cmd,
31+
Session: ct.Session,
32+
WriteConcern: ct.Session.CurrentWc,
33+
}
34+
}
35+
36+
// Decode will decode the wire message using the provided server description. Errors during decoding are deferred until
37+
// either the Result or Err methods are called.
38+
func (ct *CommitTransaction) Decode(desc description.SelectedServer, wm wiremessage.WireMessage) *CommitTransaction {
39+
rdr, err := (&Write{}).Decode(desc, wm).Result()
40+
if err != nil {
41+
ct.err = err
42+
return ct
43+
}
44+
45+
return ct.decode(desc, rdr)
46+
}
47+
48+
func (ct *CommitTransaction) decode(desc description.SelectedServer, rdr bson.Reader) *CommitTransaction {
49+
ct.err = bson.Unmarshal(rdr, &ct.result)
50+
if ct.err == nil && ct.result.WriteConcernError != nil {
51+
ct.err = Error{
52+
Code: int32(ct.result.WriteConcernError.Code),
53+
Message: ct.result.WriteConcernError.ErrMsg,
54+
}
55+
}
56+
return ct
57+
}
58+
59+
// Result returns the result of a decoded wire message and server description.
60+
func (ct *CommitTransaction) Result() (result.TransactionResult, error) {
61+
if ct.err != nil {
62+
return result.TransactionResult{}, ct.err
63+
}
64+
65+
return ct.result, nil
66+
}
67+
68+
// Err returns the error set on this command
69+
func (ct *CommitTransaction) Err() error {
70+
return ct.err
71+
}
72+
73+
// RoundTrip handles the execution of this command using the provided wiremessage.ReadWriter
74+
func (ct *CommitTransaction) RoundTrip(ctx context.Context, desc description.SelectedServer, rw wiremessage.ReadWriter) (result.TransactionResult, error) {
75+
cmd := ct.encode(desc)
76+
rdr, err := cmd.RoundTrip(ctx, desc, rw)
77+
if err != nil {
78+
return result.TransactionResult{}, err
79+
}
80+
81+
return ct.decode(desc, rdr).Result()
82+
}

core/command/count_documents.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package command
99
import (
1010
"context"
1111
"errors"
12+
1213
"github.com/mongodb/mongo-go-driver/bson"
1314
"github.com/mongodb/mongo-go-driver/core/description"
1415
"github.com/mongodb/mongo-go-driver/core/option"

core/command/errors.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"errors"
1111
"fmt"
1212

13+
"strings"
14+
1315
"github.com/mongodb/mongo-go-driver/bson"
1416
)
1517

@@ -25,8 +27,18 @@ var (
2527
// ErrDocumentTooLarge occurs when a document that is larger than the maximum size accepted by a
2628
// server is passed to an insert command.
2729
ErrDocumentTooLarge = errors.New("an inserted document is too large")
30+
// ErrNonPrimaryRP occurs when a nonprimary read preference is used with a transaction.
31+
ErrNonPrimaryRP = errors.New("read preference in a transaction must be primary")
32+
// UnknownTransactionCommitResult is an error label for unknown transaction commit results.
33+
UnknownTransactionCommitResult = "UnknownTransactionCommitResult"
34+
// TransientTransactionError is an error label for transient errors with transactions.
35+
TransientTransactionError = "TransientTransactionError"
36+
// NetworkError is an error label for network errors.
37+
NetworkError = "NetworkError"
2838
)
2939

40+
var retryableCodes = []int32{11600, 11602, 10107, 13435, 13436, 189, 91, 7, 6, 89, 9001}
41+
3042
// QueryFailureError is an error representing a command failure as a document.
3143
type QueryFailureError struct {
3244
Message string
@@ -61,6 +73,7 @@ func (e ResponseError) Error() string {
6173
type Error struct {
6274
Code int32
6375
Message string
76+
Labels []string
6477
Name string
6578
}
6679

@@ -72,6 +85,37 @@ func (e Error) Error() string {
7285
return e.Message
7386
}
7487

88+
// HasErrorLabel returns true if the error contains the specified label.
89+
func (e Error) HasErrorLabel(label string) bool {
90+
if e.Labels != nil {
91+
for _, l := range e.Labels {
92+
if l == label {
93+
return true
94+
}
95+
}
96+
}
97+
return false
98+
}
99+
100+
// Retryable returns true if the error is retryable
101+
func (e Error) Retryable() bool {
102+
for _, label := range e.Labels {
103+
if label == NetworkError {
104+
return true
105+
}
106+
}
107+
for _, code := range retryableCodes {
108+
if e.Code == code {
109+
return true
110+
}
111+
}
112+
if strings.Contains(e.Message, "not master") || strings.Contains(e.Message, "node is recovering") {
113+
return true
114+
}
115+
116+
return false
117+
}
118+
75119
// IsNotFound indicates if the error is from a namespace not being found.
76120
func IsNotFound(err error) bool {
77121
e, ok := err.(Error)

core/command/find.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,14 @@ func (f *Find) decode(desc description.SelectedServer, cb CursorBuilder, rdr bso
115115
opts = append(opts, curOpt)
116116
}
117117

118-
f.result, f.err = cb.BuildCursor(rdr, f.Session, f.Clock, opts...)
118+
labels, err := getErrorLabels(&rdr)
119+
f.err = err
120+
121+
res, err := cb.BuildCursor(rdr, f.Session, f.Clock, opts...)
122+
f.result = res
123+
if err != nil {
124+
f.err = Error{Message: err.Error(), Labels: labels}
125+
}
119126
return f
120127
}
121128

core/command/list_collections.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,14 @@ func (lc *ListCollections) decode(desc description.SelectedServer, cb CursorBuil
8989
opts = append(opts, curOpt)
9090
}
9191

92-
lc.result, lc.err = cb.BuildCursor(rdr, lc.Session, lc.Clock, opts...)
92+
labels, err := getErrorLabels(&rdr)
93+
lc.err = err
94+
95+
res, err := cb.BuildCursor(rdr, lc.Session, lc.Clock, opts...)
96+
lc.result = res
97+
if err != nil {
98+
lc.err = Error{Message: err.Error(), Labels: labels}
99+
}
93100

94101
return lc
95102
}

core/command/list_indexes.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,15 @@ func (li *ListIndexes) decode(desc description.SelectedServer, cb CursorBuilder,
8585
opts = append(opts, curOpt)
8686
}
8787

88-
li.result, li.err = cb.BuildCursor(rdr, li.Session, li.Clock, opts...)
88+
labels, err := getErrorLabels(&rdr)
89+
li.err = err
90+
91+
res, err := cb.BuildCursor(rdr, li.Session, li.Clock, opts...)
92+
li.result = res
93+
if err != nil {
94+
li.err = Error{Message: err.Error(), Labels: labels}
95+
}
96+
8997
return li
9098
}
9199

0 commit comments

Comments
 (0)