Skip to content

Commit 74153f4

Browse files
committed
added readpref and tests.
1 parent 7626f1f commit 74153f4

16 files changed

+3042
-657
lines changed

core/cluster_monitor.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ func StartClusterMonitor(opts ClusterOptions) (*ClusterMonitor, error) {
3232

3333
if opts.ReplicaSetName != "" {
3434
m.fsm.setName = opts.ReplicaSetName
35-
m.fsm.ClusterType = desc.ReplicaSetNoPrimary
35+
m.fsm.Type = desc.ReplicaSetNoPrimary
3636
}
3737
if opts.ConnectionMode == SingleMode {
38-
m.fsm.ClusterType = desc.Single
38+
m.fsm.Type = desc.Single
3939
}
4040

4141
for _, ep := range opts.Servers {
@@ -210,15 +210,15 @@ func (fsm *clusterMonitorFSM) apply(d *desc.Server) {
210210
copy(newServers, fsm.Servers)
211211

212212
fsm.Cluster = desc.Cluster{
213-
ClusterType: fsm.ClusterType,
214-
Servers: newServers,
213+
Type: fsm.Type,
214+
Servers: newServers,
215215
}
216216

217217
if _, ok := fsm.findServer(d.Endpoint); !ok {
218218
return
219219
}
220220

221-
switch fsm.ClusterType {
221+
switch fsm.Type {
222222
case desc.UnknownClusterType:
223223
fsm.applyToUnknownClusterType(d)
224224
case desc.Sharded:
@@ -233,7 +233,7 @@ func (fsm *clusterMonitorFSM) apply(d *desc.Server) {
233233
}
234234

235235
func (fsm *clusterMonitorFSM) applyToReplicaSetNoPrimary(d *desc.Server) {
236-
switch d.ServerType {
236+
switch d.Type {
237237
case desc.Standalone, desc.Mongos:
238238
fsm.removeServerByEndpoint(d.Endpoint)
239239
case desc.RSPrimary:
@@ -246,7 +246,7 @@ func (fsm *clusterMonitorFSM) applyToReplicaSetNoPrimary(d *desc.Server) {
246246
}
247247

248248
func (fsm *clusterMonitorFSM) applyToReplicaSetWithPrimary(d *desc.Server) {
249-
switch d.ServerType {
249+
switch d.Type {
250250
case desc.Standalone, desc.Mongos:
251251
fsm.removeServerByEndpoint(d.Endpoint)
252252
fsm.checkIfHasPrimary()
@@ -261,7 +261,7 @@ func (fsm *clusterMonitorFSM) applyToReplicaSetWithPrimary(d *desc.Server) {
261261
}
262262

263263
func (fsm *clusterMonitorFSM) applyToShardedClusterType(d *desc.Server) {
264-
switch d.ServerType {
264+
switch d.Type {
265265
case desc.Mongos, desc.UnknownServerType:
266266
fsm.replaceServer(d)
267267
case desc.Standalone, desc.RSPrimary, desc.RSSecondary, desc.RSArbiter, desc.RSMember, desc.RSGhost:
@@ -270,7 +270,7 @@ func (fsm *clusterMonitorFSM) applyToShardedClusterType(d *desc.Server) {
270270
}
271271

272272
func (fsm *clusterMonitorFSM) applyToSingle(d *desc.Server) {
273-
switch d.ServerType {
273+
switch d.Type {
274274
case desc.UnknownServerType:
275275
fsm.replaceServer(d)
276276
case desc.Standalone, desc.Mongos:
@@ -291,7 +291,7 @@ func (fsm *clusterMonitorFSM) applyToSingle(d *desc.Server) {
291291
}
292292

293293
func (fsm *clusterMonitorFSM) applyToUnknownClusterType(d *desc.Server) {
294-
switch d.ServerType {
294+
switch d.Type {
295295
case desc.Mongos:
296296
fsm.setType(desc.Sharded)
297297
fsm.replaceServer(d)
@@ -433,7 +433,7 @@ func (fsm *clusterMonitorFSM) addServer(endpoint desc.Endpoint) {
433433

434434
func (fsm *clusterMonitorFSM) findPrimary() (int, bool) {
435435
for i, s := range fsm.Servers {
436-
if s.ServerType == desc.RSPrimary {
436+
if s.Type == desc.RSPrimary {
437437
return i, true
438438
}
439439
}
@@ -474,5 +474,5 @@ func (fsm *clusterMonitorFSM) setServer(i int, d *desc.Server) {
474474
}
475475

476476
func (fsm *clusterMonitorFSM) setType(clusterType desc.ClusterType) {
477-
fsm.ClusterType = clusterType
477+
fsm.Type = clusterType
478478
}

core/desc/cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package desc
22

33
// Cluster is a description of a cluster.
44
type Cluster struct {
5-
ClusterType ClusterType
6-
Servers []*Server
5+
Servers []*Server
6+
Type ClusterType
77
}
88

99
// Server returns the ServerDesc with the specified endpoint.

core/desc/server.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,22 @@ const UnsetRTT = -1 * time.Millisecond
1313
type Server struct {
1414
Endpoint Endpoint
1515

16-
CanonicalEndpoint Endpoint
17-
ElectionID bson.ObjectId
18-
LastError error
19-
LastWriteTimestamp time.Time
20-
MaxBatchCount uint16
21-
MaxDocumentSize uint32
22-
MaxMessageSize uint32
23-
Members []Endpoint
24-
ServerType ServerType
25-
SetName string
26-
SetVersion uint32
27-
Tags []TagSet
28-
WireVersion Range
29-
Version Version
16+
CanonicalEndpoint Endpoint
17+
ElectionID bson.ObjectId
18+
HeartbeatInterval time.Duration
19+
LastError error
20+
LastUpdateTime time.Time
21+
LastWriteTime time.Time
22+
MaxBatchCount uint16
23+
MaxDocumentSize uint32
24+
MaxMessageSize uint32
25+
Members []Endpoint
26+
SetName string
27+
SetVersion uint32
28+
Tags TagSet
29+
Type ServerType
30+
WireVersion Range
31+
Version Version
3032

3133
averageRTT time.Duration
3234
averageRTTSet bool

core/desc/tag.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,42 @@ type Tag struct {
66
Value string
77
}
88

9-
// TagSet is an ordered list of Tags.
9+
// NewTagSet creates a new tag set by taking the entries in pairs.
10+
func NewTagSet(tags ...string) TagSet {
11+
if len(tags)%2 != 0 {
12+
panic("desc.NewTagSet: argument count is odd")
13+
}
14+
15+
var set TagSet
16+
for i := 0; i < len(tags); i += 2 {
17+
set = append(set, Tag{Name: tags[i], Value: tags[i+1]})
18+
}
19+
return set
20+
}
21+
22+
// TagSet is an ordered list of name/value pairs.
1023
type TagSet []Tag
24+
25+
// Contains indicates whether the name/value pair
26+
// exists in the tag set.
27+
func (ts TagSet) Contains(name, value string) bool {
28+
for _, t := range ts {
29+
if t.Name == name && t.Value == value {
30+
return true
31+
}
32+
}
33+
34+
return false
35+
}
36+
37+
// ContainsAll indicates whether all the name/value pairs
38+
// exist in the tag set.
39+
func (ts TagSet) ContainsAll(other []Tag) bool {
40+
for _, ot := range other {
41+
if !ts.Contains(ot.Name, ot.Value) {
42+
return false
43+
}
44+
}
45+
46+
return true
47+
}

core/feature/supported.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package feature
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/10gen/mongo-go-driver/core/desc"
7+
)
8+
9+
// MaxStaleness returns an error if the given server
10+
// does not support max staleness.
11+
func MaxStaleness(version desc.Version) error {
12+
if !version.AtLeast(3, 4, 0) {
13+
return fmt.Errorf("max staleness is only supported for servers 3.4 or newer")
14+
}
15+
16+
return nil
17+
}

core/feature/supported_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package feature_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/10gen/mongo-go-driver/core/desc"
7+
"github.com/10gen/mongo-go-driver/core/feature"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestMaxStaleness(t *testing.T) {
12+
13+
tests := []struct {
14+
version desc.Version
15+
expected bool
16+
}{
17+
{desc.NewVersion(2, 4, 0), false},
18+
{desc.NewVersion(3, 3, 99), false},
19+
{desc.NewVersion(3, 4, 0), true},
20+
{desc.NewVersion(3, 4, 1), true},
21+
}
22+
for _, test := range tests {
23+
t.Run(test.version.String(), func(t *testing.T) {
24+
err := feature.MaxStaleness(test.version)
25+
if test.expected {
26+
require.NoError(t, err)
27+
} else {
28+
require.Error(t, err)
29+
}
30+
})
31+
}
32+
}

core/readpref/mode.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package readpref
2+
3+
// Mode indicates the user's preference on reads.
4+
type Mode uint8
5+
6+
// Mode constants
7+
const (
8+
// PrimaryMode indicates that only a primary is
9+
// considered for reading. This is the default
10+
// mode.
11+
PrimaryMode Mode = iota
12+
// PrimaryPreferredMode indicates that if a primary
13+
// is available, use it; otherwise, eligible
14+
// secondaries will be considered.
15+
PrimaryPreferredMode
16+
// SecondaryMode indicates that only secondaries
17+
// should be considered.
18+
SecondaryMode
19+
// SecondaryPreferredMode indicates that only secondaries
20+
// should be considered when one is available. If none
21+
// are available, then a primary will be considered.
22+
SecondaryPreferredMode
23+
// NearestMode indicates that all primaries and secondaries
24+
// will be considered.
25+
NearestMode
26+
)

core/readpref/readpref.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package readpref
2+
3+
import (
4+
"time"
5+
6+
"github.com/10gen/mongo-go-driver/core/desc"
7+
)
8+
9+
// New constructs a read preference from the mode and optionally
10+
// some name value pairs.
11+
func New(mode Mode, tagSets ...desc.TagSet) *ReadPref {
12+
// TODO: think about having an error for invalid construction
13+
14+
return &ReadPref{
15+
maxStaleness: time.Duration(-1) * time.Millisecond,
16+
mode: mode,
17+
tagSets: tagSets,
18+
}
19+
}
20+
21+
// NewWithMaxStaleness constructs a read preference from a mode, a
22+
// maximum staleness, and optionally some name value pairs.
23+
func NewWithMaxStaleness(mode Mode, maxStaleness time.Duration, tagSets ...desc.TagSet) *ReadPref {
24+
// TODO: think about having an error for invalid construction
25+
26+
return &ReadPref{
27+
maxStaleness: maxStaleness,
28+
maxStalenessSet: true,
29+
mode: mode,
30+
tagSets: tagSets,
31+
}
32+
}
33+
34+
var defaultReadPref = &ReadPref{}
35+
36+
// Default returns the default read preference.
37+
func Default() *ReadPref {
38+
return defaultReadPref
39+
}
40+
41+
// ReadPref determines which servers are considered suitable for read operations.
42+
type ReadPref struct {
43+
maxStaleness time.Duration
44+
maxStalenessSet bool
45+
mode Mode
46+
tagSets []desc.TagSet
47+
}
48+
49+
// MaxStaleness is the maximum amount of time to allow
50+
// a server to be considered eligible for selection. The
51+
// second return value indicates if this value has been set.
52+
func (r *ReadPref) MaxStaleness() (time.Duration, bool) {
53+
return r.maxStaleness, r.maxStalenessSet
54+
}
55+
56+
// Mode indicates the mode of the read preference.
57+
func (r *ReadPref) Mode() Mode {
58+
return r.mode
59+
}
60+
61+
// TagSets are multiple tag sets indicating
62+
// which servers should be considered.
63+
func (r *ReadPref) TagSets() []desc.TagSet {
64+
return r.tagSets
65+
}

0 commit comments

Comments
 (0)