Skip to content

Commit 658ff98

Browse files
Make workqueue rate limiter configurable during controller creation
Exposes rate limiter as a controller option in case users want to use a different rate limiter than the default one provided by controller-runtime.
1 parent efbeb27 commit 658ff98

File tree

4 files changed

+70
-1
lines changed

4 files changed

+70
-1
lines changed

pkg/builder/controller_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/apimachinery/pkg/runtime/schema"
3232
"k8s.io/apimachinery/pkg/types"
33+
"k8s.io/client-go/util/workqueue"
3334
"sigs.k8s.io/controller-runtime/pkg/controller"
3435
"sigs.k8s.io/controller-runtime/pkg/handler"
3536
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -126,6 +127,28 @@ var _ = Describe("application", func() {
126127
Expect(instance).NotTo(BeNil())
127128
})
128129

130+
It("should override rate limiter during creation of controller", func() {
131+
rateLimiter := workqueue.DefaultItemBasedRateLimiter()
132+
newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) {
133+
if options.RateLimiter == rateLimiter {
134+
return controller.New(name, mgr, options)
135+
}
136+
return nil, fmt.Errorf("rate limiter expected %T but found %T", rateLimiter, options.RateLimiter)
137+
}
138+
139+
By("creating a controller manager")
140+
m, err := manager.New(cfg, manager.Options{})
141+
Expect(err).NotTo(HaveOccurred())
142+
143+
instance, err := ControllerManagedBy(m).
144+
For(&appsv1.ReplicaSet{}).
145+
Owns(&appsv1.ReplicaSet{}).
146+
WithOptions(controller.Options{RateLimiter: rateLimiter}).
147+
Build(noop)
148+
Expect(err).NotTo(HaveOccurred())
149+
Expect(instance).NotTo(BeNil())
150+
})
151+
129152
It("should allow multiple controllers for the same kind", func() {
130153
By("creating a controller manager")
131154
m, err := manager.New(cfg, manager.Options{})

pkg/controller/controller.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
2525
"sigs.k8s.io/controller-runtime/pkg/manager"
2626
"sigs.k8s.io/controller-runtime/pkg/predicate"
27+
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
2728
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2829
"sigs.k8s.io/controller-runtime/pkg/source"
2930
)
@@ -35,6 +36,11 @@ type Options struct {
3536

3637
// Reconciler reconciles an object
3738
Reconciler reconcile.Reconciler
39+
40+
// RateLimiter is used to limit how frequently requests maybe queued.
41+
// Defaults to MaxOfRateLimiter which has both overall and per-item rate limiting.
42+
// The overall is a token bucket and the per-item is exponential.
43+
RateLimiter ratelimiter.RateLimiter
3844
}
3945

4046
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -73,6 +79,10 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
7379
options.MaxConcurrentReconciles = 1
7480
}
7581

82+
if options.RateLimiter == nil {
83+
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
84+
}
85+
7686
// Inject dependencies into Reconciler
7787
if err := mgr.SetFields(options.Reconciler); err != nil {
7888
return nil, err
@@ -87,7 +97,7 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
8797
Client: mgr.GetClient(),
8898
Recorder: mgr.GetEventRecorderFor(name),
8999
MakeQueue: func() workqueue.RateLimitingInterface {
90-
return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
100+
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
91101
},
92102
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
93103
Name: name,

pkg/ratelimiter/doc.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
/*
18+
Package ratelimiter defines rate limiters used by Controllers to limit how frequently requests may be queued.
19+
20+
Typical rate limiters that can be used are implemented in client-go's workqueue package.
21+
*/
22+
package ratelimiter

pkg/ratelimiter/ratelimiter.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package ratelimiter
2+
3+
import "time"
4+
5+
// RateLimiter is an identical interface of client-go workqueue RateLimiter.
6+
type RateLimiter interface {
7+
// When gets an item and gets to decide how long that item should wait
8+
When(item interface{}) time.Duration
9+
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
10+
// or for success, we'll stop tracking it
11+
Forget(item interface{})
12+
// NumRequeues returns back how many failures the item has had
13+
NumRequeues(item interface{}) int
14+
}

0 commit comments

Comments
 (0)