Skip to content

Commit 0a84d58

Browse files
committed
Merge branch 'pr/79'
Change-Id: I0bce011cdc7d52009842a9aae97e2b1b03335ec1
2 parents cadedb1 + 31e4d1d commit 0a84d58

File tree

12 files changed

+939
-23
lines changed

12 files changed

+939
-23
lines changed

core/command/command.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,24 @@ func updateClusterTimes(sess *session.Client, clock *session.ClusterClock, respo
4747
return nil
4848
}
4949

50+
func updateOperationTime(sess *session.Client, response bson.Reader) error {
51+
if sess == nil {
52+
return nil
53+
}
54+
55+
opTimeElem, err := response.Lookup("operationTime")
56+
if err != nil {
57+
// operationTime not included by the server
58+
return nil
59+
}
60+
61+
t, i := opTimeElem.Value().Timestamp()
62+
return sess.AdvanceOperationTime(&bson.Timestamp{
63+
T: t,
64+
I: i,
65+
})
66+
}
67+
5068
func marshalCommand(cmd *bson.Document) (bson.Reader, error) {
5169
if cmd == nil {
5270
return bson.Reader{5, 0, 0, 0, 0}, nil
@@ -103,7 +121,7 @@ func addClusterTime(cmd *bson.Document, desc description.SelectedServer, sess *s
103121
}
104122

105123
// add a read concern to a BSON doc representing a command
106-
func addReadConcern(cmd *bson.Document, rc *readconcern.ReadConcern) error {
124+
func addReadConcern(cmd *bson.Document, desc description.SelectedServer, rc *readconcern.ReadConcern, sess *session.Client) error {
107125
if rc == nil {
108126
return nil
109127
}
@@ -113,11 +131,20 @@ func addReadConcern(cmd *bson.Document, rc *readconcern.ReadConcern) error {
113131
return err
114132
}
115133

134+
rcDoc := element.Value().MutableDocument()
135+
if description.SessionsSupported(desc.WireVersion) && sess != nil && sess.Consistent && sess.OperationTime != nil {
136+
rcDoc = rcDoc.Append(
137+
bson.EC.Timestamp("afterClusterTime", sess.OperationTime.T, sess.OperationTime.I),
138+
)
139+
}
140+
116141
if _, err := cmd.LookupElementErr(element.Key()); err != nil {
117142
cmd.Delete(element.Key())
118143
}
119144

120-
cmd.Append(element)
145+
if rcDoc.Len() != 0 {
146+
cmd.Append(bson.EC.SubDocument("readConcern", rcDoc))
147+
}
121148
return nil
122149
}
123150

core/command/read.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func (r *Read) decodeOpReply(wm wiremessage.WireMessage) {
187187
// Encode will encode this command into a wire message for the given server description.
188188
func (r *Read) Encode(desc description.SelectedServer) (wiremessage.WireMessage, error) {
189189
cmd := r.Command.Copy()
190-
err := addReadConcern(cmd, r.ReadConcern)
190+
err := addReadConcern(cmd, desc, r.ReadConcern, r.Session)
191191
if err != nil {
192192
return nil, err
193193
}
@@ -228,7 +228,7 @@ func (r *Read) Decode(desc description.SelectedServer, wm wiremessage.WireMessag
228228
}
229229

230230
_ = updateClusterTimes(r.Session, r.Clock, r.result)
231-
231+
_ = updateOperationTime(r.Session, r.result)
232232
return r
233233
}
234234

core/command/write.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ func (w *Write) Decode(desc description.SelectedServer, wm wiremessage.WireMessa
165165
}
166166

167167
_ = updateClusterTimes(w.Session, w.Clock, w.result)
168+
169+
if writeconcern.AckWrite(w.WriteConcern) {
170+
// don't update session operation time for unacknowledged write
171+
_ = updateOperationTime(w.Session, w.result)
172+
}
168173
return w
169174
}
170175

core/session/client_session.go

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ var ErrSessionEnded = errors.New("ended session was used")
1212

1313
// Client is a session for clients to run commands.
1414
type Client struct {
15-
ClientID uuid.UUID
16-
ClusterTime *bson.Document
17-
SessionID *bson.Document
18-
SessionType Type
19-
Terminated bool
15+
ClientID uuid.UUID
16+
ClusterTime *bson.Document
17+
Consistent bool // causal consistency
18+
OperationTime *bson.Timestamp
19+
SessionID *bson.Document
20+
SessionType Type
21+
Terminated bool
2022

2123
pool *Pool
2224
serverSession *Server
@@ -59,19 +61,31 @@ func MaxClusterTime(ct1 *bson.Document, ct2 *bson.Document) *bson.Document {
5961
}
6062

6163
// NewClientSession creates a Client.
62-
func NewClientSession(pool *Pool, clientID uuid.UUID, sessionType Type) (*Client, error) {
64+
func NewClientSession(pool *Pool, clientID uuid.UUID, sessionType Type, opts ...ClientOptioner) (*Client, error) {
65+
c := &Client{
66+
Consistent: true, // causal consistency defaults to true
67+
ClientID: clientID,
68+
SessionType: sessionType,
69+
pool: pool,
70+
}
71+
72+
var err error
73+
for _, opt := range opts {
74+
err = opt.Option(c)
75+
if err != nil {
76+
return nil, err
77+
}
78+
}
79+
6380
servSess, err := pool.GetSession()
6481
if err != nil {
6582
return nil, err
6683
}
6784

68-
return &Client{
69-
ClientID: clientID,
70-
SessionID: servSess.SessionID,
71-
SessionType: sessionType,
72-
pool: pool,
73-
serverSession: servSess,
74-
}, nil
85+
c.SessionID = servSess.SessionID
86+
c.serverSession = servSess
87+
88+
return c, nil
7589
}
7690

7791
// AdvanceClusterTime updates the session's cluster time.
@@ -83,6 +97,26 @@ func (c *Client) AdvanceClusterTime(clusterTime *bson.Document) error {
8397
return nil
8498
}
8599

100+
// AdvanceOperationTime updates the session's operation time.
101+
func (c *Client) AdvanceOperationTime(opTime *bson.Timestamp) error {
102+
if c.Terminated {
103+
return ErrSessionEnded
104+
}
105+
106+
if c.OperationTime == nil {
107+
c.OperationTime = opTime
108+
return nil
109+
}
110+
111+
if opTime.T > c.OperationTime.T {
112+
c.OperationTime = opTime
113+
} else if (opTime.T == c.OperationTime.T) && (opTime.I > c.OperationTime.I) {
114+
c.OperationTime = opTime
115+
}
116+
117+
return nil
118+
}
119+
86120
// UpdateUseTime updates the session's last used time.
87121
// Must be called whenver this session is used to send a command to the server.
88122
func (c *Client) UpdateUseTime() error {

core/session/client_session_test.go

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,20 @@ import (
55

66
"github.com/mongodb/mongo-go-driver/bson"
77
"github.com/mongodb/mongo-go-driver/core/uuid"
8+
"github.com/mongodb/mongo-go-driver/internal/testutil/helpers"
89
"github.com/stretchr/testify/require"
910
)
1011

12+
func compareOperationTimes(t *testing.T, expected *bson.Timestamp, actual *bson.Timestamp) {
13+
if expected.T != actual.T {
14+
t.Fatalf("T value mismatch; expected %d got %d", expected.T, actual.T)
15+
}
16+
17+
if expected.I != actual.I {
18+
t.Fatalf("I value mismatch; expected %d got %d", expected.I, actual.I)
19+
}
20+
}
21+
1122
func TestClientSession(t *testing.T) {
1223
var clusterTime1 = bson.NewDocument(bson.EC.SubDocument("$clusterTime",
1324
bson.NewDocument(bson.EC.Timestamp("clusterTime", 10, 5))))
@@ -30,7 +41,7 @@ func TestClientSession(t *testing.T) {
3041

3142
t.Run("TestAdvanceClusterTime", func(t *testing.T) {
3243
id, _ := uuid.New()
33-
sess, err := NewClientSession(&Pool{}, id, Explicit)
44+
sess, err := NewClientSession(&Pool{}, id, Explicit, OptCausalConsistency(true))
3445
require.Nil(t, err, "Unexpected error")
3546
err = sess.AdvanceClusterTime(clusterTime2)
3647
require.Nil(t, err, "Unexpected error")
@@ -52,10 +63,48 @@ func TestClientSession(t *testing.T) {
5263

5364
t.Run("TestEndSession", func(t *testing.T) {
5465
id, _ := uuid.New()
55-
sess, err := NewClientSession(&Pool{}, id, Explicit)
66+
sess, err := NewClientSession(&Pool{}, id, Explicit, OptCausalConsistency(true))
5667
require.Nil(t, err, "Unexpected error")
5768
sess.EndSession()
5869
err = sess.UpdateUseTime()
5970
require.NotNil(t, err, "Expected error, received nil")
6071
})
72+
73+
t.Run("TestAdvanceOperationTime", func(t *testing.T) {
74+
id, _ := uuid.New()
75+
sess, err := NewClientSession(&Pool{}, id, Explicit, OptCausalConsistency(true))
76+
require.Nil(t, err, "Unexpected error")
77+
78+
optime1 := &bson.Timestamp{
79+
T: 1,
80+
I: 0,
81+
}
82+
err = sess.AdvanceOperationTime(optime1)
83+
testhelpers.RequireNil(t, err, "error updating first operation time: %s", err)
84+
compareOperationTimes(t, optime1, sess.OperationTime)
85+
86+
optime2 := &bson.Timestamp{
87+
T: 2,
88+
I: 0,
89+
}
90+
err = sess.AdvanceOperationTime(optime2)
91+
testhelpers.RequireNil(t, err, "error updating second operation time: %s", err)
92+
compareOperationTimes(t, optime2, sess.OperationTime)
93+
94+
optime3 := &bson.Timestamp{
95+
T: 2,
96+
I: 1,
97+
}
98+
err = sess.AdvanceOperationTime(optime3)
99+
testhelpers.RequireNil(t, err, "error updating third operation time: %s", err)
100+
compareOperationTimes(t, optime3, sess.OperationTime)
101+
102+
err = sess.AdvanceOperationTime(&bson.Timestamp{
103+
T: 1,
104+
I: 10,
105+
})
106+
testhelpers.RequireNil(t, err, "error updating fourth operation time: %s", err)
107+
compareOperationTimes(t, optime3, sess.OperationTime)
108+
sess.EndSession()
109+
})
61110
}

core/session/options.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package session
2+
3+
// ClientOptioner is the interface implemented by types that can be used as options for configuring a client session.
4+
type ClientOptioner interface {
5+
Option(*Client) error
6+
}
7+
8+
// OptCausalConsistency specifies if a session should be causally consistent.
9+
type OptCausalConsistency bool
10+
11+
// Option implements the ClientOptioner interface.
12+
func (opt OptCausalConsistency) Option(c *Client) error {
13+
c.Consistent = bool(opt)
14+
return nil
15+
}

0 commit comments

Comments
 (0)