@@ -51,8 +51,8 @@ type FakeSourceSyncer struct {
51
51
History map [registry.CatalogKey ][]connectivity.State
52
52
53
53
sync.Mutex
54
- expectedEvents int
55
- done chan struct {}
54
+ expectedReadies int
55
+ done chan struct {}
56
56
}
57
57
58
58
func (f * FakeSourceSyncer ) sync (state SourceState ) {
@@ -61,18 +61,20 @@ func (f *FakeSourceSyncer) sync(state SourceState) {
61
61
f .History [state .Key ] = []connectivity.State {}
62
62
}
63
63
f .History [state .Key ] = append (f .History [state .Key ], state .State )
64
- f .expectedEvents -= 1
65
- if f .expectedEvents == 0 {
64
+ if state .State == connectivity .Ready {
65
+ f .expectedReadies -= 1
66
+ }
67
+ if f .expectedReadies == 0 {
66
68
f .done <- struct {}{}
67
69
}
68
70
f .Unlock ()
69
71
}
70
72
71
- func NewFakeSourceSyncer (expectedEvents int ) * FakeSourceSyncer {
73
+ func NewFakeSourceSyncer (expectedReadies int ) * FakeSourceSyncer {
72
74
return & FakeSourceSyncer {
73
- History : map [registry.CatalogKey ][]connectivity.State {},
74
- expectedEvents : expectedEvents ,
75
- done : make (chan struct {}),
75
+ History : map [registry.CatalogKey ][]connectivity.State {},
76
+ expectedReadies : expectedReadies ,
77
+ done : make (chan struct {}),
76
78
}
77
79
}
78
80
@@ -85,21 +87,19 @@ func TestConnectionEvents(t *testing.T) {
85
87
test := func (tt testcase ) func (t * testing.T ) {
86
88
return func (t * testing.T ) {
87
89
// start server for each catalog
88
- totalEvents := 0
89
90
addresses := map [registry.CatalogKey ]string {}
90
91
91
- for catalog , events := range tt .expectedHistory {
92
- totalEvents += len (events )
92
+ for catalog , _ := range tt .expectedHistory {
93
93
serve , address , stop := server (& fakes.FakeQuery {})
94
94
addresses [catalog ] = address
95
95
go serve ()
96
96
defer stop ()
97
97
}
98
98
99
99
// start source manager
100
- syncer := NewFakeSourceSyncer (totalEvents )
100
+ syncer := NewFakeSourceSyncer (len ( tt . expectedHistory ) )
101
101
sources := NewSourceStore (logrus .New (), 1 * time .Second , 5 * time .Second , syncer .sync )
102
- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
102
+ ctx , cancel := context .WithTimeout (context .Background (), 20 * time .Second )
103
103
defer cancel ()
104
104
sources .Start (ctx )
105
105
@@ -116,7 +116,13 @@ func TestConnectionEvents(t *testing.T) {
116
116
for catalog , events := range tt .expectedHistory {
117
117
recordedEvents := syncer .History [catalog ]
118
118
for i := 0 ; i < len (recordedEvents ); i ++ {
119
- require .Equal (t , (events [i ]).String (), (recordedEvents [i ]).String ())
119
+ found := false
120
+ for _ , event := range events {
121
+ if event .String () == recordedEvents [i ].String () {
122
+ found = true
123
+ }
124
+ }
125
+ require .True (t , found )
120
126
}
121
127
}
122
128
}
0 commit comments