@@ -93,6 +93,8 @@ type controller struct {
93
93
// the queue for processing
94
94
queue workqueue.RateLimitingInterface
95
95
96
+ jitterPeriod time.Duration
97
+
96
98
// once ensures unspecified fields get default values
97
99
once sync.Once
98
100
@@ -102,6 +104,8 @@ type controller struct {
102
104
// mu is used to synchronize controller setup
103
105
mu sync.Mutex
104
106
107
+ waitForCache func (stopCh <- chan struct {}, cacheSyncs ... cache.InformerSynced ) bool
108
+
105
109
// TODO(pwittrock): Consider initializing a logger with the controller name as the tag
106
110
}
107
111
@@ -145,21 +149,33 @@ func (c *controller) Start(stop <-chan struct{}) error {
145
149
for _ , informer := range allInformers {
146
150
syncedFuncs = append (syncedFuncs , informer .HasSynced )
147
151
}
148
- if ok := cache .WaitForCacheSync (stop , syncedFuncs ... ); ! ok {
152
+
153
+ // Allow waitForCache to be mocked out for tests
154
+ if c .waitForCache == nil {
155
+ c .waitForCache = cache .WaitForCacheSync
156
+ }
157
+ if ok := c .waitForCache (stop , syncedFuncs ... ); ! ok {
158
+ // This code is unreachable right now since WaitForCacheSync will never return an error
159
+ // Leaving it here because that could happen in the future
149
160
err := fmt .Errorf ("failed to wait for %s caches to sync" , c .name )
150
161
log .Error (err , "Could not wait for Cache to sync" , "controller" , c .name )
151
162
return err
152
163
}
153
164
165
+ if c .jitterPeriod == 0 {
166
+ c .jitterPeriod = time .Second
167
+ }
168
+
154
169
// Launch two workers to process resources
155
170
log .Info ("Starting workers" , "controller" , c .name , "WorkerCount" , c .maxConcurrentReconciles )
156
171
for i := 0 ; i < c .maxConcurrentReconciles ; i ++ {
157
- // Continually process work items
172
+ // Process work items
158
173
go wait .Until (func () {
159
- // TODO(pwittrock): Should we really use wait.Until to continuously restart this if it exits?
174
+ fmt . Printf ( "Running \n " )
160
175
for c .processNextWorkItem () {
161
176
}
162
- }, time .Second , stop )
177
+ fmt .Printf ("Stopping\n " )
178
+ }, c .jitterPeriod , stop )
163
179
}
164
180
165
181
<- stop
@@ -174,13 +190,12 @@ func (c *controller) processNextWorkItem() bool {
174
190
175
191
obj , shutdown := c .queue .Get ()
176
192
if obj == nil {
177
- log . Error ( nil , "Encountered nil Request" , "Object" , obj )
193
+ // Sometimes the queue gives us nil items when it starts up
178
194
c .queue .Forget (obj )
179
195
}
180
196
181
197
if shutdown {
182
- // Return false, this will cause the controller to back off for a second before trying again.
183
- // TODO(community): This was copied from the sample-controller. Figure out why / if we need this.
198
+ // Stop working
184
199
return false
185
200
}
186
201
@@ -210,8 +225,6 @@ func (c *controller) processNextWorkItem() bool {
210
225
c .queue .AddRateLimited (req )
211
226
log .Error (nil , "reconcile error" , "controller" , c .name , "Request" , req )
212
227
213
- // Return false, this will cause the controller to back off for a second before trying again.
214
- // TODO(community): This was copied from the sample-controller. Figure out why / if we need this.
215
228
return false
216
229
} else if result .Requeue {
217
230
c .queue .AddRateLimited (req )
0 commit comments