Skip to content

Commit 761e03e

Browse files
committed
update internal controller for more clean code and metrics
1 parent 38483b2 commit 761e03e

File tree

1 file changed

+20
-14
lines changed

1 file changed

+20
-14
lines changed

pkg/internal/controller/controller.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
154154
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
155155
for i := 0; i < c.MaxConcurrentReconciles; i++ {
156156
// Process work items
157-
go wait.Until(func() {
158-
for c.processNextWorkItem() {
159-
}
160-
}, c.JitterPeriod, stop)
157+
go wait.Until(c.worker, c.JitterPeriod, stop)
161158
}
162159

163160
c.Started = true
@@ -168,17 +165,16 @@ func (c *Controller) Start(stop <-chan struct{}) error {
168165
return nil
169166
}
170167

168+
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
169+
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
170+
func (c *Controller) worker() {
171+
for c.processNextWorkItem() {
172+
}
173+
}
174+
171175
// processNextWorkItem will read a single work item off the workqueue and
172-
// attempt to process it, by calling the syncHandler.
176+
// attempt to process it, by calling the reconcileHandler.
173177
func (c *Controller) processNextWorkItem() bool {
174-
// This code copy-pasted from the sample-Controller.
175-
176-
// Update metrics after processing each item
177-
reconcileStartTS := time.Now()
178-
defer func() {
179-
c.updateMetrics(time.Now().Sub(reconcileStartTS))
180-
}()
181-
182178
obj, shutdown := c.Queue.Get()
183179
if shutdown {
184180
// Stop working
@@ -192,6 +188,17 @@ func (c *Controller) processNextWorkItem() bool {
192188
// put back on the workqueue and attempted again after a back-off
193189
// period.
194190
defer c.Queue.Done(obj)
191+
192+
return c.reconcileHandler(obj)
193+
}
194+
195+
func (c *Controller) reconcileHandler(obj interface{}) bool {
196+
// Update metrics after processing each item
197+
reconcileStartTS := time.Now()
198+
defer func() {
199+
c.updateMetrics(time.Now().Sub(reconcileStartTS))
200+
}()
201+
195202
var req reconcile.Request
196203
var ok bool
197204
if req, ok = obj.(reconcile.Request); !ok {
@@ -204,7 +211,6 @@ func (c *Controller) processNextWorkItem() bool {
204211
// Return true, don't take a break
205212
return true
206213
}
207-
208214
// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
209215
// resource to be synced.
210216
if result, err := c.Do.Reconcile(req); err != nil {

0 commit comments

Comments
 (0)