Skip to content

Commit ecbcd69

Browse files
author
Divjot Arora
committed
Add command monitoring to core/connection
GODRIVER-19 Change-Id: I0742c8d4bcc7da76c5cd146ca8093e2a351554f8
1 parent 4503c54 commit ecbcd69

34 files changed

+3811
-5
lines changed

core/connection/connection.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@ import (
2424
"sync/atomic"
2525
"time"
2626

27+
"github.com/mongodb/mongo-go-driver/bson"
2728
"github.com/mongodb/mongo-go-driver/core/address"
2829
"github.com/mongodb/mongo-go-driver/core/compressor"
2930
"github.com/mongodb/mongo-go-driver/core/description"
31+
"github.com/mongodb/mongo-go-driver/core/event"
3032
"github.com/mongodb/mongo-go-driver/core/wiremessage"
3133
)
3234

3335
var globalClientConnectionID uint64
36+
var emptyDoc = bson.NewDocument()
3437

3538
func nextClientConnectionID() uint64 {
3639
return atomic.AddUint64(&globalClientConnectionID, 1)
@@ -89,10 +92,12 @@ type connection struct {
8992
compressor compressor.Compressor // use for compressing messages
9093
// server can compress response with any compressor supported by driver
9194
compressorMap map[wiremessage.CompressorID]compressor.Compressor
95+
commandMap map[int64]*event.CommandMetadata // map for monitoring commands sent to server
9296
dead bool
9397
idleTimeout time.Duration
9498
idleDeadline time.Time
9599
lifetimeDeadline time.Time
100+
cmdMonitor *event.CommandMonitor
96101
readTimeout time.Duration
97102
uncompressBuf []byte // buffer to uncompress messages
98103
writeTimeout time.Duration
@@ -140,6 +145,7 @@ func New(ctx context.Context, addr address.Address, opts ...Option) (Connection,
140145
conn: nc,
141146
compressBuf: make([]byte, 256),
142147
compressorMap: compressorMap,
148+
commandMap: make(map[int64]*event.CommandMetadata),
143149
addr: addr,
144150
idleTimeout: cfg.idleTimeout,
145151
lifetimeDeadline: lifetimeDeadline,
@@ -180,6 +186,7 @@ func New(ctx context.Context, addr address.Address, opts ...Option) (Connection,
180186
desc = &d
181187
}
182188

189+
c.cmdMonitor = cfg.cmdMonitor // attach the command monitor later to avoid monitoring auth
183190
return c, desc, nil
184191
}
185192

@@ -342,6 +349,223 @@ func (c *connection) uncompressMessage(compressed wiremessage.Compressed) ([]byt
342349
return fullMessage, origHeader.OpCode, nil
343350
}
344351

352+
func canMonitor(cmd string) bool {
353+
if cmd == "authenticate" || cmd == "saslStart" || cmd == "saslContinue" || cmd == "getnonce" || cmd == "createUser" ||
354+
cmd == "updateUser" || cmd == "copydbgetnonce" || cmd == "copydbsaslstart" || cmd == "copydb" {
355+
return false
356+
}
357+
358+
return true
359+
}
360+
361+
func (c *connection) commandStartedEvent(wm wiremessage.WireMessage) error {
362+
if c.cmdMonitor == nil || c.cmdMonitor.Started == nil {
363+
return nil
364+
}
365+
366+
startedEvent := &event.CommandStartedEvent{
367+
ConnectionID: c.id,
368+
}
369+
370+
var cmd *bson.Document
371+
var err error
372+
373+
var acknowledged bool
374+
switch converted := wm.(type) {
375+
case wiremessage.Query:
376+
cmd, err = bson.ReadDocument([]byte(converted.Query))
377+
if err != nil {
378+
return err
379+
}
380+
381+
acknowledged = converted.AcknowledgedWrite()
382+
startedEvent.DatabaseName = converted.FullCollectionName[:len(converted.FullCollectionName)-5] // remove $.cmd
383+
startedEvent.RequestID = int64(converted.MsgHeader.RequestID)
384+
385+
cmdElem := cmd.ElementAt(0)
386+
if cmdElem.Key() == "$query" {
387+
cmd = cmdElem.Value().MutableDocument()
388+
}
389+
case wiremessage.Msg:
390+
cmd, err = converted.GetMainDocument()
391+
if err != nil {
392+
return err
393+
}
394+
395+
acknowledged = converted.AcknowledgedWrite()
396+
arr, identifier, err := converted.GetSequenceArray()
397+
if err != nil {
398+
return err
399+
}
400+
if arr != nil {
401+
cmd = cmd.Copy() // make copy to avoid changing original command
402+
cmd.Append(bson.EC.Array(identifier, arr))
403+
}
404+
405+
dbVal, err := cmd.LookupErr("$db")
406+
if err != nil {
407+
return err
408+
}
409+
410+
startedEvent.DatabaseName = dbVal.StringValue()
411+
startedEvent.RequestID = int64(converted.MsgHeader.RequestID)
412+
}
413+
414+
startedEvent.Command = cmd
415+
startedEvent.CommandName = cmd.ElementAt(0).Key()
416+
if !canMonitor(startedEvent.CommandName) {
417+
startedEvent.Command = emptyDoc
418+
}
419+
420+
c.cmdMonitor.Started(startedEvent)
421+
422+
if !acknowledged {
423+
if c.cmdMonitor.Succeeded == nil {
424+
return nil
425+
}
426+
427+
// unack writes must provide a CommandSucceededEvent with an { ok: 1 } reply
428+
finishedEvent := event.CommandFinishedEvent{
429+
DurationNanos: 0,
430+
CommandName: startedEvent.CommandName,
431+
RequestID: startedEvent.RequestID,
432+
ConnectionID: c.id,
433+
}
434+
435+
c.cmdMonitor.Succeeded(&event.CommandSucceededEvent{
436+
CommandFinishedEvent: finishedEvent,
437+
Reply: bson.NewDocument(
438+
bson.EC.Int32("ok", 1),
439+
),
440+
})
441+
442+
return nil
443+
}
444+
445+
c.commandMap[startedEvent.RequestID] = event.CreateMetadata(startedEvent.CommandName)
446+
return nil
447+
}
448+
449+
func processReply(reply *bson.Document) (bool, string) {
450+
iter := reply.Iterator()
451+
var success bool
452+
var errmsg string
453+
var errCode int32
454+
455+
for iter.Next() {
456+
elem := iter.Element()
457+
switch elem.Key() {
458+
case "ok":
459+
switch elem.Value().Type() {
460+
case bson.TypeInt32:
461+
if elem.Value().Int32() == 1 {
462+
success = true
463+
}
464+
case bson.TypeInt64:
465+
if elem.Value().Int64() == 1 {
466+
success = true
467+
}
468+
case bson.TypeDouble:
469+
if elem.Value().Double() == 1 {
470+
success = true
471+
}
472+
}
473+
case "errmsg":
474+
if str, ok := elem.Value().StringValueOK(); ok {
475+
errmsg = str
476+
}
477+
case "code":
478+
if c, ok := elem.Value().Int32OK(); ok {
479+
errCode = c
480+
}
481+
}
482+
}
483+
484+
if success {
485+
return true, ""
486+
}
487+
488+
fullErrMsg := fmt.Sprintf("Error code %d: %s", errCode, errmsg)
489+
return false, fullErrMsg
490+
}
491+
492+
func (c *connection) commandFinishedEvent(wm wiremessage.WireMessage) error {
493+
if c.cmdMonitor == nil {
494+
return nil
495+
}
496+
497+
var reply *bson.Document
498+
var requestID int64
499+
var err error
500+
501+
switch converted := wm.(type) {
502+
case wiremessage.Reply:
503+
requestID = int64(converted.MsgHeader.ResponseTo)
504+
reply, err = converted.GetMainDocument()
505+
case wiremessage.Msg:
506+
requestID = int64(converted.MsgHeader.ResponseTo)
507+
reply, err = converted.GetMainDocument()
508+
}
509+
510+
if err != nil {
511+
return err
512+
}
513+
514+
cmdMetadata := c.commandMap[requestID]
515+
delete(c.commandMap, requestID)
516+
success, errmsg := processReply(reply)
517+
518+
if (success && c.cmdMonitor.Succeeded == nil) || (!success && c.cmdMonitor.Failed == nil) {
519+
return nil
520+
}
521+
522+
finishedEvent := event.CommandFinishedEvent{
523+
DurationNanos: cmdMetadata.TimeDifference(),
524+
CommandName: cmdMetadata.Name,
525+
RequestID: requestID,
526+
ConnectionID: c.id,
527+
}
528+
529+
if success {
530+
if !canMonitor(finishedEvent.CommandName) {
531+
successEvent := &event.CommandSucceededEvent{
532+
Reply: emptyDoc,
533+
CommandFinishedEvent: finishedEvent,
534+
}
535+
c.cmdMonitor.Succeeded(successEvent)
536+
return nil
537+
}
538+
539+
// if response has type 1 document sequence, the sequence must be included as a BSON array in the event's reply.
540+
if opmsg, ok := wm.(wiremessage.Msg); ok {
541+
arr, identifier, err := opmsg.GetSequenceArray()
542+
if err != nil {
543+
return err
544+
}
545+
if arr != nil {
546+
reply = reply.Copy() // make copy to avoid changing original command
547+
reply.Append(bson.EC.Array(identifier, arr))
548+
}
549+
}
550+
551+
successEvent := &event.CommandSucceededEvent{
552+
Reply: reply,
553+
CommandFinishedEvent: finishedEvent,
554+
}
555+
556+
c.cmdMonitor.Succeeded(successEvent)
557+
return nil
558+
}
559+
560+
failureEvent := &event.CommandFailedEvent{
561+
Failure: errmsg,
562+
CommandFinishedEvent: finishedEvent,
563+
}
564+
565+
c.cmdMonitor.Failed(failureEvent)
566+
return nil
567+
}
568+
345569
func (c *connection) WriteWireMessage(ctx context.Context, wm wiremessage.WireMessage) error {
346570
var err error
347571
if c.dead {
@@ -415,6 +639,10 @@ func (c *connection) WriteWireMessage(ctx context.Context, wm wiremessage.WireMe
415639
}
416640

417641
c.bumpIdleDeadline()
642+
err = c.commandStartedEvent(wm)
643+
if err != nil {
644+
return err
645+
}
418646
return nil
419647
}
420648

@@ -562,6 +790,11 @@ func (c *connection) ReadWireMessage(ctx context.Context) (wiremessage.WireMessa
562790
}
563791

564792
c.bumpIdleDeadline()
793+
err = c.commandFinishedEvent(wm)
794+
if err != nil {
795+
return nil, err // TODO: do we care if monitoring fails?
796+
}
797+
565798
return wm, nil
566799
}
567800

core/connection/options.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/mongodb/mongo-go-driver/core/compressor"
14+
"github.com/mongodb/mongo-go-driver/core/event"
1415
)
1516

1617
type config struct {
@@ -20,6 +21,7 @@ type config struct {
2021
handshaker Handshaker
2122
idleTimeout time.Duration
2223
lifeTimeout time.Duration
24+
cmdMonitor *event.CommandMonitor
2325
readTimeout time.Duration
2426
writeTimeout time.Duration
2527
tlsConfig *TLSConfig
@@ -133,3 +135,11 @@ func WithTLSConfig(fn func(*TLSConfig) *TLSConfig) Option {
133135
return nil
134136
}
135137
}
138+
139+
// WithMonitor configures a event for command monitoring.
140+
func WithMonitor(fn func(*event.CommandMonitor) *event.CommandMonitor) Option {
141+
return func(c *config) error {
142+
c.cmdMonitor = fn(c.cmdMonitor)
143+
return nil
144+
}
145+
}

core/event/monitoring.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package event
2+
3+
import (
4+
"time"
5+
6+
"github.com/mongodb/mongo-go-driver/bson"
7+
)
8+
9+
// CommandMetadata contains metadata about a command sent to the server.
10+
type CommandMetadata struct {
11+
Name string
12+
Time time.Time
13+
}
14+
15+
// CreateMetadata creates metadata for a command.
16+
func CreateMetadata(name string) *CommandMetadata {
17+
return &CommandMetadata{
18+
Name: name,
19+
Time: time.Now(),
20+
}
21+
}
22+
23+
// TimeDifference returns the difference between now and the time a command was sent in nanoseconds.
24+
func (cm *CommandMetadata) TimeDifference() int64 {
25+
t := time.Now()
26+
duration := t.Sub(cm.Time)
27+
return duration.Nanoseconds()
28+
}
29+
30+
// CommandStartedEvent represents an event generated when a command is sent to a server.
31+
type CommandStartedEvent struct {
32+
Command *bson.Document
33+
DatabaseName string
34+
CommandName string
35+
RequestID int64
36+
ConnectionID string
37+
}
38+
39+
// CommandFinishedEvent represents a generic command finishing.
40+
type CommandFinishedEvent struct {
41+
DurationNanos int64
42+
CommandName string
43+
RequestID int64
44+
ConnectionID string
45+
}
46+
47+
// CommandSucceededEvent represents an event generated when a command's execution succeeds.
48+
type CommandSucceededEvent struct {
49+
CommandFinishedEvent
50+
Reply *bson.Document
51+
}
52+
53+
// CommandFailedEvent represents an event generated when a command's execution fails.
54+
type CommandFailedEvent struct {
55+
CommandFinishedEvent
56+
Failure string
57+
}
58+
59+
// CommandMonitor represents a monitor that is triggered for different events.
60+
type CommandMonitor struct {
61+
Started func(*CommandStartedEvent)
62+
Succeeded func(*CommandSucceededEvent)
63+
Failed func(*CommandFailedEvent)
64+
}

core/topology/cursor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func newCursor(result bson.Reader, server *Server, opts ...option.CursorOptioner
4444
c := &cursor{
4545
current: -1,
4646
server: server,
47+
opts: opts,
4748
}
4849
var ok bool
4950
for itr.Next() {

0 commit comments

Comments
 (0)