Skip to content

Commit 3d8db41

Browse files
Divjot Arorarfblue2
authored andcommitted
Implement sessions in core
GODRIVER-52 Change-Id: Ib3d60f531955a866f9b2c3fb58c5d12caff5681f
1 parent ecbcd69 commit 3d8db41

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+1879
-152
lines changed

core/command/aggregate.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/mongodb/mongo-go-driver/core/option"
1515
"github.com/mongodb/mongo-go-driver/core/readconcern"
1616
"github.com/mongodb/mongo-go-driver/core/readpref"
17+
"github.com/mongodb/mongo-go-driver/core/session"
1718
"github.com/mongodb/mongo-go-driver/core/wiremessage"
1819
"github.com/mongodb/mongo-go-driver/core/writeconcern"
1920
)
@@ -28,6 +29,8 @@ type Aggregate struct {
2829
ReadPref *readpref.ReadPref
2930
WriteConcern *writeconcern.WriteConcern
3031
ReadConcern *readconcern.ReadConcern
32+
Clock *session.ClusterClock
33+
Session *session.Client
3134

3235
result Cursor
3336
err error
@@ -89,6 +92,8 @@ func (a *Aggregate) encode(desc description.SelectedServer) (*Read, error) {
8992
Command: command,
9093
ReadPref: a.ReadPref,
9194
ReadConcern: a.ReadConcern,
95+
Clock: a.Clock,
96+
Session: a.Session,
9297
}, nil
9398
}
9499

@@ -139,8 +144,7 @@ func (a *Aggregate) decode(desc description.SelectedServer, cb CursorBuilder, rd
139144
opts = append(opts, curOpt)
140145
}
141146

142-
a.result, a.err = cb.BuildCursor(rdr, opts...)
143-
147+
a.result, a.err = cb.BuildCursor(rdr, a.Session, a.Clock, opts...)
144148
return a
145149
}
146150

core/command/command.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,43 @@ import (
1010
"errors"
1111

1212
"github.com/mongodb/mongo-go-driver/bson"
13+
"github.com/mongodb/mongo-go-driver/core/description"
1314
"github.com/mongodb/mongo-go-driver/core/readconcern"
15+
"github.com/mongodb/mongo-go-driver/core/session"
1416
"github.com/mongodb/mongo-go-driver/core/wiremessage"
1517
"github.com/mongodb/mongo-go-driver/core/writeconcern"
1618
)
1719

20+
func responseClusterTime(response bson.Reader) *bson.Document {
21+
clusterTime, err := response.Lookup("$clusterTime")
22+
if err != nil {
23+
// $clusterTime not included by the server
24+
return nil
25+
}
26+
27+
return bson.NewDocument(clusterTime)
28+
}
29+
30+
func updateClusterTimes(sess *session.Client, clock *session.ClusterClock, response bson.Reader) error {
31+
clusterTime := responseClusterTime(response)
32+
if clusterTime == nil {
33+
return nil
34+
}
35+
36+
if sess != nil {
37+
err := sess.AdvanceClusterTime(clusterTime)
38+
if err != nil {
39+
return err
40+
}
41+
}
42+
43+
if clock != nil {
44+
clock.AdvanceClusterTime(clusterTime)
45+
}
46+
47+
return nil
48+
}
49+
1850
func marshalCommand(cmd *bson.Document) (bson.Reader, error) {
1951
if cmd == nil {
2052
return bson.Reader{5, 0, 0, 0, 0}, nil
@@ -23,6 +55,54 @@ func marshalCommand(cmd *bson.Document) (bson.Reader, error) {
2355
return cmd.MarshalBSON()
2456
}
2557

58+
// add a session ID to a BSON doc representing a command
59+
func addSessionID(cmd *bson.Document, desc description.SelectedServer, client *session.Client) error {
60+
if client == nil || !description.SessionsSupported(desc.WireVersion) || desc.SessionTimeoutMinutes == 0 {
61+
return nil
62+
}
63+
64+
if client.Terminated {
65+
return session.ErrSessionEnded
66+
}
67+
68+
if _, err := cmd.LookupElementErr("lsid"); err != nil {
69+
cmd.Delete("lsid")
70+
}
71+
72+
cmd.Append(bson.EC.SubDocument("lsid", client.SessionID))
73+
return nil
74+
}
75+
76+
func addClusterTime(cmd *bson.Document, desc description.SelectedServer, sess *session.Client, clock *session.ClusterClock) error {
77+
if (clock == nil && sess == nil) || !description.SessionsSupported(desc.WireVersion) {
78+
return nil
79+
}
80+
81+
var clusterTime *bson.Document
82+
if clock != nil {
83+
clusterTime = clock.GetClusterTime()
84+
}
85+
86+
if sess != nil {
87+
if clusterTime == nil {
88+
clusterTime = sess.ClusterTime
89+
} else {
90+
clusterTime = session.MaxClusterTime(clusterTime, sess.ClusterTime)
91+
}
92+
}
93+
94+
if clusterTime == nil {
95+
return nil
96+
}
97+
98+
if _, err := cmd.LookupElementErr("$clusterTime"); err != nil {
99+
cmd.Delete("$clusterTime")
100+
}
101+
102+
return cmd.Concat(clusterTime)
103+
}
104+
105+
// add a read concern to a BSON doc representing a command
26106
func addReadConcern(cmd *bson.Document, rc *readconcern.ReadConcern) error {
27107
if rc == nil {
28108
return nil
@@ -41,6 +121,7 @@ func addReadConcern(cmd *bson.Document, rc *readconcern.ReadConcern) error {
41121
return nil
42122
}
43123

124+
// add a write concern to a BSON doc representing a command
44125
func addWriteConcern(cmd *bson.Document, wc *writeconcern.WriteConcern) error {
45126
if wc == nil {
46127
return nil

core/command/count.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/mongodb/mongo-go-driver/core/option"
1616
"github.com/mongodb/mongo-go-driver/core/readconcern"
1717
"github.com/mongodb/mongo-go-driver/core/readpref"
18+
"github.com/mongodb/mongo-go-driver/core/session"
1819
"github.com/mongodb/mongo-go-driver/core/wiremessage"
1920
)
2021

@@ -27,6 +28,8 @@ type Count struct {
2728
Opts []option.CountOptioner
2829
ReadPref *readpref.ReadPref
2930
ReadConcern *readconcern.ReadConcern
31+
Clock *session.ClusterClock
32+
Session *session.Client
3033

3134
result int64
3235
err error
@@ -59,10 +62,12 @@ func (c *Count) encode(desc description.SelectedServer) (*Read, error) {
5962
}
6063

6164
return &Read{
65+
Clock: c.Clock,
6266
DB: c.NS.DB,
6367
ReadPref: c.ReadPref,
6468
Command: command,
6569
ReadConcern: c.ReadConcern,
70+
Session: c.Session,
6671
}, nil
6772
}
6873

core/command/create_indexes.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/mongodb/mongo-go-driver/core/description"
1414
"github.com/mongodb/mongo-go-driver/core/option"
1515
"github.com/mongodb/mongo-go-driver/core/result"
16+
"github.com/mongodb/mongo-go-driver/core/session"
1617
"github.com/mongodb/mongo-go-driver/core/wiremessage"
1718
"github.com/mongodb/mongo-go-driver/core/writeconcern"
1819
)
@@ -25,6 +26,8 @@ type CreateIndexes struct {
2526
Indexes *bson.Array
2627
Opts []option.CreateIndexesOptioner
2728
WriteConcern *writeconcern.WriteConcern
29+
Clock *session.ClusterClock
30+
Session *session.Client
2831

2932
result result.CreateIndexes
3033
err error
@@ -57,9 +60,11 @@ func (ci *CreateIndexes) encode(desc description.SelectedServer) (*Write, error)
5760
}
5861

5962
return &Write{
63+
Clock: ci.Clock,
6064
DB: ci.NS.DB,
6165
Command: cmd,
6266
WriteConcern: ci.WriteConcern,
67+
Session: ci.Session,
6368
}, nil
6469
}
6570

core/command/cursor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/mongodb/mongo-go-driver/bson"
1313
"github.com/mongodb/mongo-go-driver/core/option"
14+
"github.com/mongodb/mongo-go-driver/core/session"
1415
)
1516

1617
// Cursor instances iterate a stream of documents. Each document is
@@ -60,7 +61,7 @@ type Cursor interface {
6061

6162
// CursorBuilder is a type that can build a Cursor.
6263
type CursorBuilder interface {
63-
BuildCursor(bson.Reader, ...option.CursorOptioner) (Cursor, error)
64+
BuildCursor(bson.Reader, *session.Client, *session.ClusterClock, ...option.CursorOptioner) (Cursor, error)
6465
}
6566

6667
type emptyCursor struct{}

core/command/delete.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/mongodb/mongo-go-driver/core/description"
1414
"github.com/mongodb/mongo-go-driver/core/option"
1515
"github.com/mongodb/mongo-go-driver/core/result"
16+
"github.com/mongodb/mongo-go-driver/core/session"
1617
"github.com/mongodb/mongo-go-driver/core/wiremessage"
1718
"github.com/mongodb/mongo-go-driver/core/writeconcern"
1819
)
@@ -26,6 +27,8 @@ type Delete struct {
2627
Deletes []*bson.Document
2728
Opts []option.DeleteOptioner
2829
WriteConcern *writeconcern.WriteConcern
30+
Clock *session.ClusterClock
31+
Session *session.Client
2932

3033
result result.Delete
3134
err error
@@ -73,9 +76,11 @@ func (d *Delete) encode(desc description.SelectedServer) (*Write, error) {
7376
}
7477

7578
return &Write{
79+
Clock: d.Clock,
7680
DB: d.NS.DB,
7781
Command: command,
7882
WriteConcern: d.WriteConcern,
83+
Session: d.Session,
7984
}, nil
8085
}
8186

core/command/distinct.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/mongodb/mongo-go-driver/core/readconcern"
1616
"github.com/mongodb/mongo-go-driver/core/readpref"
1717
"github.com/mongodb/mongo-go-driver/core/result"
18+
"github.com/mongodb/mongo-go-driver/core/session"
1819
"github.com/mongodb/mongo-go-driver/core/wiremessage"
1920
)
2021

@@ -29,6 +30,8 @@ type Distinct struct {
2930
Opts []option.DistinctOptioner
3031
ReadPref *readpref.ReadPref
3132
ReadConcern *readconcern.ReadConcern
33+
Clock *session.ClusterClock
34+
Session *session.Client
3235

3336
result result.Distinct
3437
err error
@@ -67,10 +70,12 @@ func (d *Distinct) encode(desc description.SelectedServer) (*Read, error) {
6770
}
6871

6972
return &Read{
73+
Clock: d.Clock,
7074
DB: d.NS.DB,
7175
ReadPref: d.ReadPref,
7276
Command: command,
7377
ReadConcern: d.ReadConcern,
78+
Session: d.Session,
7479
}, nil
7580
}
7681

core/command/drop_collection.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/mongodb/mongo-go-driver/bson"
1313
"github.com/mongodb/mongo-go-driver/core/description"
14+
"github.com/mongodb/mongo-go-driver/core/session"
1415
"github.com/mongodb/mongo-go-driver/core/wiremessage"
1516
"github.com/mongodb/mongo-go-driver/core/writeconcern"
1617
)
@@ -22,6 +23,8 @@ type DropCollection struct {
2223
DB string
2324
Collection string
2425
WriteConcern *writeconcern.WriteConcern
26+
Clock *session.ClusterClock
27+
Session *session.Client
2528

2629
result bson.Reader
2730
err error
@@ -43,16 +46,28 @@ func (dc *DropCollection) encode(desc description.SelectedServer) (*Write, error
4346
)
4447

4548
return &Write{
49+
Clock: dc.Clock,
4650
WriteConcern: dc.WriteConcern,
4751
DB: dc.DB,
4852
Command: cmd,
53+
Session: dc.Session,
4954
}, nil
5055
}
5156

5257
// Decode will decode the wire message using the provided server description. Errors during decoding
5358
// are deferred until either the Result or Err methods are called.
5459
func (dc *DropCollection) Decode(desc description.SelectedServer, wm wiremessage.WireMessage) *DropCollection {
55-
dc.result, dc.err = (&Write{}).Decode(desc, wm).Result()
60+
rdr, err := (&Write{}).Decode(desc, wm).Result()
61+
if err != nil {
62+
dc.err = err
63+
return dc
64+
}
65+
66+
return dc.decode(desc, rdr)
67+
}
68+
69+
func (dc *DropCollection) decode(desc description.SelectedServer, rdr bson.Reader) *DropCollection {
70+
dc.result = rdr
5671
return dc
5772
}
5873

@@ -61,6 +76,7 @@ func (dc *DropCollection) Result() (bson.Reader, error) {
6176
if dc.err != nil {
6277
return nil, dc.err
6378
}
79+
6480
return dc.result, nil
6581
}
6682

@@ -74,10 +90,10 @@ func (dc *DropCollection) RoundTrip(ctx context.Context, desc description.Select
7490
return nil, err
7591
}
7692

77-
dc.result, err = cmd.RoundTrip(ctx, desc, rw)
93+
rdr, err := cmd.RoundTrip(ctx, desc, rw)
7894
if err != nil {
7995
return nil, err
8096
}
8197

82-
return dc.Result()
98+
return dc.decode(desc, rdr).Result()
8399
}

0 commit comments

Comments
 (0)