Skip to content

Sessions Support #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,14 @@ tasks:
commands:
- func: bootstrap-mongo-orchestration
vars:
topology: "server"
auth: "auth"
ssl: "ssl"
TOPOLOGY: "server"
AUTH: "auth"
SSL: "ssl"
- func: run-tests
vars:
topology: "server"
auth: "auth"
ssl: "ssl"
TOPOLOGY: "server"
AUTH: "auth"
SSL: "ssl"
MONGO_GO_DRIVER_COMPRESSOR: "snappy"

- name: test-replicaset-noauth-nossl
Expand Down
8 changes: 6 additions & 2 deletions core/command/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/mongodb/mongo-go-driver/core/option"
"github.com/mongodb/mongo-go-driver/core/readconcern"
"github.com/mongodb/mongo-go-driver/core/readpref"
"github.com/mongodb/mongo-go-driver/core/session"
"github.com/mongodb/mongo-go-driver/core/wiremessage"
"github.com/mongodb/mongo-go-driver/core/writeconcern"
)
Expand All @@ -28,6 +29,8 @@ type Aggregate struct {
ReadPref *readpref.ReadPref
WriteConcern *writeconcern.WriteConcern
ReadConcern *readconcern.ReadConcern
Clock *session.ClusterClock
Session *session.Client

result Cursor
err error
Expand Down Expand Up @@ -89,6 +92,8 @@ func (a *Aggregate) encode(desc description.SelectedServer) (*Read, error) {
Command: command,
ReadPref: a.ReadPref,
ReadConcern: a.ReadConcern,
Clock: a.Clock,
Session: a.Session,
}, nil
}

Expand Down Expand Up @@ -139,8 +144,7 @@ func (a *Aggregate) decode(desc description.SelectedServer, cb CursorBuilder, rd
opts = append(opts, curOpt)
}

a.result, a.err = cb.BuildCursor(rdr, opts...)

a.result, a.err = cb.BuildCursor(rdr, a.Session, a.Clock, opts...)
return a
}

Expand Down
81 changes: 81 additions & 0 deletions core/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,43 @@ import (
"errors"

"github.com/mongodb/mongo-go-driver/bson"
"github.com/mongodb/mongo-go-driver/core/description"
"github.com/mongodb/mongo-go-driver/core/readconcern"
"github.com/mongodb/mongo-go-driver/core/session"
"github.com/mongodb/mongo-go-driver/core/wiremessage"
"github.com/mongodb/mongo-go-driver/core/writeconcern"
)

func responseClusterTime(response bson.Reader) *bson.Document {
clusterTime, err := response.Lookup("$clusterTime")
if err != nil {
// $clusterTime not included by the server
return nil
}

return bson.NewDocument(clusterTime)
}

func updateClusterTimes(sess *session.Client, clock *session.ClusterClock, response bson.Reader) error {
clusterTime := responseClusterTime(response)
if clusterTime == nil {
return nil
}

if sess != nil {
err := sess.AdvanceClusterTime(clusterTime)
if err != nil {
return err
}
}

if clock != nil {
clock.AdvanceClusterTime(clusterTime)
}

return nil
}

func marshalCommand(cmd *bson.Document) (bson.Reader, error) {
if cmd == nil {
return bson.Reader{5, 0, 0, 0, 0}, nil
Expand All @@ -23,6 +55,54 @@ func marshalCommand(cmd *bson.Document) (bson.Reader, error) {
return cmd.MarshalBSON()
}

// add a session ID to a BSON doc representing a command
func addSessionID(cmd *bson.Document, desc description.SelectedServer, client *session.Client) error {
if client == nil || !description.SessionsSupported(desc.WireVersion) || desc.SessionTimeoutMinutes == 0 {
return nil
}

if client.Terminated {
return session.ErrSessionEnded
}

if _, err := cmd.LookupElementErr("lsid"); err != nil {
cmd.Delete("lsid")
}

cmd.Append(bson.EC.SubDocument("lsid", client.SessionID))
return nil
}

func addClusterTime(cmd *bson.Document, desc description.SelectedServer, sess *session.Client, clock *session.ClusterClock) error {
if (clock == nil && sess == nil) || !description.SessionsSupported(desc.WireVersion) {
return nil
}

var clusterTime *bson.Document
if clock != nil {
clusterTime = clock.GetClusterTime()
}

if sess != nil {
if clusterTime == nil {
clusterTime = sess.ClusterTime
} else {
clusterTime = session.MaxClusterTime(clusterTime, sess.ClusterTime)
}
}

if clusterTime == nil {
return nil
}

if _, err := cmd.LookupElementErr("$clusterTime"); err != nil {
cmd.Delete("$clusterTime")
}

return cmd.Concat(clusterTime)
}

// add a read concern to a BSON doc representing a command
func addReadConcern(cmd *bson.Document, rc *readconcern.ReadConcern) error {
if rc == nil {
return nil
Expand All @@ -41,6 +121,7 @@ func addReadConcern(cmd *bson.Document, rc *readconcern.ReadConcern) error {
return nil
}

// add a write concern to a BSON doc representing a command
func addWriteConcern(cmd *bson.Document, wc *writeconcern.WriteConcern) error {
if wc == nil {
return nil
Expand Down
5 changes: 5 additions & 0 deletions core/command/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/mongodb/mongo-go-driver/core/option"
"github.com/mongodb/mongo-go-driver/core/readconcern"
"github.com/mongodb/mongo-go-driver/core/readpref"
"github.com/mongodb/mongo-go-driver/core/session"
"github.com/mongodb/mongo-go-driver/core/wiremessage"
)

Expand All @@ -27,6 +28,8 @@ type Count struct {
Opts []option.CountOptioner
ReadPref *readpref.ReadPref
ReadConcern *readconcern.ReadConcern
Clock *session.ClusterClock
Session *session.Client

result int64
err error
Expand Down Expand Up @@ -59,10 +62,12 @@ func (c *Count) encode(desc description.SelectedServer) (*Read, error) {
}

return &Read{
Clock: c.Clock,
DB: c.NS.DB,
ReadPref: c.ReadPref,
Command: command,
ReadConcern: c.ReadConcern,
Session: c.Session,
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions core/command/create_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/mongodb/mongo-go-driver/core/description"
"github.com/mongodb/mongo-go-driver/core/option"
"github.com/mongodb/mongo-go-driver/core/result"
"github.com/mongodb/mongo-go-driver/core/session"
"github.com/mongodb/mongo-go-driver/core/wiremessage"
"github.com/mongodb/mongo-go-driver/core/writeconcern"
)
Expand All @@ -25,6 +26,8 @@ type CreateIndexes struct {
Indexes *bson.Array
Opts []option.CreateIndexesOptioner
WriteConcern *writeconcern.WriteConcern
Clock *session.ClusterClock
Session *session.Client

result result.CreateIndexes
err error
Expand Down Expand Up @@ -57,9 +60,11 @@ func (ci *CreateIndexes) encode(desc description.SelectedServer) (*Write, error)
}

return &Write{
Clock: ci.Clock,
DB: ci.NS.DB,
Command: cmd,
WriteConcern: ci.WriteConcern,
Session: ci.Session,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion core/command/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/mongodb/mongo-go-driver/bson"
"github.com/mongodb/mongo-go-driver/core/option"
"github.com/mongodb/mongo-go-driver/core/session"
)

// Cursor instances iterate a stream of documents. Each document is
Expand Down Expand Up @@ -60,7 +61,7 @@ type Cursor interface {

// CursorBuilder is a type that can build a Cursor.
type CursorBuilder interface {
BuildCursor(bson.Reader, ...option.CursorOptioner) (Cursor, error)
BuildCursor(bson.Reader, *session.Client, *session.ClusterClock, ...option.CursorOptioner) (Cursor, error)
}

type emptyCursor struct{}
Expand Down
5 changes: 5 additions & 0 deletions core/command/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/mongodb/mongo-go-driver/core/description"
"github.com/mongodb/mongo-go-driver/core/option"
"github.com/mongodb/mongo-go-driver/core/result"
"github.com/mongodb/mongo-go-driver/core/session"
"github.com/mongodb/mongo-go-driver/core/wiremessage"
"github.com/mongodb/mongo-go-driver/core/writeconcern"
)
Expand All @@ -26,6 +27,8 @@ type Delete struct {
Deletes []*bson.Document
Opts []option.DeleteOptioner
WriteConcern *writeconcern.WriteConcern
Clock *session.ClusterClock
Session *session.Client

result result.Delete
err error
Expand Down Expand Up @@ -73,9 +76,11 @@ func (d *Delete) encode(desc description.SelectedServer) (*Write, error) {
}

return &Write{
Clock: d.Clock,
DB: d.NS.DB,
Command: command,
WriteConcern: d.WriteConcern,
Session: d.Session,
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions core/command/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/mongodb/mongo-go-driver/core/readconcern"
"github.com/mongodb/mongo-go-driver/core/readpref"
"github.com/mongodb/mongo-go-driver/core/result"
"github.com/mongodb/mongo-go-driver/core/session"
"github.com/mongodb/mongo-go-driver/core/wiremessage"
)

Expand All @@ -29,6 +30,8 @@ type Distinct struct {
Opts []option.DistinctOptioner
ReadPref *readpref.ReadPref
ReadConcern *readconcern.ReadConcern
Clock *session.ClusterClock
Session *session.Client

result result.Distinct
err error
Expand Down Expand Up @@ -67,10 +70,12 @@ func (d *Distinct) encode(desc description.SelectedServer) (*Read, error) {
}

return &Read{
Clock: d.Clock,
DB: d.NS.DB,
ReadPref: d.ReadPref,
Command: command,
ReadConcern: d.ReadConcern,
Session: d.Session,
}, nil
}

Expand Down
22 changes: 19 additions & 3 deletions core/command/drop_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/mongodb/mongo-go-driver/bson"
"github.com/mongodb/mongo-go-driver/core/description"
"github.com/mongodb/mongo-go-driver/core/session"
"github.com/mongodb/mongo-go-driver/core/wiremessage"
"github.com/mongodb/mongo-go-driver/core/writeconcern"
)
Expand All @@ -22,6 +23,8 @@ type DropCollection struct {
DB string
Collection string
WriteConcern *writeconcern.WriteConcern
Clock *session.ClusterClock
Session *session.Client

result bson.Reader
err error
Expand All @@ -43,16 +46,28 @@ func (dc *DropCollection) encode(desc description.SelectedServer) (*Write, error
)

return &Write{
Clock: dc.Clock,
WriteConcern: dc.WriteConcern,
DB: dc.DB,
Command: cmd,
Session: dc.Session,
}, nil
}

// Decode will decode the wire message using the provided server description. Errors during decoding
// are deferred until either the Result or Err methods are called.
func (dc *DropCollection) Decode(desc description.SelectedServer, wm wiremessage.WireMessage) *DropCollection {
dc.result, dc.err = (&Write{}).Decode(desc, wm).Result()
rdr, err := (&Write{}).Decode(desc, wm).Result()
if err != nil {
dc.err = err
return dc
}

return dc.decode(desc, rdr)
}

func (dc *DropCollection) decode(desc description.SelectedServer, rdr bson.Reader) *DropCollection {
dc.result = rdr
return dc
}

Expand All @@ -61,6 +76,7 @@ func (dc *DropCollection) Result() (bson.Reader, error) {
if dc.err != nil {
return nil, dc.err
}

return dc.result, nil
}

Expand All @@ -74,10 +90,10 @@ func (dc *DropCollection) RoundTrip(ctx context.Context, desc description.Select
return nil, err
}

dc.result, err = cmd.RoundTrip(ctx, desc, rw)
rdr, err := cmd.RoundTrip(ctx, desc, rw)
if err != nil {
return nil, err
}

return dc.Result()
return dc.decode(desc, rdr).Result()
}
Loading