@@ -102,6 +102,13 @@ type controller struct {
102
102
// mu is used to synchronize controller setup
103
103
mu sync.Mutex
104
104
105
+ // jitterPeriod allows tests to reduce the jitterPeriod so they complete faster
106
+ jitterPeriod time.Duration
107
+
108
+ // waitForCache allows tests to mock out the waitForCache function to return an error
109
+ // defaults to cache.WaitForCacheSync
110
+ waitForCache func (stopCh <- chan struct {}, cacheSyncs ... cache.InformerSynced ) bool
111
+
105
112
// TODO(pwittrock): Consider initializing a logger with the controller name as the tag
106
113
}
107
114
@@ -145,21 +152,30 @@ func (c *controller) Start(stop <-chan struct{}) error {
145
152
for _ , informer := range allInformers {
146
153
syncedFuncs = append (syncedFuncs , informer .HasSynced )
147
154
}
148
- if ok := cache .WaitForCacheSync (stop , syncedFuncs ... ); ! ok {
155
+
156
+ if c .waitForCache == nil {
157
+ c .waitForCache = cache .WaitForCacheSync
158
+ }
159
+ if ok := c .waitForCache (stop , syncedFuncs ... ); ! ok {
160
+ // This code is unreachable right now since WaitForCacheSync will never return an error
161
+ // Leaving it here because that could happen in the future
149
162
err := fmt .Errorf ("failed to wait for %s caches to sync" , c .name )
150
163
log .Error (err , "Could not wait for Cache to sync" , "controller" , c .name )
151
164
return err
152
165
}
153
166
167
+ if c .jitterPeriod == 0 {
168
+ c .jitterPeriod = time .Second
169
+ }
170
+
154
171
// Launch two workers to process resources
155
172
log .Info ("Starting workers" , "controller" , c .name , "WorkerCount" , c .maxConcurrentReconciles )
156
173
for i := 0 ; i < c .maxConcurrentReconciles ; i ++ {
157
- // Continually process work items
174
+ // Process work items
158
175
go wait .Until (func () {
159
- // TODO(pwittrock): Should we really use wait.Until to continuously restart this if it exits?
160
176
for c .processNextWorkItem () {
161
177
}
162
- }, time . Second , stop )
178
+ }, c . jitterPeriod , stop )
163
179
}
164
180
165
181
<- stop
@@ -174,13 +190,12 @@ func (c *controller) processNextWorkItem() bool {
174
190
175
191
obj , shutdown := c .queue .Get ()
176
192
if obj == nil {
177
- log . Error ( nil , "Encountered nil Request" , "Object" , obj )
193
+ // Sometimes the queue gives us nil items when it starts up
178
194
c .queue .Forget (obj )
179
195
}
180
196
181
197
if shutdown {
182
- // Return false, this will cause the controller to back off for a second before trying again.
183
- // TODO(community): This was copied from the sample-controller. Figure out why / if we need this.
198
+ // Stop working
184
199
return false
185
200
}
186
201
@@ -210,8 +225,6 @@ func (c *controller) processNextWorkItem() bool {
210
225
c .queue .AddRateLimited (req )
211
226
log .Error (nil , "reconcile error" , "controller" , c .name , "Request" , req )
212
227
213
- // Return false, this will cause the controller to back off for a second before trying again.
214
- // TODO(community): This was copied from the sample-controller. Figure out why / if we need this.
215
228
return false
216
229
} else if result .Requeue {
217
230
c .queue .AddRateLimited (req )
0 commit comments