Skip to content

Commit 9b26cbb

Browse files
committed
Merge branch 'pr/75'
Change-Id: Iac8815ecb955ac4915b8c29af60aba40d6dac09d
2 parents 44fa48d + ad3c80f commit 9b26cbb

File tree

12 files changed

+592
-36
lines changed

12 files changed

+592
-36
lines changed

core/command/count_documents.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright (C) MongoDB, Inc. 2017-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package command
8+
9+
import (
10+
"context"
11+
"errors"
12+
"github.com/mongodb/mongo-go-driver/bson"
13+
"github.com/mongodb/mongo-go-driver/core/description"
14+
"github.com/mongodb/mongo-go-driver/core/option"
15+
"github.com/mongodb/mongo-go-driver/core/readconcern"
16+
"github.com/mongodb/mongo-go-driver/core/readpref"
17+
"github.com/mongodb/mongo-go-driver/core/session"
18+
"github.com/mongodb/mongo-go-driver/core/wiremessage"
19+
)
20+
21+
// CountDocuments represents the CountDocuments command.
22+
//
23+
// The countDocuments command counts how many documents in a collection match the given query.
24+
type CountDocuments struct {
25+
NS Namespace
26+
Pipeline *bson.Array
27+
Opts []option.CountOptioner
28+
ReadPref *readpref.ReadPref
29+
ReadConcern *readconcern.ReadConcern
30+
Clock *session.ClusterClock
31+
Session *session.Client
32+
33+
result int64
34+
err error
35+
}
36+
37+
// Encode will encode this command into a wire message for the given server description.
38+
func (c *CountDocuments) Encode(desc description.SelectedServer) (wiremessage.WireMessage, error) {
39+
if err := c.NS.Validate(); err != nil {
40+
return nil, err
41+
}
42+
command := bson.NewDocument()
43+
command.Append(bson.EC.String("aggregate", c.NS.Collection), bson.EC.Array("pipeline", c.Pipeline))
44+
45+
cursor := bson.NewDocument()
46+
command.Append(bson.EC.SubDocument("cursor", cursor))
47+
for _, opt := range c.Opts {
48+
if opt == nil {
49+
continue
50+
}
51+
//because we already have these options in the pipeline
52+
switch opt.(type) {
53+
case option.OptSkip:
54+
continue
55+
case option.OptLimit:
56+
continue
57+
}
58+
err := opt.Option(command)
59+
if err != nil {
60+
return nil, err
61+
}
62+
}
63+
64+
return (&Read{DB: c.NS.DB, ReadPref: c.ReadPref, Command: command}).Encode(desc)
65+
}
66+
67+
// Decode will decode the wire message using the provided server description. Errors during decoding
68+
// are deferred until either the Result or Err methods are called.
69+
func (c *CountDocuments) Decode(ctx context.Context, desc description.SelectedServer, cb CursorBuilder, wm wiremessage.WireMessage) *CountDocuments {
70+
rdr, err := (&Read{}).Decode(desc, wm).Result()
71+
if err != nil {
72+
c.err = err
73+
return c
74+
}
75+
cur, err := cb.BuildCursor(rdr, c.Session, c.Clock)
76+
if err != nil {
77+
c.err = err
78+
return c
79+
}
80+
81+
var doc = bson.NewDocument()
82+
if cur.Next(ctx) {
83+
err = cur.Decode(doc)
84+
if err != nil {
85+
c.err = err
86+
return c
87+
}
88+
val, err := doc.LookupErr("n")
89+
switch {
90+
case err == bson.ErrElementNotFound:
91+
c.err = errors.New("Invalid response from server, no 'n' field")
92+
return c
93+
case err != nil:
94+
c.err = err
95+
return c
96+
}
97+
switch val.Type() {
98+
case bson.TypeInt32:
99+
c.result = int64(val.Int32())
100+
case bson.TypeInt64:
101+
c.result = val.Int64()
102+
default:
103+
c.err = errors.New("Invalid response from server, value field is not a number")
104+
}
105+
106+
return c
107+
}
108+
109+
c.result = 0
110+
return c
111+
}
112+
113+
// Result returns the result of a decoded wire message and server description.
114+
func (c *CountDocuments) Result() (int64, error) {
115+
if c.err != nil {
116+
return 0, c.err
117+
}
118+
return c.result, nil
119+
}
120+
121+
// Err returns the error set on this command.
122+
func (c *CountDocuments) Err() error { return c.err }
123+
124+
// RoundTrip handles the execution of this command using the provided wiremessage.ReadWriter.
125+
func (c *CountDocuments) RoundTrip(ctx context.Context, desc description.SelectedServer, cb CursorBuilder, rw wiremessage.ReadWriter) (int64, error) {
126+
wm, err := c.Encode(desc)
127+
if err != nil {
128+
return 0, err
129+
}
130+
131+
err = rw.WriteWireMessage(ctx, wm)
132+
if err != nil {
133+
return 0, err
134+
}
135+
wm, err = rw.ReadWireMessage(ctx)
136+
if err != nil {
137+
return 0, err
138+
}
139+
return c.Decode(ctx, desc, cb, wm).Result()
140+
}

core/dispatch/count_documents.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (C) MongoDB, Inc. 2017-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package dispatch
8+
9+
import (
10+
"context"
11+
12+
"github.com/mongodb/mongo-go-driver/core/command"
13+
"github.com/mongodb/mongo-go-driver/core/description"
14+
"github.com/mongodb/mongo-go-driver/core/topology"
15+
)
16+
17+
// CountDocuments handles the full cycle dispatch and execution of a countDocuments command against the provided
18+
// topology.
19+
func CountDocuments(
20+
ctx context.Context,
21+
cmd command.CountDocuments,
22+
topo *topology.Topology,
23+
selector description.ServerSelector,
24+
) (int64, error) {
25+
26+
ss, err := topo.SelectServer(ctx, selector)
27+
if err != nil {
28+
return 0, err
29+
}
30+
31+
desc := ss.Description()
32+
conn, err := ss.Connection(ctx)
33+
if err != nil {
34+
return 0, err
35+
}
36+
defer conn.Close()
37+
38+
return cmd.RoundTrip(ctx, desc, ss, conn)
39+
}

core/option/options.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -620,17 +620,17 @@ func (opt OptMaxTime) Option(d *bson.Document) error {
620620
return nil
621621
}
622622

623-
func (OptMaxTime) aggregateOption() {}
624-
func (OptMaxTime) countOption() {}
625-
func (OptMaxTime) distinctOption() {}
626-
func (OptMaxTime) findOption() {}
627-
func (OptMaxTime) findOneOption() {}
628-
func (OptMaxTime) findOneAndDeleteOption() {}
629-
func (OptMaxTime) findOneAndReplaceOption() {}
630-
func (OptMaxTime) findOneAndUpdateOption() {}
631-
func (OptMaxTime) listIndexesOption() {}
632-
func (OptMaxTime) dropIndexesOption() {}
633-
func (OptMaxTime) createIndexesOption() {}
623+
func (OptMaxTime) aggregateOption() {}
624+
func (OptMaxTime) countOption() {}
625+
func (OptMaxTime) distinctOption() {}
626+
func (OptMaxTime) findOption() {}
627+
func (OptMaxTime) findOneOption() {}
628+
func (OptMaxTime) findOneAndDeleteOption() {}
629+
func (OptMaxTime) findOneAndReplaceOption() {}
630+
func (OptMaxTime) findOneAndUpdateOption() {}
631+
func (OptMaxTime) listIndexesOption() {}
632+
func (OptMaxTime) dropIndexesOption() {}
633+
func (OptMaxTime) createIndexesOption() {}
634634

635635
// String implements the Stringer interface.
636636
func (opt OptMaxTime) String() string {

examples/documentation_examples/examples.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ import (
1515

1616
"github.com/mongodb/mongo-go-driver/bson"
1717
"github.com/mongodb/mongo-go-driver/mongo"
18-
"github.com/stretchr/testify/require"
1918
"github.com/mongodb/mongo-go-driver/mongo/findopt"
19+
"github.com/stretchr/testify/require"
2020
)
2121

2222
func requireCursorLength(t *testing.T, cursor mongo.Cursor, length int) {

internal/channel_connection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package internal
22

33
import (
44
"context"
5-
"testing"
6-
"github.com/mongodb/mongo-go-driver/core/wiremessage"
75
"github.com/mongodb/mongo-go-driver/bson"
6+
"github.com/mongodb/mongo-go-driver/core/wiremessage"
7+
"testing"
88
)
99

1010
// Implements the connection.Connection interface by reading and writing wire messages

internal/testutil/config.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ import (
1515
"sync"
1616
"testing"
1717

18-
"github.com/mongodb/mongo-go-driver/core/connstring"
19-
"github.com/mongodb/mongo-go-driver/core/topology"
20-
"github.com/mongodb/mongo-go-driver/core/event"
18+
"github.com/mongodb/mongo-go-driver/bson"
19+
"github.com/mongodb/mongo-go-driver/core/command"
2120
"github.com/mongodb/mongo-go-driver/core/connection"
21+
"github.com/mongodb/mongo-go-driver/core/connstring"
2222
"github.com/mongodb/mongo-go-driver/core/description"
23+
"github.com/mongodb/mongo-go-driver/core/event"
24+
"github.com/mongodb/mongo-go-driver/core/topology"
2325
"github.com/stretchr/testify/require"
24-
"github.com/mongodb/mongo-go-driver/core/command"
25-
"github.com/mongodb/mongo-go-driver/bson"
2626
)
2727

2828
var connectionString connstring.ConnString
@@ -80,20 +80,20 @@ func AddCompressorToUri(uri string) string {
8080
func MonitoredTopology(t *testing.T, monitor *event.CommandMonitor) *topology.Topology {
8181
cs := ConnString(t)
8282
opts := []topology.Option{
83-
topology.WithConnString(func(connstring.ConnString) connstring.ConnString { return cs }),
84-
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
85-
return append(
86-
opts,
87-
topology.WithConnectionOptions(func(opts ...connection.Option) []connection.Option {
88-
return append(
89-
opts,
90-
connection.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
91-
return monitor
92-
}),
93-
)
94-
}),
95-
)
96-
}),
83+
topology.WithConnString(func(connstring.ConnString) connstring.ConnString { return cs }),
84+
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
85+
return append(
86+
opts,
87+
topology.WithConnectionOptions(func(opts ...connection.Option) []connection.Option {
88+
return append(
89+
opts,
90+
connection.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
91+
return monitor
92+
}),
93+
)
94+
}),
95+
)
96+
}),
9797
}
9898

9999
monitoredTopologyOnce.Do(func() {
@@ -110,7 +110,7 @@ func MonitoredTopology(t *testing.T, monitor *event.CommandMonitor) *topology.To
110110
require.NoError(t, err)
111111

112112
_, err = (&command.Write{
113-
DB: DBName(t),
113+
DB: DBName(t),
114114
Command: bson.NewDocument(bson.EC.Int32("dropDatabase", 1)),
115115
}).RoundTrip(context.Background(), s.SelectedDescription(), c)
116116

internal/testutil/ops.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ import (
1515
"github.com/mongodb/mongo-go-driver/core/command"
1616
"github.com/mongodb/mongo-go-driver/core/description"
1717
"github.com/mongodb/mongo-go-driver/core/dispatch"
18+
"github.com/mongodb/mongo-go-driver/core/session"
1819
"github.com/mongodb/mongo-go-driver/core/topology"
20+
"github.com/mongodb/mongo-go-driver/core/uuid"
1921
"github.com/mongodb/mongo-go-driver/core/writeconcern"
2022
"github.com/mongodb/mongo-go-driver/internal/testutil/helpers"
2123
"github.com/stretchr/testify/require"
22-
"github.com/mongodb/mongo-go-driver/core/uuid"
23-
"github.com/mongodb/mongo-go-driver/core/session"
2424
)
2525

2626
// AutoCreateIndexes creates an index in the test cluster.

mongo/collection.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,79 @@ func (coll *Collection) Count(ctx context.Context, filter interface{},
702702
)
703703
}
704704

705+
// CountDocuments gets the number of documents matching the filter. A user can supply a
706+
// custom context to this method, or nil to default to context.Background().
707+
//
708+
// This method uses countDocumentsAggregatePipeline to turn the filter parameter and options
709+
// into aggregate pipeline.
710+
func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
711+
opts ...countopt.Count) (int64, error) {
712+
713+
if ctx == nil {
714+
ctx = context.Background()
715+
}
716+
717+
pipelineArr, err := countDocumentsAggregatePipeline(filter, opts...)
718+
if err != nil {
719+
return 0, err
720+
}
721+
722+
countOpts, sess, err := countopt.BundleCount(opts...).Unbundle(true)
723+
if err != nil {
724+
return 0, err
725+
}
726+
727+
err = coll.client.ValidSession(sess)
728+
if err != nil {
729+
return 0, err
730+
}
731+
732+
oldns := coll.namespace()
733+
cmd := command.CountDocuments{
734+
NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
735+
Pipeline: pipelineArr,
736+
Opts: countOpts,
737+
ReadPref: coll.readPreference,
738+
ReadConcern: coll.readConcern,
739+
Session: sess,
740+
Clock: coll.client.clock,
741+
}
742+
return dispatch.CountDocuments(ctx, cmd, coll.client.topology, coll.readSelector)
743+
}
744+
745+
// EstimatedDocumentCount gets an estimate of the count of documents in a collection using collection metadata.
746+
func (coll *Collection) EstimatedDocumentCount(ctx context.Context,
747+
opts ...countopt.EstimatedDocumentCount) (int64, error) {
748+
749+
if ctx == nil {
750+
ctx = context.Background()
751+
}
752+
753+
countOpts, sess, err := countopt.BundleEstimatedDocumentCount(opts...).Unbundle(true)
754+
if err != nil {
755+
return 0, err
756+
}
757+
758+
err = coll.client.ValidSession(sess)
759+
if err != nil {
760+
return 0, err
761+
}
762+
763+
oldns := coll.namespace()
764+
765+
cmd := command.Count{
766+
NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
767+
Query: bson.NewDocument(),
768+
Opts: countOpts,
769+
ReadPref: coll.readPreference,
770+
ReadConcern: coll.readConcern,
771+
Session: sess,
772+
Clock: coll.client.clock,
773+
}
774+
return dispatch.Count(ctx, cmd, coll.client.topology, coll.readSelector, coll.client.id,
775+
coll.client.topology.SessionPool)
776+
}
777+
705778
// Distinct finds the distinct values for a specified field across a single
706779
// collection. A user can supply a custom context to this method, or nil to
707780
// default to context.Background().

0 commit comments

Comments
 (0)