@@ -130,6 +130,10 @@ func (c *Controller) Start(stop <-chan struct{}) error {
130
130
c .Queue = c .MakeQueue ()
131
131
defer c .Queue .ShutDown () // needs to be outside the iife so that we shutdown after the stop channel is closed
132
132
133
+ // TODO: Propagate context from the Runnable interface, when we're ready to change the signature.
134
+ ctx , cancel := context .WithCancel (context .TODO ())
135
+ defer cancel ()
136
+
133
137
err := func () error {
134
138
defer c .mu .Unlock ()
135
139
@@ -170,8 +174,12 @@ func (c *Controller) Start(stop <-chan struct{}) error {
170
174
// Launch workers to process resources
171
175
c .Log .Info ("Starting workers" , "worker count" , c .MaxConcurrentReconciles )
172
176
for i := 0 ; i < c .MaxConcurrentReconciles ; i ++ {
173
- // Process work items
174
- go wait .Until (c .worker , c .JitterPeriod , stop )
177
+ // Run a worker thread that just dequeues items, processes them, and marks them done.
178
+ // It enforces that the reconcileHandler is never invoked concurrently with the same object.
179
+ go wait .Until (func () {
180
+ for c .processNextWorkItem (ctx ) {
181
+ }
182
+ }, c .JitterPeriod , stop )
175
183
}
176
184
177
185
c .Started = true
@@ -180,22 +188,16 @@ func (c *Controller) Start(stop <-chan struct{}) error {
180
188
if err != nil {
181
189
return err
182
190
}
191
+ context .WithCancel (context .Background ())
183
192
184
193
<- stop
185
194
c .Log .Info ("Stopping workers" )
186
195
return nil
187
196
}
188
197
189
- // worker runs a worker thread that just dequeues items, processes them, and marks them done.
190
- // It enforces that the reconcileHandler is never invoked concurrently with the same object.
191
- func (c * Controller ) worker () {
192
- for c .processNextWorkItem () {
193
- }
194
- }
195
-
196
198
// processNextWorkItem will read a single work item off the workqueue and
197
199
// attempt to process it, by calling the reconcileHandler.
198
- func (c * Controller ) processNextWorkItem () bool {
200
+ func (c * Controller ) processNextWorkItem (ctx context. Context ) bool {
199
201
obj , shutdown := c .Queue .Get ()
200
202
if shutdown {
201
203
// Stop working
@@ -210,10 +212,10 @@ func (c *Controller) processNextWorkItem() bool {
210
212
// period.
211
213
defer c .Queue .Done (obj )
212
214
213
- return c .reconcileHandler (obj )
215
+ return c .reconcileHandler (ctx , obj )
214
216
}
215
217
216
- func (c * Controller ) reconcileHandler (obj interface {}) bool {
218
+ func (c * Controller ) reconcileHandler (ctx context. Context , obj interface {}) bool {
217
219
// Update metrics after processing each item
218
220
reconcileStartTS := time .Now ()
219
221
defer func () {
@@ -233,7 +235,7 @@ func (c *Controller) reconcileHandler(obj interface{}) bool {
233
235
}
234
236
235
237
log := c .Log .WithValues ("name" , req .Name , "namespace" , req .Namespace )
236
- ctx : = logf .IntoContext (context . Background () , log )
238
+ ctx = logf .IntoContext (ctx , log )
237
239
238
240
// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
239
241
// resource to be synced.
0 commit comments