Skip to content

Commit 3923ceb

Browse files
committed
added max staleness tests.
1 parent 8111818 commit 3923ceb

File tree

3 files changed

+240
-11
lines changed

3 files changed

+240
-11
lines changed

core/readpref/selector.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package readpref
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/10gen/mongo-go-driver/core/desc"
78
"github.com/10gen/mongo-go-driver/core/feature"
@@ -37,6 +38,10 @@ func SelectServer(rp *ReadPref, cluster *desc.Cluster, servers []*desc.Server) (
3738
}
3839

3940
func selectForReplicaSet(rp *ReadPref, cluster *desc.Cluster, servers []*desc.Server) ([]*desc.Server, error) {
41+
if err := verifyMaxStaleness(rp, cluster); err != nil {
42+
return nil, err
43+
}
44+
4045
switch rp.Mode() {
4146
case PrimaryMode:
4247
return selectByType(servers, desc.RSPrimary), nil
@@ -73,7 +78,8 @@ func selectSecondaries(rp *ReadPref, servers []*desc.Server) []*desc.Server {
7378
if len(secondaries) == 0 {
7479
return secondaries
7580
}
76-
if maxStaleness, ok := rp.MaxStaleness(); ok {
81+
if maxStaleness, set := rp.MaxStaleness(); set {
82+
7783
primaries := selectByType(servers, desc.RSPrimary)
7884
if len(primaries) == 0 {
7985
baseTime := secondaries[0].LastWriteTime
@@ -139,3 +145,33 @@ func selectByType(servers []*desc.Server, t desc.ServerType) []*desc.Server {
139145

140146
return result
141147
}
148+
149+
func verifyMaxStaleness(rp *ReadPref, cluster *desc.Cluster) error {
150+
maxStaleness, set := rp.MaxStaleness()
151+
if !set {
152+
return nil
153+
}
154+
155+
fmt.Printf("HERE")
156+
157+
if maxStaleness < time.Duration(90)*time.Second {
158+
return fmt.Errorf(
159+
"max staleness (%s) must be greater than or equal to 90s",
160+
maxStaleness,
161+
)
162+
}
163+
164+
// we'll assume all servers have the same heartbeat interval...
165+
server := cluster.Servers[0]
166+
idleWritePeriod := time.Duration(10) * time.Second
167+
168+
if maxStaleness < server.HeartbeatInterval+idleWritePeriod {
169+
return fmt.Errorf(
170+
"max staleness (%s) must be greater than or equal to the heartbeat interval (%s) plus idle write period (%s)",
171+
maxStaleness,
172+
server.HeartbeatInterval,
173+
idleWritePeriod)
174+
}
175+
176+
return nil
177+
}

core/readpref/selector_test.go

Lines changed: 202 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,93 @@ package readpref_test
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/10gen/mongo-go-driver/core/desc"
78
. "github.com/10gen/mongo-go-driver/core/readpref"
89
"github.com/stretchr/testify/require"
910
)
1011

1112
var readPrefTestPrimary = &desc.Server{
12-
Endpoint: desc.Endpoint("localhost:27017"),
13-
Type: desc.RSPrimary,
14-
Tags: desc.NewTagSet("a", "1"),
13+
Endpoint: desc.Endpoint("localhost:27017"),
14+
HeartbeatInterval: time.Duration(10) * time.Second,
15+
LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
16+
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
17+
Type: desc.RSPrimary,
18+
Tags: desc.NewTagSet("a", "1"),
19+
Version: desc.NewVersion(3, 4, 0),
1520
}
1621
var readPrefTestSecondary1 = &desc.Server{
17-
Endpoint: desc.Endpoint("localhost:27018"),
18-
Type: desc.RSSecondary,
19-
Tags: desc.NewTagSet("a", "1"),
22+
Endpoint: desc.Endpoint("localhost:27018"),
23+
HeartbeatInterval: time.Duration(10) * time.Second,
24+
LastWriteTime: time.Date(2017, 2, 11, 13, 58, 0, 0, time.UTC),
25+
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
26+
Type: desc.RSSecondary,
27+
Tags: desc.NewTagSet("a", "1"),
28+
Version: desc.NewVersion(3, 4, 0),
2029
}
2130
var readPrefTestSecondary2 = &desc.Server{
22-
Endpoint: desc.Endpoint("localhost:27018"),
23-
Type: desc.RSSecondary,
24-
Tags: desc.NewTagSet("a", "2"),
31+
Endpoint: desc.Endpoint("localhost:27018"),
32+
HeartbeatInterval: time.Duration(10) * time.Second,
33+
LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
34+
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
35+
Type: desc.RSSecondary,
36+
Tags: desc.NewTagSet("a", "2"),
37+
Version: desc.NewVersion(3, 4, 0),
2538
}
2639
var readPrefTestCluster = &desc.Cluster{
2740
Type: desc.ReplicaSetWithPrimary,
2841
Servers: []*desc.Server{readPrefTestPrimary, readPrefTestSecondary1, readPrefTestSecondary2},
2942
}
3043

44+
func TestSelectServer_Sharded(t *testing.T) {
45+
require := require.New(t)
46+
subject := New(PrimaryMode)
47+
48+
server := &desc.Server{
49+
Endpoint: desc.Endpoint("localhost:27017"),
50+
HeartbeatInterval: time.Duration(10) * time.Second,
51+
LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
52+
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
53+
Type: desc.Mongos,
54+
Version: desc.NewVersion(3, 4, 0),
55+
}
56+
cluster := &desc.Cluster{
57+
Type: desc.Sharded,
58+
Servers: []*desc.Server{server},
59+
}
60+
61+
result, err := SelectServer(subject, cluster, cluster.Servers)
62+
63+
require.NoError(err)
64+
require.Len(result, 1)
65+
require.Equal([]*desc.Server{server}, result)
66+
}
67+
68+
func TestSelectServer_Single(t *testing.T) {
69+
require := require.New(t)
70+
subject := New(PrimaryMode)
71+
72+
server := &desc.Server{
73+
Endpoint: desc.Endpoint("localhost:27017"),
74+
HeartbeatInterval: time.Duration(10) * time.Second,
75+
LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
76+
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
77+
Type: desc.Mongos,
78+
Version: desc.NewVersion(3, 4, 0),
79+
}
80+
cluster := &desc.Cluster{
81+
Type: desc.Single,
82+
Servers: []*desc.Server{server},
83+
}
84+
85+
result, err := SelectServer(subject, cluster, cluster.Servers)
86+
87+
require.NoError(err)
88+
require.Len(result, 1)
89+
require.Equal([]*desc.Server{server}, result)
90+
}
91+
3192
func TestSelectServer_Primary(t *testing.T) {
3293
require := require.New(t)
3394
subject := New(PrimaryMode)
@@ -93,6 +154,28 @@ func TestSelectServer_PrimaryPreferred_with_no_primary_and_tags(t *testing.T) {
93154
require.Equal([]*desc.Server{readPrefTestSecondary2}, result)
94155
}
95156

157+
func TestSelectServer_PrimaryPreferred_with_maxStaleness(t *testing.T) {
158+
require := require.New(t)
159+
subject := NewWithMaxStaleness(PrimaryPreferredMode, time.Duration(90)*time.Second)
160+
161+
result, err := SelectServer(subject, readPrefTestCluster, readPrefTestCluster.Servers)
162+
163+
require.NoError(err)
164+
require.Len(result, 1)
165+
require.Equal([]*desc.Server{readPrefTestPrimary}, result)
166+
}
167+
168+
func TestSelectServer_PrimaryPreferred_with_maxStaleness_and_no_primary(t *testing.T) {
169+
require := require.New(t)
170+
subject := NewWithMaxStaleness(PrimaryPreferredMode, time.Duration(90)*time.Second)
171+
172+
result, err := SelectServer(subject, readPrefTestCluster, []*desc.Server{readPrefTestSecondary1, readPrefTestSecondary2})
173+
174+
require.NoError(err)
175+
require.Len(result, 1)
176+
require.Equal([]*desc.Server{readPrefTestSecondary2}, result)
177+
}
178+
96179
func TestSelectServer_SecondaryPreferred(t *testing.T) {
97180
require := require.New(t)
98181
subject := New(SecondaryPreferredMode)
@@ -157,6 +240,28 @@ func TestSelectServer_SecondaryPreferred_with_no_secondaries_or_primary(t *testi
157240
require.Len(result, 0)
158241
}
159242

243+
func TestSelectServer_SecondaryPreferred_with_maxStaleness(t *testing.T) {
244+
require := require.New(t)
245+
subject := NewWithMaxStaleness(SecondaryPreferredMode, time.Duration(90)*time.Second)
246+
247+
result, err := SelectServer(subject, readPrefTestCluster, readPrefTestCluster.Servers)
248+
249+
require.NoError(err)
250+
require.Len(result, 1)
251+
require.Equal([]*desc.Server{readPrefTestSecondary2}, result)
252+
}
253+
254+
func TestSelectServer_SecondaryPreferred_with_maxStaleness_and_no_primary(t *testing.T) {
255+
require := require.New(t)
256+
subject := NewWithMaxStaleness(SecondaryPreferredMode, time.Duration(90)*time.Second)
257+
258+
result, err := SelectServer(subject, readPrefTestCluster, []*desc.Server{readPrefTestSecondary1, readPrefTestSecondary2})
259+
260+
require.NoError(err)
261+
require.Len(result, 1)
262+
require.Equal([]*desc.Server{readPrefTestSecondary2}, result)
263+
}
264+
160265
func TestSelectServer_Secondary(t *testing.T) {
161266
require := require.New(t)
162267
subject := New(SecondaryMode)
@@ -199,6 +304,28 @@ func TestSelectServer_Secondary_with_no_secondaries(t *testing.T) {
199304
require.Len(result, 0)
200305
}
201306

307+
func TestSelectServer_Secondary_with_maxStaleness(t *testing.T) {
308+
require := require.New(t)
309+
subject := NewWithMaxStaleness(SecondaryMode, time.Duration(90)*time.Second)
310+
311+
result, err := SelectServer(subject, readPrefTestCluster, readPrefTestCluster.Servers)
312+
313+
require.NoError(err)
314+
require.Len(result, 1)
315+
require.Equal([]*desc.Server{readPrefTestSecondary2}, result)
316+
}
317+
318+
func TestSelectServer_Secondary_with_maxStaleness_and_no_primary(t *testing.T) {
319+
require := require.New(t)
320+
subject := NewWithMaxStaleness(SecondaryMode, time.Duration(90)*time.Second)
321+
322+
result, err := SelectServer(subject, readPrefTestCluster, []*desc.Server{readPrefTestSecondary1, readPrefTestSecondary2})
323+
324+
require.NoError(err)
325+
require.Len(result, 1)
326+
require.Equal([]*desc.Server{readPrefTestSecondary2}, result)
327+
}
328+
202329
func TestSelectServer_Nearest(t *testing.T) {
203330
require := require.New(t)
204331
subject := New(NearestMode)
@@ -252,3 +379,69 @@ func TestSelectServer_Nearest_with_no_secondaries(t *testing.T) {
252379
require.Len(result, 1)
253380
require.Equal([]*desc.Server{readPrefTestPrimary}, result)
254381
}
382+
383+
func TestSelectServer_Nearest_with_maxStaleness(t *testing.T) {
384+
require := require.New(t)
385+
subject := NewWithMaxStaleness(NearestMode, time.Duration(90)*time.Second)
386+
387+
result, err := SelectServer(subject, readPrefTestCluster, readPrefTestCluster.Servers)
388+
389+
require.NoError(err)
390+
require.Len(result, 2)
391+
require.Equal([]*desc.Server{readPrefTestPrimary, readPrefTestSecondary2}, result)
392+
}
393+
394+
func TestSelectServer_Nearest_with_maxStaleness_and_no_primary(t *testing.T) {
395+
require := require.New(t)
396+
subject := NewWithMaxStaleness(NearestMode, time.Duration(90)*time.Second)
397+
398+
result, err := SelectServer(subject, readPrefTestCluster, []*desc.Server{readPrefTestSecondary1, readPrefTestSecondary2})
399+
400+
require.NoError(err)
401+
require.Len(result, 1)
402+
require.Equal([]*desc.Server{readPrefTestSecondary2}, result)
403+
}
404+
405+
func TestSelectServer_Max_staleness_is_less_than_90_seconds(t *testing.T) {
406+
require := require.New(t)
407+
subject := NewWithMaxStaleness(NearestMode, time.Duration(50)*time.Second)
408+
409+
server := &desc.Server{
410+
Endpoint: desc.Endpoint("localhost:27017"),
411+
HeartbeatInterval: time.Duration(10) * time.Second,
412+
LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
413+
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
414+
Type: desc.RSPrimary,
415+
Version: desc.NewVersion(3, 4, 0),
416+
}
417+
cluster := &desc.Cluster{
418+
Type: desc.ReplicaSetWithPrimary,
419+
Servers: []*desc.Server{server},
420+
}
421+
422+
_, err := SelectServer(subject, cluster, cluster.Servers)
423+
424+
require.Error(err)
425+
}
426+
427+
func TestSelectServer_Max_staleness_is_too_low(t *testing.T) {
428+
require := require.New(t)
429+
subject := NewWithMaxStaleness(NearestMode, time.Duration(100)*time.Second)
430+
431+
server := &desc.Server{
432+
Endpoint: desc.Endpoint("localhost:27017"),
433+
HeartbeatInterval: time.Duration(100) * time.Second,
434+
LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
435+
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
436+
Type: desc.RSPrimary,
437+
Version: desc.NewVersion(3, 4, 0),
438+
}
439+
cluster := &desc.Cluster{
440+
Type: desc.ReplicaSetWithPrimary,
441+
Servers: []*desc.Server{server},
442+
}
443+
444+
_, err := SelectServer(subject, cluster, cluster.Servers)
445+
446+
require.Error(err)
447+
}

makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ PKGS = ./auth ./core ./core/connstring ./core/desc ./core/msg ./core/readpref ./
22
LINTARGS = -min_confidence="0.3"
33
TEST_TIMEOUT = 20
44

5-
default: test lint vet
5+
default: test-cover lint vet
66

77
doc:
88
godoc -http=:6060 -index

0 commit comments

Comments
 (0)