Skip to content

Commit dcdb499

Browse files
committed
Add controller workqueue option
Signed-off-by: Stefan Büringer [email protected]
1 parent 20f3f4b commit dcdb499

File tree

4 files changed

+80
-19
lines changed

4 files changed

+80
-19
lines changed

pkg/controller/controller.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ type Options struct {
5959
// The overall is a token bucket and the per-item is exponential.
6060
RateLimiter ratelimiter.RateLimiter
6161

62+
// NewWorkQueue constructs the queue for this controller once the controller is ready to start.
63+
// This is a func because the standard Kubernetes work queues start themselves immediately, which
64+
// leads to goroutine leaks if something calls controller.New repeatedly.
65+
// Defaults to NewRateLimitingQueueWithConfig.
66+
NewWorkQueue func(rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface
67+
6268
// LogConstructor is used to construct a logger used for this controller and passed
6369
// to each reconciliation via the context field.
6470
LogConstructor func(request *reconcile.Request) logr.Logger
@@ -147,6 +153,14 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
147153
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
148154
}
149155

156+
if options.NewWorkQueue == nil {
157+
options.NewWorkQueue = func(rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
158+
return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
159+
Name: name,
160+
})
161+
}
162+
}
163+
150164
if options.RecoverPanic == nil {
151165
options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
152166
}
@@ -157,12 +171,9 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
157171

158172
// Create controller with dependencies set
159173
return &controller.Controller{
160-
Do: options.Reconciler,
161-
MakeQueue: func() workqueue.RateLimitingInterface {
162-
return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{
163-
Name: name,
164-
})
165-
},
174+
Do: options.Reconciler,
175+
RateLimiter: options.RateLimiter,
176+
NewWorkQueue: options.NewWorkQueue,
166177
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
167178
CacheSyncTimeout: options.CacheSyncTimeout,
168179
Name: name,

pkg/controller/controller_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
. "github.com/onsi/gomega"
2525
"go.uber.org/goleak"
2626
corev1 "k8s.io/api/core/v1"
27+
"k8s.io/client-go/util/workqueue"
2728
"k8s.io/utils/ptr"
2829

2930
"sigs.k8s.io/controller-runtime/pkg/config"
@@ -32,6 +33,7 @@ import (
3233
"sigs.k8s.io/controller-runtime/pkg/handler"
3334
internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller"
3435
"sigs.k8s.io/controller-runtime/pkg/manager"
36+
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3537
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3638
"sigs.k8s.io/controller-runtime/pkg/source"
3739
)
@@ -133,6 +135,48 @@ var _ = Describe("controller.Controller", func() {
133135
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
134136
})
135137

138+
It("should default RateLimiter and NewWorkQueue if not specified", func() {
139+
m, err := manager.New(cfg, manager.Options{})
140+
Expect(err).NotTo(HaveOccurred())
141+
142+
c, err := controller.New("new-controller", m, controller.Options{
143+
Reconciler: reconcile.Func(nil),
144+
})
145+
Expect(err).NotTo(HaveOccurred())
146+
147+
ctrl, ok := c.(*internalcontroller.Controller)
148+
Expect(ok).To(BeTrue())
149+
150+
Expect(ctrl.RateLimiter).NotTo(BeNil())
151+
Expect(ctrl.NewWorkQueue).NotTo(BeNil())
152+
})
153+
154+
It("should not override RateLimiter and NewWorkQueue if specified", func() {
155+
m, err := manager.New(cfg, manager.Options{})
156+
Expect(err).NotTo(HaveOccurred())
157+
158+
customRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
159+
customNewWorkQueueCalled := false
160+
customNewWorkQueue := func(rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
161+
customNewWorkQueueCalled = true
162+
return nil
163+
}
164+
165+
c, err := controller.New("new-controller", m, controller.Options{
166+
Reconciler: reconcile.Func(nil),
167+
RateLimiter: customRateLimiter,
168+
NewWorkQueue: customNewWorkQueue,
169+
})
170+
Expect(err).NotTo(HaveOccurred())
171+
172+
ctrl, ok := c.(*internalcontroller.Controller)
173+
Expect(ok).To(BeTrue())
174+
175+
Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter))
176+
ctrl.NewWorkQueue(nil)
177+
Expect(customNewWorkQueueCalled).To(BeTrue(), "Expected customNewWorkQueue to be called")
178+
})
179+
136180
It("should default RecoverPanic from the manager", func() {
137181
m, err := manager.New(cfg, manager.Options{Controller: config.Controller{RecoverPanic: ptr.To(true)}})
138182
Expect(err).NotTo(HaveOccurred())

pkg/internal/controller/controller.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
3434
logf "sigs.k8s.io/controller-runtime/pkg/log"
3535
"sigs.k8s.io/controller-runtime/pkg/predicate"
36+
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3637
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3738
"sigs.k8s.io/controller-runtime/pkg/source"
3839
)
@@ -50,10 +51,13 @@ type Controller struct {
5051
// Defaults to the DefaultReconcileFunc.
5152
Do reconcile.Reconciler
5253

53-
// MakeQueue constructs the queue for this controller once the controller is ready to start.
54-
// This exists because the standard Kubernetes workqueues start themselves immediately, which
54+
// RateLimiter is used to limit how frequently requests may be queued into the work queue.
55+
RateLimiter ratelimiter.RateLimiter
56+
57+
// NewWorkQueue constructs the queue for this controller once the controller is ready to start.
58+
// This is a func because the standard Kubernetes work queues start themselves immediately, which
5559
// leads to goroutine leaks if something calls controller.New repeatedly.
56-
MakeQueue func() workqueue.RateLimitingInterface
60+
NewWorkQueue func(rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface
5761

5862
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
5963
// the Queue for processing
@@ -158,7 +162,7 @@ func (c *Controller) Start(ctx context.Context) error {
158162
// Set the internal context.
159163
c.ctx = ctx
160164

161-
c.Queue = c.MakeQueue()
165+
c.Queue = c.NewWorkQueue(c.RateLimiter)
162166
go func() {
163167
<-ctx.Done()
164168
c.Queue.ShutDown()

pkg/internal/controller/controller_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/apimachinery/pkg/types"
3535
"k8s.io/client-go/util/workqueue"
3636
"k8s.io/utils/ptr"
37+
3738
"sigs.k8s.io/controller-runtime/pkg/cache"
3839
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
3940
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -43,6 +44,7 @@ import (
4344
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
4445
"sigs.k8s.io/controller-runtime/pkg/internal/log"
4546
"sigs.k8s.io/controller-runtime/pkg/predicate"
47+
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
4648
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4749
"sigs.k8s.io/controller-runtime/pkg/source"
4850
)
@@ -68,7 +70,7 @@ var _ = Describe("controller", func() {
6870
ctrl = &Controller{
6971
MaxConcurrentReconciles: 1,
7072
Do: fakeReconcile,
71-
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
73+
NewWorkQueue: func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue },
7274
LogConstructor: func(_ *reconcile.Request) logr.Logger {
7375
return log.RuntimeLog.WithName("controller").WithName("test")
7476
},
@@ -408,8 +410,8 @@ var _ = Describe("controller", func() {
408410
// TODO(directxman12): we should ensure that backoff occurrs with error requeue
409411

410412
It("should not reset backoff until there's a non-error result", func() {
411-
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
412-
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
413+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewWorkQueue(nil)}
414+
ctrl.NewWorkQueue = func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
413415

414416
ctx, cancel := context.WithCancel(context.Background())
415417
defer cancel()
@@ -444,8 +446,8 @@ var _ = Describe("controller", func() {
444446
})
445447

446448
It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() {
447-
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
448-
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
449+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewWorkQueue(nil)}
450+
ctrl.NewWorkQueue = func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
449451

450452
ctx, cancel := context.WithCancel(context.Background())
451453
defer cancel()
@@ -474,8 +476,8 @@ var _ = Describe("controller", func() {
474476
})
475477

476478
It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() {
477-
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
478-
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
479+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewWorkQueue(nil)}
480+
ctrl.NewWorkQueue = func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
479481

480482
ctx, cancel := context.WithCancel(context.Background())
481483
defer cancel()
@@ -504,8 +506,8 @@ var _ = Describe("controller", func() {
504506
})
505507

506508
It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
507-
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
508-
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
509+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewWorkQueue(nil)}
510+
ctrl.NewWorkQueue = func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
509511

510512
ctx, cancel := context.WithCancel(context.Background())
511513
defer cancel()

0 commit comments

Comments
 (0)