Skip to content

Commit 9931919

Browse files
committed
Source: Move handler and predicate out from Start
1 parent 45e166d commit 9931919

File tree

15 files changed

+339
-380
lines changed

15 files changed

+339
-380
lines changed

examples/builtins/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ func main() {
5959
}
6060

6161
// Watch ReplicaSets and enqueue ReplicaSet object key
62-
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
62+
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
6363
entryLog.Error(err, "unable to watch ReplicaSets")
6464
os.Exit(1)
6565
}
6666

6767
// Watch Pods and enqueue owning ReplicaSet key
68-
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
69-
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
68+
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{},
69+
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
7070
entryLog.Error(err, "unable to watch Pods")
7171
os.Exit(1)
7272
}

pkg/builder/controller.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
124124
// WatchesInput represents the information set by Watches method.
125125
type WatchesInput struct {
126126
src source.Source
127-
eventHandler handler.EventHandler
128127
predicates []predicate.Predicate
129128
objectProjection objectProjection
130129
}
@@ -135,8 +134,13 @@ type WatchesInput struct {
135134
// This is the equivalent of calling
136135
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
137136
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
138-
src := source.Kind(blder.mgr.GetCache(), object)
139-
return blder.WatchesRawSource(src, eventHandler, opts...)
137+
input := WatchesInput{}
138+
for _, opt := range opts {
139+
opt.ApplyToWatches(&input)
140+
}
141+
src := source.Kind(blder.mgr.GetCache(), object, eventHandler, input.predicates...)
142+
143+
return blder.WatchesRawSource(src, opts...)
140144
}
141145

142146
// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
@@ -176,8 +180,8 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
176180
//
177181
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
178182
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
179-
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
180-
input := WatchesInput{src: src, eventHandler: eventHandler}
183+
func (blder *Builder) WatchesRawSource(src source.Source, opts ...WatchesOption) *Builder {
184+
input := WatchesInput{src: src}
181185
for _, opt := range opts {
182186
opt.ApplyToWatches(&input)
183187
}
@@ -272,11 +276,11 @@ func (blder *Builder) doWatch() error {
272276
if err != nil {
273277
return err
274278
}
275-
src := source.Kind(blder.mgr.GetCache(), obj)
276279
hdler := &handler.EnqueueRequestForObject{}
277280
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
278281
allPredicates = append(allPredicates, blder.forInput.predicates...)
279-
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
282+
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
283+
if err := blder.ctrl.Watch(src); err != nil {
280284
return err
281285
}
282286
}
@@ -290,7 +294,6 @@ func (blder *Builder) doWatch() error {
290294
if err != nil {
291295
return err
292296
}
293-
src := source.Kind(blder.mgr.GetCache(), obj)
294297
opts := []handler.OwnerOption{}
295298
if !own.matchEveryOwner {
296299
opts = append(opts, handler.OnlyControllerOwner())
@@ -302,7 +305,8 @@ func (blder *Builder) doWatch() error {
302305
)
303306
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
304307
allPredicates = append(allPredicates, own.predicates...)
305-
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
308+
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
309+
if err := blder.ctrl.Watch(src); err != nil {
306310
return err
307311
}
308312
}
@@ -311,18 +315,19 @@ func (blder *Builder) doWatch() error {
311315
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
312316
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
313317
}
318+
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
314319
for _, w := range blder.watchesInput {
315320
// If the source of this watch is of type Kind, project it.
316321
if srcKind, ok := w.src.(*internalsource.Kind); ok {
322+
allPredicates := append(allPredicates, w.predicates...)
317323
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
318324
if err != nil {
319325
return err
320326
}
321327
srcKind.Type = typeForSrc
328+
srcKind.Predicates = append(srcKind.Predicates, allPredicates...)
322329
}
323-
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
324-
allPredicates = append(allPredicates, w.predicates...)
325-
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
330+
if err := blder.ctrl.Watch(w.src); err != nil {
326331
return err
327332
}
328333
}

pkg/controller/controller.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
2727

28-
"sigs.k8s.io/controller-runtime/pkg/handler"
2928
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
3029
"sigs.k8s.io/controller-runtime/pkg/manager"
31-
"sigs.k8s.io/controller-runtime/pkg/predicate"
3230
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3331
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3432
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -84,13 +82,8 @@ type Controller interface {
8482
// Reconciler is called to reconcile an object by Namespace/Name
8583
reconcile.Reconciler
8684

87-
// Watch takes events provided by a Source and uses the EventHandler to
88-
// enqueue reconcile.Requests in response to the events.
89-
//
90-
// Watch may be provided one or more Predicates to filter events before
91-
// they are given to the EventHandler. Events will be passed to the
92-
// EventHandler if all provided Predicates evaluate to true.
93-
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
85+
// Watch watches the provided Source.
86+
Watch(src source.Source) error
9487

9588
// Start starts the controller. Start blocks until the context is closed or a
9689
// controller has an error starting.

pkg/controller/controller_integration_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,13 @@ var _ = Describe("controller", func() {
6565

6666
By("Watching Resources")
6767
err = instance.Watch(
68-
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
69-
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
68+
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
69+
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
70+
),
7071
)
7172
Expect(err).NotTo(HaveOccurred())
7273

73-
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
74+
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
7475
Expect(err).NotTo(HaveOccurred())
7576

7677
err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})

pkg/controller/controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {
7979

8080
ctx, cancel := context.WithCancel(context.Background())
8181
watchChan := make(chan event.GenericEvent, 1)
82-
watch := &source.Channel{Source: watchChan}
82+
watch := &source.Channel{Source: watchChan, Handler: &handler.EnqueueRequestForObject{}}
8383
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
8484

8585
reconcileStarted := make(chan struct{})
@@ -101,7 +101,7 @@ var _ = Describe("controller.Controller", func() {
101101
Expect(err).NotTo(HaveOccurred())
102102

103103
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
104-
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
104+
Expect(c.Watch(watch)).To(Succeed())
105105
Expect(err).NotTo(HaveOccurred())
106106

107107
go func() {

pkg/controller/example_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func ExampleController() {
7171
}
7272

7373
// Watch for Pod create / update / delete events and call Reconcile
74-
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
74+
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
7575
if err != nil {
7676
log.Error(err, "unable to watch pods")
7777
os.Exit(1)
@@ -108,7 +108,7 @@ func ExampleController_unstructured() {
108108
Version: "v1",
109109
})
110110
// Watch for Pod create / update / delete events and call Reconcile
111-
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
111+
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
112112
if err != nil {
113113
log.Error(err, "unable to watch pods")
114114
os.Exit(1)
@@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
139139
os.Exit(1)
140140
}
141141

142-
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
142+
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
143143
log.Error(err, "unable to watch pods")
144144
os.Exit(1)
145145
}

pkg/handler/example_test.go

Lines changed: 45 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ var (
4242
func ExampleEnqueueRequestForObject() {
4343
// controller is a controller.controller
4444
err := c.Watch(
45-
source.Kind(mgr.GetCache(), &corev1.Pod{}),
46-
&handler.EnqueueRequestForObject{},
45+
source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}),
4746
)
4847
if err != nil {
4948
// handle it
@@ -55,8 +54,9 @@ func ExampleEnqueueRequestForObject() {
5554
func ExampleEnqueueRequestForOwner() {
5655
// controller is a controller.controller
5756
err := c.Watch(
58-
source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}),
59-
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
57+
source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{},
58+
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
59+
),
6060
)
6161
if err != nil {
6262
// handle it
@@ -68,19 +68,20 @@ func ExampleEnqueueRequestForOwner() {
6868
func ExampleEnqueueRequestsFromMapFunc() {
6969
// controller is a controller.controller
7070
err := c.Watch(
71-
source.Kind(mgr.GetCache(), &appsv1.Deployment{}),
72-
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
73-
return []reconcile.Request{
74-
{NamespacedName: types.NamespacedName{
75-
Name: a.GetName() + "-1",
76-
Namespace: a.GetNamespace(),
77-
}},
78-
{NamespacedName: types.NamespacedName{
79-
Name: a.GetName() + "-2",
80-
Namespace: a.GetNamespace(),
81-
}},
82-
}
83-
}),
71+
source.Kind(mgr.GetCache(), &appsv1.Deployment{},
72+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
73+
return []reconcile.Request{
74+
{NamespacedName: types.NamespacedName{
75+
Name: a.GetName() + "-1",
76+
Namespace: a.GetNamespace(),
77+
}},
78+
{NamespacedName: types.NamespacedName{
79+
Name: a.GetName() + "-2",
80+
Namespace: a.GetNamespace(),
81+
}},
82+
}
83+
}),
84+
),
8485
)
8586
if err != nil {
8687
// handle it
@@ -91,33 +92,34 @@ func ExampleEnqueueRequestsFromMapFunc() {
9192
func ExampleFuncs() {
9293
// controller is a controller.controller
9394
err := c.Watch(
94-
source.Kind(mgr.GetCache(), &corev1.Pod{}),
95-
handler.Funcs{
96-
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
97-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
98-
Name: e.Object.GetName(),
99-
Namespace: e.Object.GetNamespace(),
100-
}})
95+
source.Kind(mgr.GetCache(), &corev1.Pod{},
96+
handler.Funcs{
97+
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
98+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
99+
Name: e.Object.GetName(),
100+
Namespace: e.Object.GetNamespace(),
101+
}})
102+
},
103+
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
104+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
105+
Name: e.ObjectNew.GetName(),
106+
Namespace: e.ObjectNew.GetNamespace(),
107+
}})
108+
},
109+
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
110+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
111+
Name: e.Object.GetName(),
112+
Namespace: e.Object.GetNamespace(),
113+
}})
114+
},
115+
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
116+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
117+
Name: e.Object.GetName(),
118+
Namespace: e.Object.GetNamespace(),
119+
}})
120+
},
101121
},
102-
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
103-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
104-
Name: e.ObjectNew.GetName(),
105-
Namespace: e.ObjectNew.GetNamespace(),
106-
}})
107-
},
108-
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
109-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
110-
Name: e.Object.GetName(),
111-
Namespace: e.Object.GetNamespace(),
112-
}})
113-
},
114-
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
115-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
116-
Name: e.Object.GetName(),
117-
Namespace: e.Object.GetNamespace(),
118-
}})
119-
},
120-
},
122+
),
121123
)
122124
if err != nil {
123125
// handle it

pkg/internal/controller/controller.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@ import (
2929
"k8s.io/apimachinery/pkg/util/uuid"
3030
"k8s.io/client-go/util/workqueue"
3131

32-
"sigs.k8s.io/controller-runtime/pkg/handler"
3332
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
3433
logf "sigs.k8s.io/controller-runtime/pkg/log"
35-
"sigs.k8s.io/controller-runtime/pkg/predicate"
3634
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3735
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3836
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -98,9 +96,7 @@ type Controller struct {
9896

9997
// watchDescription contains all the information necessary to start a watch.
10098
type watchDescription struct {
101-
src source.Source
102-
handler handler.EventHandler
103-
predicates []predicate.Predicate
99+
src source.Source
104100
}
105101

106102
// Reconcile implements reconcile.Reconciler.
@@ -124,20 +120,20 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ re
124120
}
125121

126122
// Watch implements controller.Controller.
127-
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
123+
func (c *Controller) Watch(src source.Source) error {
128124
c.mu.Lock()
129125
defer c.mu.Unlock()
130126

131127
// Controller hasn't started yet, store the watches locally and return.
132128
//
133129
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
134130
if !c.Started {
135-
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
131+
c.startWatches = append(c.startWatches, watchDescription{src: src})
136132
return nil
137133
}
138134

139135
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
140-
return src.Start(c.ctx, evthdler, c.Queue, prct...)
136+
return src.Start(c.ctx, c.Queue)
141137
}
142138

143139
// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
@@ -181,7 +177,7 @@ func (c *Controller) Start(ctx context.Context) error {
181177
for _, watch := range c.startWatches {
182178
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
183179

184-
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
180+
if err := watch.src.Start(ctx, c.Queue); err != nil {
185181
return err
186182
}
187183
}

0 commit comments

Comments
 (0)