Skip to content

Commit 88e57ff

Browse files
author
Divjot Arora
committed
Implement the OP_MSG specification.
GODRIVER-54 GODRIVER-482 GODRIVER-483 Change-Id: Ida787c1005d17f0a8bae5993cbc5e72665e8daae
1 parent fff8c98 commit 88e57ff

File tree

102 files changed

+2474
-1556
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+2474
-1556
lines changed

.errcheck-excludes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
(*github.com/mongodb/mongo-go-driver/core/topology.Subscription).Unsubscribe
44
(*github.com/mongodb/mongo-go-driver/core/topology.Server).Close
55
(*github.com/mongodb/mongo-go-driver/core/connection.pool).closeConnection
6+
(github.com/mongodb/mongo-go-driver/core/wiremessage.ReadWriteCloser).Close
67
(net.Conn).Close
78
encoding/pem.Encode
89
fmt.Fprintf

bson/document.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,24 @@ func ReadDocument(b []byte) (*Document, error) {
9090
return doc, nil
9191
}
9292

93+
// Copy makes a shallow copy of this document.
94+
func (d *Document) Copy() *Document {
95+
if d == nil {
96+
return nil
97+
}
98+
99+
doc := &Document{
100+
IgnoreNilInsert: d.IgnoreNilInsert,
101+
elems: make([]*Element, len(d.elems), cap(d.elems)),
102+
index: make([]uint32, len(d.index), cap(d.index)),
103+
}
104+
105+
copy(doc.elems, d.elems)
106+
copy(doc.index, d.index)
107+
108+
return doc
109+
}
110+
93111
// Len returns the number of elements in the document.
94112
func (d *Document) Len() int {
95113
if d == nil {

core/auth/auth_test.go

Lines changed: 17 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
package auth_test
88

99
import (
10-
"context"
1110
"testing"
1211

12+
"reflect"
13+
1314
"github.com/mongodb/mongo-go-driver/bson"
1415
. "github.com/mongodb/mongo-go-driver/core/auth"
1516
"github.com/mongodb/mongo-go-driver/core/wiremessage"
@@ -45,57 +46,21 @@ func TestCreateAuthenticator(t *testing.T) {
4546
}
4647
}
4748

48-
type conn struct {
49-
t *testing.T
50-
writeErr error
51-
written chan wiremessage.WireMessage
52-
readResp chan wiremessage.WireMessage
53-
readErr chan error
54-
}
55-
56-
func (c *conn) WriteWireMessage(ctx context.Context, wm wiremessage.WireMessage) error {
57-
select {
58-
case c.written <- wm:
59-
default:
60-
c.t.Error("could not write wiremessage to written channel")
61-
}
62-
return c.writeErr
63-
}
49+
func compareResponses(t *testing.T, wm wiremessage.WireMessage, expectedPayload *bson.Document, dbName string) {
50+
switch converted := wm.(type) {
51+
case wiremessage.Query:
52+
payloadBytes, err := expectedPayload.MarshalBSON()
53+
if err != nil {
54+
t.Fatalf("couldn't marshal query bson: %v", err)
55+
}
56+
require.True(t, reflect.DeepEqual([]byte(converted.Query), payloadBytes))
57+
case wiremessage.Msg:
58+
msgPayload := expectedPayload.Append(bson.EC.String("$db", dbName))
59+
payloadBytes, err := msgPayload.MarshalBSON()
60+
if err != nil {
61+
t.Fatalf("couldn't marshal msg bson: %v", err)
62+
}
6463

65-
func (c *conn) ReadWireMessage(ctx context.Context) (wiremessage.WireMessage, error) {
66-
var wm wiremessage.WireMessage
67-
var err error
68-
select {
69-
case wm = <-c.readResp:
70-
case err = <-c.readErr:
71-
case <-ctx.Done():
72-
}
73-
return wm, err
74-
}
75-
76-
func (c *conn) Close() error {
77-
return nil
78-
}
79-
80-
func (c *conn) Expired() bool {
81-
return false
82-
}
83-
84-
func (c *conn) Alive() bool {
85-
return true
86-
}
87-
88-
func (c *conn) ID() string {
89-
return "faked"
90-
}
91-
92-
func makeReply(t *testing.T, doc *bson.Document) wiremessage.WireMessage {
93-
rdr, err := doc.MarshalBSON()
94-
if err != nil {
95-
t.Fatalf("Could not create document: %v", err)
96-
}
97-
return wiremessage.Reply{
98-
NumberReturned: 1,
99-
Documents: []bson.Reader{rdr},
64+
require.True(t, reflect.DeepEqual([]byte(converted.Sections[0].(wiremessage.SectionBody).Document), payloadBytes))
10065
}
10166
}

core/auth/mongodbcr.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (a *MongoDBCRAuthenticator) Auth(ctx context.Context, desc description.Serv
5656
db = defaultAuthDB
5757
}
5858

59-
cmd := command.Command{DB: db, Command: bson.NewDocument(bson.EC.Int32("getnonce", 1))}
59+
cmd := command.Read{DB: db, Command: bson.NewDocument(bson.EC.Int32("getnonce", 1))}
6060
ssdesc := description.SelectedServer{Server: desc}
6161
rdr, err := cmd.RoundTrip(ctx, ssdesc, rw)
6262
if err != nil {
@@ -72,7 +72,7 @@ func (a *MongoDBCRAuthenticator) Auth(ctx context.Context, desc description.Serv
7272
return newAuthError("unmarshal error", err)
7373
}
7474

75-
cmd = command.Command{
75+
cmd = command.Read{
7676
DB: db,
7777
Command: bson.NewDocument(
7878
bson.EC.Int32("authenticate", 1),

core/auth/mongodbcr_test.go

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@ import (
1010
"context"
1111
"testing"
1212

13-
"reflect"
14-
1513
"strings"
1614

1715
"github.com/mongodb/mongo-go-driver/bson"
1816
. "github.com/mongodb/mongo-go-driver/core/auth"
1917
"github.com/mongodb/mongo-go-driver/core/description"
2018
"github.com/mongodb/mongo-go-driver/core/wiremessage"
19+
"github.com/mongodb/mongo-go-driver/internal"
2120
)
2221

2322
func TestMongoDBCRAuthenticator_Fails(t *testing.T) {
@@ -30,16 +29,20 @@ func TestMongoDBCRAuthenticator_Fails(t *testing.T) {
3029
}
3130

3231
resps := make(chan wiremessage.WireMessage, 2)
33-
resps <- makeReply(t, bson.NewDocument(
32+
resps <- internal.MakeReply(t, bson.NewDocument(
3433
bson.EC.Int32("ok", 1),
3534
bson.EC.String("nonce", "2375531c32080ae8"),
3635
))
3736

38-
resps <- makeReply(t, bson.NewDocument(bson.EC.Int32("ok", 0)))
37+
resps <- internal.MakeReply(t, bson.NewDocument(bson.EC.Int32("ok", 0)))
3938

40-
c := &conn{written: make(chan wiremessage.WireMessage, 2), readResp: resps}
39+
c := &internal.ChannelConn{Written: make(chan wiremessage.WireMessage, 2), ReadResp: resps}
4140

42-
err := authenticator.Auth(context.Background(), description.Server{}, c)
41+
err := authenticator.Auth(context.Background(), description.Server{
42+
WireVersion: &description.VersionRange{
43+
Max: 6,
44+
},
45+
}, c)
4346
if err == nil {
4447
t.Fatalf("expected an error but got none")
4548
}
@@ -61,47 +64,36 @@ func TestMongoDBCRAuthenticator_Succeeds(t *testing.T) {
6164

6265
resps := make(chan wiremessage.WireMessage, 2)
6366

64-
resps <- makeReply(t, bson.NewDocument(
67+
resps <- internal.MakeReply(t, bson.NewDocument(
6568
bson.EC.Int32("ok", 1),
6669
bson.EC.String("nonce", "2375531c32080ae8"),
6770
))
6871

69-
resps <- makeReply(t, bson.NewDocument(bson.EC.Int32("ok", 1)))
72+
resps <- internal.MakeReply(t, bson.NewDocument(bson.EC.Int32("ok", 1)))
7073

71-
c := &conn{written: make(chan wiremessage.WireMessage, 2), readResp: resps}
74+
c := &internal.ChannelConn{Written: make(chan wiremessage.WireMessage, 2), ReadResp: resps}
7275

73-
err := authenticator.Auth(context.Background(), description.Server{}, c)
76+
err := authenticator.Auth(context.Background(), description.Server{
77+
WireVersion: &description.VersionRange{
78+
Max: 6,
79+
},
80+
}, c)
7481
if err != nil {
7582
t.Fatalf("expected no error but got \"%s\"", err)
7683
}
7784

78-
if len(c.written) != 2 {
79-
t.Fatalf("expected 2 messages to be sent but had %d", len(c.written))
85+
if len(c.Written) != 2 {
86+
t.Fatalf("expected 2 messages to be sent but had %d", len(c.Written))
8087
}
8188

82-
getNonceRequest := (<-c.written).(wiremessage.Query)
83-
var want bson.Reader
84-
want, err = bson.NewDocument(bson.EC.Int32("getnonce", 1)).MarshalBSON()
85-
if err != nil {
86-
t.Fatalf("couldn't marshal bson: %v", err)
87-
}
88-
if !reflect.DeepEqual(getNonceRequest.Query, want) {
89-
t.Fatalf("getnonce command was incorrect: %v", getNonceRequest.Query)
90-
}
89+
want := bson.NewDocument(bson.EC.Int32("getnonce", 1))
90+
compareResponses(t, <-c.Written, want, "source")
9191

92-
authenticateRequest := (<-c.written).(wiremessage.Query)
93-
var expectedAuthenticateDoc bson.Reader
94-
expectedAuthenticateDoc, err = bson.NewDocument(
92+
expectedAuthenticateDoc := bson.NewDocument(
9593
bson.EC.Int32("authenticate", 1),
9694
bson.EC.String("user", "user"),
9795
bson.EC.String("nonce", "2375531c32080ae8"),
9896
bson.EC.String("key", "21742f26431831d5cfca035a08c5bdf6"),
99-
).MarshalBSON()
100-
if err != nil {
101-
t.Fatalf("couldn't marshal bson: %v", err)
102-
}
103-
104-
if !reflect.DeepEqual(authenticateRequest.Query, expectedAuthenticateDoc) {
105-
t.Fatalf("authenticate command was incorrect: %v", authenticateRequest.Query)
106-
}
97+
)
98+
compareResponses(t, <-c.Written, expectedAuthenticateDoc, "source")
10799
}

core/auth/plain_test.go

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ import (
1111
"strings"
1212
"testing"
1313

14-
"reflect"
15-
1614
"encoding/base64"
1715

1816
"github.com/mongodb/mongo-go-driver/bson"
1917
. "github.com/mongodb/mongo-go-driver/core/auth"
2018
"github.com/mongodb/mongo-go-driver/core/description"
2119
"github.com/mongodb/mongo-go-driver/core/wiremessage"
20+
"github.com/mongodb/mongo-go-driver/internal"
2221
)
2322

2423
func TestPlainAuthenticator_Fails(t *testing.T) {
@@ -30,17 +29,21 @@ func TestPlainAuthenticator_Fails(t *testing.T) {
3029
}
3130

3231
resps := make(chan wiremessage.WireMessage, 1)
33-
resps <- makeReply(t, bson.NewDocument(
32+
resps <- internal.MakeReply(t, bson.NewDocument(
3433
bson.EC.Int32("ok", 1),
3534
bson.EC.Int32("conversationId", 1),
3635
bson.EC.Binary("payload", []byte{}),
3736
bson.EC.Int32("code", 143),
3837
bson.EC.Boolean("done", true)),
3938
)
4039

41-
c := &conn{written: make(chan wiremessage.WireMessage, 1), readResp: resps}
40+
c := &internal.ChannelConn{Written: make(chan wiremessage.WireMessage, 1), ReadResp: resps}
4241

43-
err := authenticator.Auth(context.Background(), description.Server{}, c)
42+
err := authenticator.Auth(context.Background(), description.Server{
43+
WireVersion: &description.VersionRange{
44+
Max: 6,
45+
},
46+
}, c)
4447
if err == nil {
4548
t.Fatalf("expected an error but got none")
4649
}
@@ -60,22 +63,26 @@ func TestPlainAuthenticator_Extra_server_message(t *testing.T) {
6063
}
6164

6265
resps := make(chan wiremessage.WireMessage, 2)
63-
resps <- makeReply(t, bson.NewDocument(
66+
resps <- internal.MakeReply(t, bson.NewDocument(
6467
bson.EC.Int32("ok", 1),
6568
bson.EC.Int32("conversationId", 1),
6669
bson.EC.Binary("payload", []byte{}),
6770
bson.EC.Boolean("done", false)),
6871
)
69-
resps <- makeReply(t, bson.NewDocument(
72+
resps <- internal.MakeReply(t, bson.NewDocument(
7073
bson.EC.Int32("ok", 1),
7174
bson.EC.Int32("conversationId", 1),
7275
bson.EC.Binary("payload", []byte{}),
7376
bson.EC.Boolean("done", true)),
7477
)
7578

76-
c := &conn{written: make(chan wiremessage.WireMessage, 1), readResp: resps}
79+
c := &internal.ChannelConn{Written: make(chan wiremessage.WireMessage, 1), ReadResp: resps}
7780

78-
err := authenticator.Auth(context.Background(), description.Server{}, c)
81+
err := authenticator.Auth(context.Background(), description.Server{
82+
WireVersion: &description.VersionRange{
83+
Max: 6,
84+
},
85+
}, c)
7986
if err == nil {
8087
t.Fatalf("expected an error but got none")
8188
}
@@ -95,36 +102,33 @@ func TestPlainAuthenticator_Succeeds(t *testing.T) {
95102
}
96103

97104
resps := make(chan wiremessage.WireMessage, 1)
98-
resps <- makeReply(t, bson.NewDocument(
105+
resps <- internal.MakeReply(t, bson.NewDocument(
99106
bson.EC.Int32("ok", 1),
100107
bson.EC.Int32("conversationId", 1),
101108
bson.EC.Binary("payload", []byte{}),
102109
bson.EC.Boolean("done", true)),
103110
)
104111

105-
c := &conn{written: make(chan wiremessage.WireMessage, 1), readResp: resps}
112+
c := &internal.ChannelConn{Written: make(chan wiremessage.WireMessage, 1), ReadResp: resps}
106113

107-
err := authenticator.Auth(context.Background(), description.Server{}, c)
114+
err := authenticator.Auth(context.Background(), description.Server{
115+
WireVersion: &description.VersionRange{
116+
Max: 6,
117+
},
118+
}, c)
108119
if err != nil {
109120
t.Fatalf("expected no error but got \"%s\"", err)
110121
}
111122

112-
if len(c.written) != 1 {
113-
t.Fatalf("expected 1 messages to be sent but had %d", len(c.written))
123+
if len(c.Written) != 1 {
124+
t.Fatalf("expected 1 messages to be sent but had %d", len(c.Written))
114125
}
115126

116-
saslStartRequest := (<-c.written).(wiremessage.Query)
117127
payload, _ := base64.StdEncoding.DecodeString("AHVzZXIAcGVuY2ls")
118-
expectedCmd, err := bson.NewDocument(
128+
expectedCmd := bson.NewDocument(
119129
bson.EC.Int32("saslStart", 1),
120130
bson.EC.String("mechanism", "PLAIN"),
121131
bson.EC.Binary("payload", payload),
122-
).MarshalBSON()
123-
if err != nil {
124-
t.Fatalf("couldn't marshal bson: %v", err)
125-
}
126-
127-
if !reflect.DeepEqual(saslStartRequest.Query, bson.Reader(expectedCmd)) {
128-
t.Fatalf("saslStart command was incorrect. got %v; want %v", saslStartRequest.Query, bson.Reader(expectedCmd))
129-
}
132+
)
133+
compareResponses(t, <-c.Written, expectedCmd, "$external")
130134
}

core/auth/sasl.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ type SaslClientCloser interface {
3030

3131
// ConductSaslConversation handles running a sasl conversation with MongoDB.
3232
func ConductSaslConversation(ctx context.Context, desc description.Server, rw wiremessage.ReadWriter, db string, client SaslClient) error {
33-
3433
// Arbiters cannot be authenticated
3534
if desc.Kind == description.RSArbiter {
3635
return nil
@@ -49,7 +48,7 @@ func ConductSaslConversation(ctx context.Context, desc description.Server, rw wi
4948
return newError(err, mech)
5049
}
5150

52-
saslStartCmd := command.Command{
51+
saslStartCmd := command.Read{
5352
DB: db,
5453
Command: bson.NewDocument(
5554
bson.EC.Int32("saslStart", 1),
@@ -98,7 +97,7 @@ func ConductSaslConversation(ctx context.Context, desc description.Server, rw wi
9897
return nil
9998
}
10099

101-
saslContinueCmd := command.Command{
100+
saslContinueCmd := command.Read{
102101
DB: db,
103102
Command: bson.NewDocument(
104103
bson.EC.Int32("saslContinue", 1),

0 commit comments

Comments
 (0)