@@ -26,6 +26,7 @@ import (
26
26
"k8s.io/apimachinery/pkg/runtime/schema"
27
27
"k8s.io/klog/v2"
28
28
29
+ "sigs.k8s.io/controller-runtime/pkg/cache"
29
30
"sigs.k8s.io/controller-runtime/pkg/client"
30
31
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
31
32
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -52,14 +53,58 @@ const (
52
53
projectAsMetadata
53
54
)
54
55
56
+ // WatchObject is an interface on an object wrapper with preserved type information
57
+ type WatchObject interface {
58
+ GetObject () client.Object
59
+ SetSource (cache.Cache ) source.SyncingSource
60
+ // SetPredicates()
61
+ }
62
+
63
+ type watchObject [T client.Object ] struct {
64
+ object T
65
+ source source.SyncingSource
66
+ }
67
+
68
+ // // SetPredicates implements WatchObject.
69
+ // func (w watchObject[T]) SetPredicates() {
70
+ // w.source.Start()
71
+ // }
72
+
73
+ // SetSource returns inner client.Object
74
+ func (w watchObject [T ]) GetObject () client.Object {
75
+ return w .object
76
+ }
77
+
78
+ // SetSource sets and returns the source.SyncingSource on the object
79
+ func (w watchObject [T ]) SetSource (cache cache.Cache ) source.SyncingSource {
80
+ if w .source == nil {
81
+ w .source = source .Object (cache , w .object )
82
+ }
83
+
84
+ return w .source
85
+ }
86
+
87
+ // Object constructs a wrapper on a generic object with stored type information
88
+ func Object [T client.Object ](obj T ) WatchObject {
89
+ return watchObject [T ]{object : obj }
90
+ }
91
+
92
+ type State struct {
93
+ options []Option
94
+ mgr manager.Manager
95
+ }
96
+
55
97
// Builder builds a Controller.
56
98
type Builder struct {
57
99
forInput ForInput
58
100
ownsInput []OwnsInput
59
101
watchesInput []WatchesInput
102
+ blocks []func (State )
103
+ options []Option
60
104
mgr manager.Manager
61
105
globalPredicates []predicate.Predicate
62
106
ctrl controller.Controller
107
+ ctrls []controller.ControllerConstraint
63
108
ctrlOptions controller.Options
64
109
name string
65
110
}
@@ -71,7 +116,7 @@ func ControllerManagedBy(m manager.Manager) *Builder {
71
116
72
117
// ForInput represents the information set by the For method.
73
118
type ForInput struct {
74
- object client. Object
119
+ object WatchObject
75
120
predicates []predicate.Predicate
76
121
objectProjection objectProjection
77
122
err error
@@ -81,12 +126,22 @@ type ForInput struct {
81
126
// update events by *reconciling the object*.
82
127
// This is the equivalent of calling
83
128
// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).
84
- func (blder * Builder ) For (object client.Object , opts ... ForOption ) * Builder {
129
+ func (blder * Builder ) For (object client.Object , opts ... Option ) * Builder {
130
+ return blder .With (Object (object ), opts ... )
131
+ }
132
+
133
+ // With defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
134
+ // update events by *reconciling the object*.
135
+ // This is the equivalent of calling
136
+ // Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).
137
+ //
138
+ // Unlike For, With ensures the type of client.Object returned by predicates or event handlers.
139
+ func (blder * Builder ) With (watch WatchObject , opts ... Option ) * Builder {
85
140
if blder .forInput .object != nil {
86
141
blder .forInput .err = fmt .Errorf ("For(...) should only be called once, could not assign multiple objects for reconciliation" )
87
142
return blder
88
143
}
89
- input := ForInput {object : object }
144
+ input := ForInput {object : watch }
90
145
for _ , opt := range opts {
91
146
opt .ApplyToFor (& input )
92
147
}
@@ -97,10 +152,11 @@ func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
97
152
98
153
// OwnsInput represents the information set by Owns method.
99
154
type OwnsInput struct {
100
- matchEveryOwner bool
101
- object client.Object
102
- predicates []predicate.Predicate
103
- objectProjection objectProjection
155
+ matchEveryOwner bool
156
+ object WatchObject
157
+ predicates []predicate.Predicate
158
+ genericPredicates []predicate.PredicateConstraint
159
+ objectProjection objectProjection
104
160
}
105
161
106
162
// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
@@ -111,7 +167,21 @@ type OwnsInput struct {
111
167
//
112
168
// By default, this is the equivalent of calling
113
169
// Watches(object, handler.EnqueueRequestForOwner([...], ownerType, OnlyControllerOwner())).
114
- func (blder * Builder ) Owns (object client.Object , opts ... OwnsOption ) * Builder {
170
+ func (blder * Builder ) Owns (object client.Object , opts ... Option ) * Builder {
171
+ return blder .Own (Object (object ), opts ... )
172
+ }
173
+
174
+ // Own defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
175
+ // create / delete / update events by *reconciling the owner object*.
176
+ //
177
+ // The default behavior reconciles only the first controller-type OwnerReference of the given type.
178
+ // Use Owns(object, builder.MatchEveryOwner) to reconcile all owners.
179
+ //
180
+ // By default, this is the equivalent of calling
181
+ // Watches(object, handler.EnqueueRequestForOwner([...], ownerType, OnlyControllerOwner())).
182
+ //
183
+ // Unlike Owns, Own ensures the type of client.Object returned by predicates or event handlers.
184
+ func (blder * Builder ) Own (object WatchObject , opts ... Option ) * Builder {
115
185
input := OwnsInput {object : object }
116
186
for _ , opt := range opts {
117
187
opt .ApplyToOwns (& input )
@@ -134,11 +204,72 @@ type WatchesInput struct {
134
204
//
135
205
// This is the equivalent of calling
136
206
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
137
- func (blder * Builder ) Watches (object client.Object , eventHandler handler.EventHandler , opts ... WatchesOption ) * Builder {
138
- src := source .ObjectKind (blder .mgr .GetCache (), object )
207
+ func (blder * Builder ) Watches (object client.Object , eventHandler handler.EventHandler , opts ... Option ) * Builder {
208
+ return blder .Watch (Object (object ), eventHandler , opts ... )
209
+ }
210
+
211
+ // Watch defines the type of Object to watch, and configures the ControllerManagedBy to respond to create / delete /
212
+ // update events by *reconciling the object* with the given EventHandler.
213
+ //
214
+ // This is the equivalent of calling
215
+ // WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
216
+ // Unlike Watches, Watch ensures the type of client.Object returned by predicates or event handlers.
217
+ func (blder * Builder ) Watch (object WatchObject , eventHandler handler.EventHandler , opts ... Option ) * Builder {
218
+ src := object .SetSource (blder .mgr .GetCache ())
139
219
return blder .WatchesRawSource (src , eventHandler , opts ... )
140
220
}
141
221
222
+ func (blder * Builder ) Add (w AddWatch ) * Builder {
223
+ w .AddTo (blder )
224
+ return blder
225
+ }
226
+
227
+ func (blder * Builder ) AddBlock (f func (State )) {
228
+ blder .blocks = append (blder .blocks , f )
229
+ }
230
+
231
+ type AddWatch interface {
232
+ SetSource (* Builder )
233
+ SetEventHandler (* Builder )
234
+ GetOptions () []Option
235
+ AddTo (* Builder )
236
+ }
237
+
238
+ type Adder interface {
239
+ AddTo (* Builder ) * Builder
240
+ }
241
+
242
+ var _ AddWatch = & RawAdder [any ]{}
243
+
244
+ type RawAdder [T any ] struct {
245
+ * Raw [T ]
246
+ }
247
+
248
+ func (r * RawAdder [T ]) AddTo (b * Builder ) {
249
+ b .options = append (b .options , r .GetOptions ()... ) // state
250
+ // Establishing watches, event handlers
251
+ r .SetSource (b )
252
+ r .SetEventHandler (b )
253
+ }
254
+
255
+ func (r * RawAdder [T ]) SetSource (b * Builder ) {}
256
+
257
+ func (r * RawAdder [T ]) SetEventHandler (b * Builder ) {
258
+ b .AddBlock (func (s State ) {
259
+ allPredicates := append ([]predicate.Predicate (nil ), s .options )
260
+ allPredicates = append (allPredicates , own .predicates ... )
261
+ if err := blder .ctrl .Watch (src , hdler , allPredicates ... ); err != nil {
262
+ return err
263
+ }
264
+ })
265
+ }
266
+
267
+ func (blder * RawAdder [T ]) GetOptions () []Option {
268
+ return []Option {}
269
+ }
270
+
271
+ var _ controller.ControllerConstraint
272
+
142
273
// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
143
274
//
144
275
// This is useful when watching lots of objects, really big objects, or objects for which you only know
@@ -166,11 +297,18 @@ func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHa
166
297
// In the first case, controller-runtime will create another cache for the
167
298
// concrete type on top of the metadata cache; this increases memory
168
299
// consumption and leads to race conditions as caches are not in sync.
169
- func (blder * Builder ) WatchesMetadata (object client.Object , eventHandler handler.EventHandler , opts ... WatchesOption ) * Builder {
300
+ func (blder * Builder ) WatchesMetadata (object client.Object , eventHandler handler.EventHandler , opts ... Option ) * Builder {
170
301
opts = append (opts , OnlyMetadata )
302
+ _ = RawAdder [any ]{}
171
303
return blder .Watches (object , eventHandler , opts ... )
172
304
}
173
305
306
+ type Raw [T any ] struct {
307
+ src source.ObjectSource [T ]
308
+ eventHandler handler.ObjectHandler [T ]
309
+ predicates []predicate.ObjectPredicate [T ]
310
+ }
311
+
174
312
// WatchesRawSource exposes the lower-level ControllerManagedBy Watches functions through the builder.
175
313
// Specified predicates are registered only for given source.
176
314
//
@@ -179,7 +317,7 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
179
317
//
180
318
// Example:
181
319
// WatchesRawSource(source.Kind(cache, &corev1.Pod{}), eventHandler, opts...) // ensure that source propagates only valid Pod objects.
182
- func (blder * Builder ) WatchesRawSource (src source.Source , eventHandler handler.EventHandler , opts ... WatchesOption ) * Builder {
320
+ func (blder * Builder ) WatchesRawSource (src source.Source , eventHandler handler.EventHandler , opts ... Option ) * Builder {
183
321
input := WatchesInput {src : src , eventHandler : eventHandler }
184
322
for _ , opt := range opts {
185
323
opt .ApplyToWatches (& input )
@@ -271,14 +409,22 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client.
271
409
func (blder * Builder ) doWatch () error {
272
410
// Reconcile type
273
411
if blder .forInput .object != nil {
274
- obj , err := blder .project (blder .forInput .object , blder .forInput .objectProjection )
275
- if err != nil {
276
- return err
412
+ src := blder .forInput .object .SetSource (blder .mgr .GetCache ())
413
+ if _ , ok := blder .forInput .object .(watchObject [client.Object ]); ok {
414
+ obj , err := blder .project (blder .forInput .object .GetObject (), blder .forInput .objectProjection )
415
+ if err != nil {
416
+ return err
417
+ }
418
+ src = source .Object (blder .mgr .GetCache (), obj )
277
419
}
278
- src := source .ObjectKind (blder .mgr .GetCache (), obj )
279
420
hdler := & handler.EnqueueRequestForObject {}
280
421
allPredicates := append ([]predicate.Predicate (nil ), blder .globalPredicates ... )
281
422
allPredicates = append (allPredicates , blder .forInput .predicates ... )
423
+ for _ , c := range blder .ctrls {
424
+ if err := c .DoWatch (); err != nil {
425
+ return err
426
+ }
427
+ }
282
428
if err := blder .ctrl .Watch (src , hdler , allPredicates ... ); err != nil {
283
429
return err
284
430
}
@@ -289,18 +435,21 @@ func (blder *Builder) doWatch() error {
289
435
return errors .New ("Owns() can only be used together with For()" )
290
436
}
291
437
for _ , own := range blder .ownsInput {
292
- obj , err := blder .project (own .object , own .objectProjection )
293
- if err != nil {
294
- return err
438
+ src := own .object .SetSource (blder .mgr .GetCache ())
439
+ if _ , ok := own .object .(watchObject [client.Object ]); ok {
440
+ obj , err := blder .project (own .object .GetObject (), own .objectProjection )
441
+ if err != nil {
442
+ return err
443
+ }
444
+ src = source .Object (blder .mgr .GetCache (), obj )
295
445
}
296
- src := source .ObjectKind (blder .mgr .GetCache (), obj )
297
446
opts := []handler.OwnerOption {}
298
447
if ! own .matchEveryOwner {
299
448
opts = append (opts , handler .OnlyControllerOwner ())
300
449
}
301
450
hdler := handler .EnqueueRequestForOwner (
302
451
blder .mgr .GetScheme (), blder .mgr .GetRESTMapper (),
303
- blder .forInput .object ,
452
+ blder .forInput .object . GetObject () ,
304
453
opts ... ,
305
454
)
306
455
allPredicates := append ([]predicate.Predicate (nil ), blder .globalPredicates ... )
@@ -359,7 +508,7 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
359
508
hasGVK := blder .forInput .object != nil
360
509
if hasGVK {
361
510
var err error
362
- gvk , err = getGvk (blder .forInput .object , blder .mgr .GetScheme ())
511
+ gvk , err = getGvk (blder .forInput .object . GetObject () , blder .mgr .GetScheme ())
363
512
if err != nil {
364
513
return err
365
514
}
0 commit comments