Skip to content

Commit 71edbc4

Browse files
authored
Merge pull request #799 from mszostok/watches-v2
⚠️ Add predicates as variadic args for Owns, For, and Watches func
2 parents 19fe96c + 7e2196f commit 71edbc4

File tree

3 files changed

+198
-27
lines changed

3 files changed

+198
-27
lines changed

pkg/builder/controller.go

Lines changed: 63 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ var getGvk = apiutil.GVKForObject
3737

3838
// Builder builds a Controller.
3939
type Builder struct {
40-
apiType runtime.Object
41-
mgr manager.Manager
42-
predicates []predicate.Predicate
43-
managedObjects []runtime.Object
44-
watchRequest []watchRequest
45-
config *rest.Config
46-
ctrl controller.Controller
47-
ctrlOptions controller.Options
48-
name string
40+
forInput ForInput
41+
ownsInput []OwnsInput
42+
watchesInput []WatchesInput
43+
mgr manager.Manager
44+
globalPredicates []predicate.Predicate
45+
config *rest.Config
46+
ctrl controller.Controller
47+
ctrlOptions controller.Options
48+
name string
4949
}
5050

5151
// ControllerManagedBy returns a new controller builder that will be started by the provided Manager
@@ -63,32 +63,62 @@ func (blder *Builder) ForType(apiType runtime.Object) *Builder {
6363
return blder.For(apiType)
6464
}
6565

66+
// ForInput represents the information set by For method.
67+
type ForInput struct {
68+
object runtime.Object
69+
predicates []predicate.Predicate
70+
}
71+
6672
// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
6773
// update events by *reconciling the object*.
6874
// This is the equivalent of calling
6975
// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
70-
func (blder *Builder) For(apiType runtime.Object) *Builder {
71-
blder.apiType = apiType
76+
func (blder *Builder) For(object runtime.Object, opts ...ForOption) *Builder {
77+
input := ForInput{object: object}
78+
for _, opt := range opts {
79+
opt.ApplyToFor(&input)
80+
}
81+
82+
blder.forInput = input
7283
return blder
7384
}
7485

86+
// OwnsInput represents the information set by Owns method.
87+
type OwnsInput struct {
88+
object runtime.Object
89+
predicates []predicate.Predicate
90+
}
91+
7592
// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
7693
// create / delete / update events by *reconciling the owner object*. This is the equivalent of calling
77-
// Watches(&source.Kind{Type: <ForType-apiType>}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true})
78-
func (blder *Builder) Owns(apiType runtime.Object) *Builder {
79-
blder.managedObjects = append(blder.managedObjects, apiType)
94+
// Watches(&source.Kind{Type: <ForType-forInput>}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true})
95+
func (blder *Builder) Owns(object runtime.Object, opts ...OwnsOption) *Builder {
96+
input := OwnsInput{object: object}
97+
for _, opt := range opts {
98+
opt.ApplyToOwns(&input)
99+
}
100+
101+
blder.ownsInput = append(blder.ownsInput, input)
80102
return blder
81103
}
82104

83-
type watchRequest struct {
105+
// WatchesInput represents the information set by Watches method.
106+
type WatchesInput struct {
84107
src source.Source
85108
eventhandler handler.EventHandler
109+
predicates []predicate.Predicate
86110
}
87111

88112
// Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
89113
// Owns or For instead of Watches directly.
90-
func (blder *Builder) Watches(src source.Source, eventhandler handler.EventHandler) *Builder {
91-
blder.watchRequest = append(blder.watchRequest, watchRequest{src: src, eventhandler: eventhandler})
114+
// Specified predicates are registered only for given source.
115+
func (blder *Builder) Watches(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder {
116+
input := WatchesInput{src: src, eventhandler: eventhandler}
117+
for _, opt := range opts {
118+
opt.ApplyToWatches(&input)
119+
}
120+
121+
blder.watchesInput = append(blder.watchesInput, input)
92122
return blder
93123
}
94124

@@ -102,9 +132,10 @@ func (blder *Builder) WithConfig(config *rest.Config) *Builder {
102132

103133
// WithEventFilter sets the event filters, to filter which create/update/delete/generic events eventually
104134
// trigger reconciliations. For example, filtering on whether the resource version has changed.
135+
// Given predicate is added for all watched objects.
105136
// Defaults to the empty list.
106137
func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
107-
blder.predicates = append(blder.predicates, p)
138+
blder.globalPredicates = append(blder.globalPredicates, p)
108139
return blder
109140
}
110141

@@ -157,28 +188,33 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
157188

158189
func (blder *Builder) doWatch() error {
159190
// Reconcile type
160-
src := &source.Kind{Type: blder.apiType}
191+
src := &source.Kind{Type: blder.forInput.object}
161192
hdler := &handler.EnqueueRequestForObject{}
162-
err := blder.ctrl.Watch(src, hdler, blder.predicates...)
193+
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
194+
err := blder.ctrl.Watch(src, hdler, allPredicates...)
163195
if err != nil {
164196
return err
165197
}
166198

167199
// Watches the managed types
168-
for _, obj := range blder.managedObjects {
169-
src := &source.Kind{Type: obj}
200+
for _, own := range blder.ownsInput {
201+
src := &source.Kind{Type: own.object}
170202
hdler := &handler.EnqueueRequestForOwner{
171-
OwnerType: blder.apiType,
203+
OwnerType: blder.forInput.object,
172204
IsController: true,
173205
}
174-
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
206+
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
207+
allPredicates = append(allPredicates, own.predicates...)
208+
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
175209
return err
176210
}
177211
}
178212

179213
// Do the watch requests
180-
for _, w := range blder.watchRequest {
181-
if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
214+
for _, w := range blder.watchesInput {
215+
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
216+
allPredicates = append(allPredicates, w.predicates...)
217+
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
182218
return err
183219
}
184220

@@ -196,7 +232,7 @@ func (blder *Builder) getControllerName() (string, error) {
196232
if blder.name != "" {
197233
return blder.name, nil
198234
}
199-
gvk, err := getGvk(blder.apiType, blder.mgr.GetScheme())
235+
gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
200236
if err != nil {
201237
return "", err
202238
}

pkg/builder/controller_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ import (
3232
"k8s.io/apimachinery/pkg/types"
3333
"k8s.io/client-go/util/workqueue"
3434
"sigs.k8s.io/controller-runtime/pkg/controller"
35+
"sigs.k8s.io/controller-runtime/pkg/event"
3536
"sigs.k8s.io/controller-runtime/pkg/handler"
3637
"sigs.k8s.io/controller-runtime/pkg/manager"
38+
"sigs.k8s.io/controller-runtime/pkg/predicate"
3739
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3840
"sigs.k8s.io/controller-runtime/pkg/scheme"
3941
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -203,6 +205,67 @@ var _ = Describe("application", func() {
203205
close(done)
204206
}, 10)
205207
})
208+
209+
Describe("Set custom predicates", func() {
210+
It("should execute registered predicates only for assigned kind", func(done Done) {
211+
m, err := manager.New(cfg, manager.Options{})
212+
Expect(err).NotTo(HaveOccurred())
213+
214+
var (
215+
deployPrctExecuted = false
216+
replicaSetPrctExecuted = false
217+
allPrctExecuted = 0
218+
)
219+
220+
deployPrct := predicate.Funcs{
221+
CreateFunc: func(e event.CreateEvent) bool {
222+
defer GinkgoRecover()
223+
// check that it was called only for deployment
224+
Expect(e.Meta).To(BeAssignableToTypeOf(&appsv1.Deployment{}))
225+
deployPrctExecuted = true
226+
return true
227+
},
228+
}
229+
230+
replicaSetPrct := predicate.Funcs{
231+
CreateFunc: func(e event.CreateEvent) bool {
232+
defer GinkgoRecover()
233+
// check that it was called only for replicaset
234+
Expect(e.Meta).To(BeAssignableToTypeOf(&appsv1.ReplicaSet{}))
235+
replicaSetPrctExecuted = true
236+
return true
237+
},
238+
}
239+
240+
allPrct := predicate.Funcs{
241+
CreateFunc: func(e event.CreateEvent) bool {
242+
defer GinkgoRecover()
243+
//check that it was called for all registered kinds
244+
Expect(e.Meta).Should(Or(
245+
BeAssignableToTypeOf(&appsv1.Deployment{}),
246+
BeAssignableToTypeOf(&appsv1.ReplicaSet{}),
247+
))
248+
249+
allPrctExecuted++
250+
return true
251+
},
252+
}
253+
254+
bldr := ControllerManagedBy(m).
255+
For(&appsv1.Deployment{}, WithPredicates(deployPrct)).
256+
Owns(&appsv1.ReplicaSet{}, WithPredicates(replicaSetPrct)).
257+
WithEventFilter(allPrct)
258+
259+
doReconcileTest("5", stop, bldr, m, true)
260+
261+
Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
262+
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
263+
Expect(allPrctExecuted).To(BeNumerically(">=", 2), "Global Predicated should be called at least twice")
264+
265+
close(done)
266+
})
267+
})
268+
206269
})
207270

208271
func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {

pkg/builder/options.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package builder
18+
19+
import (
20+
"sigs.k8s.io/controller-runtime/pkg/predicate"
21+
)
22+
23+
// {{{ "Functional" Option Interfaces
24+
25+
// ForOption is some configuration that modifies options for a For request.
26+
type ForOption interface {
27+
// ApplyToFor applies this configuration to the given for input.
28+
ApplyToFor(*ForInput)
29+
}
30+
31+
// OwnsOption is some configuration that modifies options for a owns request.
32+
type OwnsOption interface {
33+
// ApplyToOwns applies this configuration to the given owns input.
34+
ApplyToOwns(*OwnsInput)
35+
}
36+
37+
// WatchesOption is some configuration that modifies options for a watches request.
38+
type WatchesOption interface {
39+
// ApplyToWatches applies this configuration to the given watches options.
40+
ApplyToWatches(*WatchesInput)
41+
}
42+
43+
// }}}
44+
45+
// {{{ Multi-Type Options
46+
47+
// WithPredicates sets the given predicates list.
48+
func WithPredicates(predicates ...predicate.Predicate) Predicates {
49+
return Predicates{
50+
predicates: predicates,
51+
}
52+
}
53+
54+
type Predicates struct {
55+
predicates []predicate.Predicate
56+
}
57+
58+
func (w Predicates) ApplyToFor(opts *ForInput) {
59+
opts.predicates = w.predicates
60+
}
61+
func (w Predicates) ApplyToOwns(opts *OwnsInput) {
62+
opts.predicates = w.predicates
63+
}
64+
func (w Predicates) ApplyToWatches(opts *WatchesInput) {
65+
opts.predicates = w.predicates
66+
}
67+
68+
var _ ForOption = &Predicates{}
69+
var _ OwnsOption = &Predicates{}
70+
var _ WatchesOption = &Predicates{}
71+
72+
// }}}

0 commit comments

Comments
 (0)