Skip to content

Commit 4b59df8

Browse files
committed
🌱 Propagate context from controller.Start to handler
Signed-off-by: Vince Prignano <[email protected]>
1 parent 420cd15 commit 4b59df8

File tree

1 file changed

+14
-13
lines changed

1 file changed

+14
-13
lines changed

pkg/internal/controller/controller.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ func (c *Controller) Start(stop <-chan struct{}) error {
130130
c.Queue = c.MakeQueue()
131131
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
132132

133+
// TODO: Propagate context from the Runnable interface, when we're ready to change the signature.
134+
ctx, cancel := context.WithCancel(context.TODO())
135+
defer cancel()
136+
133137
err := func() error {
134138
defer c.mu.Unlock()
135139

@@ -170,8 +174,12 @@ func (c *Controller) Start(stop <-chan struct{}) error {
170174
// Launch workers to process resources
171175
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
172176
for i := 0; i < c.MaxConcurrentReconciles; i++ {
173-
// Process work items
174-
go wait.Until(c.worker, c.JitterPeriod, stop)
177+
go wait.UntilWithContext(ctx, func(ctx context.Context) {
178+
// Run a worker thread that just dequeues items, processes them, and marks them done.
179+
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
180+
for c.processNextWorkItem(ctx) {
181+
}
182+
}, c.JitterPeriod)
175183
}
176184

177185
c.Started = true
@@ -186,16 +194,9 @@ func (c *Controller) Start(stop <-chan struct{}) error {
186194
return nil
187195
}
188196

189-
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
190-
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
191-
func (c *Controller) worker() {
192-
for c.processNextWorkItem() {
193-
}
194-
}
195-
196197
// processNextWorkItem will read a single work item off the workqueue and
197198
// attempt to process it, by calling the reconcileHandler.
198-
func (c *Controller) processNextWorkItem() bool {
199+
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
199200
obj, shutdown := c.Queue.Get()
200201
if shutdown {
201202
// Stop working
@@ -210,10 +211,10 @@ func (c *Controller) processNextWorkItem() bool {
210211
// period.
211212
defer c.Queue.Done(obj)
212213

213-
return c.reconcileHandler(obj)
214+
return c.reconcileHandler(ctx, obj)
214215
}
215216

216-
func (c *Controller) reconcileHandler(obj interface{}) bool {
217+
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) bool {
217218
// Update metrics after processing each item
218219
reconcileStartTS := time.Now()
219220
defer func() {
@@ -233,7 +234,7 @@ func (c *Controller) reconcileHandler(obj interface{}) bool {
233234
}
234235

235236
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
236-
ctx := logf.IntoContext(context.Background(), log)
237+
ctx = logf.IntoContext(ctx, log)
237238

238239
// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
239240
// resource to be synced.

0 commit comments

Comments
 (0)