Skip to content

Commit f10249e

Browse files
author
Divjot Arora
committed
GODRIVER-1535 Fix session IDs batching in Disconnect
1 parent 16d2050 commit f10249e

File tree

2 files changed

+84
-22
lines changed

2 files changed

+84
-22
lines changed

mongo/client.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"context"
1111
"crypto/tls"
1212
"errors"
13-
"strconv"
1413
"strings"
1514
"time"
1615

@@ -34,11 +33,14 @@ import (
3433
)
3534

3635
const defaultLocalThreshold = 15 * time.Millisecond
37-
const batchSize = 10000
3836

39-
// keyVaultCollOpts specifies options used to communicate with the key vault collection
40-
var keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()).
41-
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
37+
var (
38+
// keyVaultCollOpts specifies options used to communicate with the key vault collection
39+
keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()).
40+
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
41+
42+
endSessionsBatchSize = 10000
43+
)
4244

4345
// Client is a handle representing a pool of connections to a MongoDB deployment. It is safe for concurrent use by
4446
// multiple goroutines.
@@ -288,29 +290,27 @@ func (c *Client) endSessions(ctx context.Context) {
288290
return
289291
}
290292

291-
ids := c.sessionPool.IDSlice()
292-
idx, idArray := bsoncore.AppendArrayStart(nil)
293-
for i, id := range ids {
294-
idArray = bsoncore.AppendDocumentElement(idArray, strconv.Itoa(i), id)
295-
}
296-
idArray, _ = bsoncore.AppendArrayEnd(idArray, idx)
297-
298-
op := operation.NewEndSessions(idArray).ClusterClock(c.clock).Deployment(c.deployment).
293+
sessionIDs := c.sessionPool.IDSlice()
294+
op := operation.NewEndSessions(nil).ClusterClock(c.clock).Deployment(c.deployment).
299295
ServerSelector(description.ReadPrefSelector(readpref.PrimaryPreferred())).CommandMonitor(c.monitor).
300296
Database("admin").Crypt(c.crypt)
301297

302-
idx, idArray = bsoncore.AppendArrayStart(nil)
303-
totalNumIDs := len(ids)
298+
totalNumIDs := len(sessionIDs)
299+
var currentBatch []bsoncore.Document
304300
for i := 0; i < totalNumIDs; i++ {
305-
idArray = bsoncore.AppendDocumentElement(idArray, strconv.Itoa(i), ids[i])
306-
if ((i+1)%batchSize) == 0 || i == totalNumIDs-1 {
307-
idArray, _ = bsoncore.AppendArrayEnd(idArray, idx)
308-
_ = op.SessionIDs(idArray).Execute(ctx)
309-
idArray = idArray[:0]
310-
idx = 0
301+
currentBatch = append(currentBatch, sessionIDs[i])
302+
303+
// If we are at the end of a batch or the end of the overall IDs array, execute the operation.
304+
if ((i+1)%endSessionsBatchSize) == 0 || i == totalNumIDs-1 {
305+
// Ignore all errors when ending sessions.
306+
_, marshalVal, err := bson.MarshalValue(currentBatch)
307+
if err == nil {
308+
_ = op.SessionIDs(marshalVal).Execute(ctx)
309+
}
310+
311+
currentBatch = currentBatch[:0]
311312
}
312313
}
313-
314314
}
315315

316316
func (c *Client) configure(opts *options.ClientOptions) error {

mongo/client_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"time"
1414

1515
"go.mongodb.org/mongo-driver/bson"
16+
"go.mongodb.org/mongo-driver/event"
17+
"go.mongodb.org/mongo-driver/internal/testutil"
1618
"go.mongodb.org/mongo-driver/internal/testutil/assert"
1719
"go.mongodb.org/mongo-driver/mongo/options"
1820
"go.mongodb.org/mongo-driver/mongo/readconcern"
@@ -257,4 +259,64 @@ func TestClient(t *testing.T) {
257259
assert.Equal(t, uri, got, "expected GetURI to return %v, got %v", uri, got)
258260
})
259261
})
262+
t.Run("endSessions", func(t *testing.T) {
263+
cs := testutil.ConnString(t)
264+
265+
var started []*event.CommandStartedEvent
266+
var failureReasons []string
267+
cmdMonitor := &event.CommandMonitor{
268+
Started: func(_ context.Context, evt *event.CommandStartedEvent) {
269+
if evt.CommandName == "endSessions" {
270+
started = append(started, evt)
271+
}
272+
},
273+
Failed: func(_ context.Context, evt *event.CommandFailedEvent) {
274+
if evt.CommandName == "endSessions" {
275+
failureReasons = append(failureReasons, evt.Failure)
276+
}
277+
},
278+
}
279+
clientOpts := options.Client().ApplyURI(cs.Original).SetReadPreference(readpref.Primary()).
280+
SetWriteConcern(writeconcern.New(writeconcern.WMajority())).SetMonitor(cmdMonitor)
281+
client, err := Connect(bgCtx, clientOpts)
282+
assert.Nil(t, err, "Connect error: %v", err)
283+
coll := client.Database("foo").Collection("bar")
284+
285+
// Lower the batch size to force multiple batches.
286+
originalBatchSize := endSessionsBatchSize
287+
endSessionsBatchSize = 2
288+
defer func() {
289+
endSessionsBatchSize = originalBatchSize
290+
_ = coll.Drop(bgCtx)
291+
_ = client.Disconnect(bgCtx)
292+
}()
293+
294+
// Do an application operation and create four sessions so endSessions will execute in two batches.
295+
_, err = coll.CountDocuments(bgCtx, bson.D{})
296+
assert.Nil(t, err, "CountDocuments error: %v", err)
297+
numSessions := 4
298+
var sessions []Session
299+
for i := 0; i < numSessions; i++ {
300+
sess, err := client.StartSession()
301+
assert.Nil(t, err, "StartSession error at index %d: %v", i, err)
302+
sessions = append(sessions, sess)
303+
}
304+
for _, sess := range sessions {
305+
sess.EndSession(bgCtx)
306+
}
307+
308+
client.endSessions(bgCtx)
309+
numEventsExpected := numSessions / endSessionsBatchSize
310+
assert.Equal(t, len(started), numEventsExpected, "expected %d started events, got %d", numEventsExpected,
311+
len(started))
312+
assert.Equal(t, len(failureReasons), 0, "endSessions errors: %v", failureReasons)
313+
314+
for i := 0; i < numEventsExpected; i++ {
315+
sentArray := started[i].Command.Lookup("endSessions").Array()
316+
values, _ := sentArray.Values()
317+
assert.Equal(t, len(values), endSessionsBatchSize,
318+
"batch size mismatch at index %d; expected %d sessions in batch, got %d", i, endSessionsBatchSize,
319+
len(values))
320+
}
321+
})
260322
}

0 commit comments

Comments
 (0)