@@ -154,10 +154,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
154
154
log .Info ("Starting workers" , "controller" , c .Name , "worker count" , c .MaxConcurrentReconciles )
155
155
for i := 0 ; i < c .MaxConcurrentReconciles ; i ++ {
156
156
// Process work items
157
- go wait .Until (func () {
158
- for c .processNextWorkItem () {
159
- }
160
- }, c .JitterPeriod , stop )
157
+ go wait .Until (c .worker , c .JitterPeriod , stop )
161
158
}
162
159
163
160
c .Started = true
@@ -168,30 +165,32 @@ func (c *Controller) Start(stop <-chan struct{}) error {
168
165
return nil
169
166
}
170
167
168
+ // worker runs a worker thread that just dequeues items, processes them, and marks them done.
169
+ // It enforces that the reconcileHandler is never invoked concurrently with the same object.
170
+ func (c * Controller ) worker () {
171
+ for c .processNextWorkItem () {
172
+ }
173
+ }
174
+
171
175
// processNextWorkItem will read a single work item off the workqueue and
172
- // attempt to process it, by calling the syncHandler .
176
+ // attempt to process it, by calling the reconcileHandler .
173
177
func (c * Controller ) processNextWorkItem () bool {
174
- // This code copy-pasted from the sample-Controller.
178
+ obj , shutdown := c .Queue .Get ()
179
+ if shutdown {
180
+ return false
181
+ }
182
+ defer c .Queue .Done (obj )
175
183
184
+ return c .reconcileHandler (obj )
185
+ }
186
+
187
+ func (c * Controller ) reconcileHandler (obj interface {}) bool {
176
188
// Update metrics after processing each item
177
189
reconcileStartTS := time .Now ()
178
190
defer func () {
179
191
c .updateMetrics (time .Now ().Sub (reconcileStartTS ))
180
192
}()
181
193
182
- obj , shutdown := c .Queue .Get ()
183
- if shutdown {
184
- // Stop working
185
- return false
186
- }
187
-
188
- // We call Done here so the workqueue knows we have finished
189
- // processing this item. We also must remember to call Forget if we
190
- // do not want this work item being re-queued. For example, we do
191
- // not call Forget if a transient error occurs, instead the item is
192
- // put back on the workqueue and attempted again after a back-off
193
- // period.
194
- defer c .Queue .Done (obj )
195
194
var req reconcile.Request
196
195
var ok bool
197
196
if req , ok = obj .(reconcile.Request ); ! ok {
@@ -204,7 +203,6 @@ func (c *Controller) processNextWorkItem() bool {
204
203
// Return true, don't take a break
205
204
return true
206
205
}
207
-
208
206
// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
209
207
// resource to be synced.
210
208
if result , err := c .Do .Reconcile (req ); err != nil {
0 commit comments