Skip to content

Commit 56f1c12

Browse files
committed
move arguments from Start() to Source objects
Signed-off-by: Tim Ramlot <[email protected]>
1 parent 7a3c9ec commit 56f1c12

22 files changed

+634
-508
lines changed

examples/builtins/main.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,17 @@ func main() {
5959
}
6060

6161
// Watch ReplicaSets and enqueue ReplicaSet object key
62-
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
62+
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
6363
entryLog.Error(err, "unable to watch ReplicaSets")
6464
os.Exit(1)
6565
}
6666

6767
// Watch Pods and enqueue owning ReplicaSet key
68-
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
69-
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
68+
if err := c.Watch(source.Kind(
69+
mgr.GetCache(),
70+
&corev1.Pod{},
71+
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()),
72+
)); err != nil {
7073
entryLog.Error(err, "unable to watch Pods")
7174
os.Exit(1)
7275
}

pkg/builder/controller.go

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3131
"sigs.k8s.io/controller-runtime/pkg/controller"
3232
"sigs.k8s.io/controller-runtime/pkg/handler"
33-
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
3433
"sigs.k8s.io/controller-runtime/pkg/manager"
3534
"sigs.k8s.io/controller-runtime/pkg/predicate"
3635
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -54,14 +53,15 @@ const (
5453

5554
// Builder builds a Controller.
5655
type Builder struct {
57-
forInput ForInput
58-
ownsInput []OwnsInput
59-
watchesInput []WatchesInput
60-
mgr manager.Manager
61-
globalPredicates []predicate.Predicate
62-
ctrl controller.Controller
63-
ctrlOptions controller.Options
64-
name string
56+
forInput ForInput
57+
ownsInput []OwnsInput
58+
watchesObjectInput []WatchesObjectInput
59+
watchesSourceInput []WatchesSourceInput
60+
mgr manager.Manager
61+
globalPredicates []predicate.Predicate
62+
ctrl controller.Controller
63+
ctrlOptions controller.Options
64+
name string
6565
}
6666

6767
// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
@@ -121,9 +121,9 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
121121
return blder
122122
}
123123

124-
// WatchesInput represents the information set by Watches method.
125-
type WatchesInput struct {
126-
src source.Source
124+
// WatchesObjectInput represents the information set by Watches method.
125+
type WatchesObjectInput struct {
126+
object client.Object
127127
eventhandler handler.EventHandler
128128
predicates []predicate.Predicate
129129
objectProjection objectProjection
@@ -134,9 +134,14 @@ type WatchesInput struct {
134134
//
135135
// This is the equivalent of calling
136136
// WatchesRawSource(source.Kind(scheme, object), eventhandler, opts...).
137-
func (blder *Builder) Watches(object client.Object, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder {
138-
src := source.Kind(blder.mgr.GetCache(), object)
139-
return blder.WatchesRawSource(src, eventhandler, opts...)
137+
func (blder *Builder) Watches(object client.Object, eventhandler handler.EventHandler, opts ...WatchesObjectOption) *Builder {
138+
input := WatchesObjectInput{object: object, eventhandler: eventhandler}
139+
for _, opt := range opts {
140+
opt.ApplyToObjectWatches(&input)
141+
}
142+
143+
blder.watchesObjectInput = append(blder.watchesObjectInput, input)
144+
return blder
140145
}
141146

142147
// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
@@ -166,23 +171,24 @@ func (blder *Builder) Watches(object client.Object, eventhandler handler.EventHa
166171
// In the first case, controller-runtime will create another cache for the
167172
// concrete type on top of the metadata cache; this increases memory
168173
// consumption and leads to race conditions as caches are not in sync.
169-
func (blder *Builder) WatchesMetadata(object client.Object, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder {
174+
func (blder *Builder) WatchesMetadata(object client.Object, eventhandler handler.EventHandler, opts ...WatchesObjectOption) *Builder {
170175
opts = append(opts, OnlyMetadata)
171176
return blder.Watches(object, eventhandler, opts...)
172177
}
173178

179+
// WatchesSourceInput represents the information set by Watches method.
180+
type WatchesSourceInput struct {
181+
src source.Source
182+
}
183+
174184
// WatchesRawSource exposes the lower-level ControllerManagedBy Watches functions through the builder.
175185
// Specified predicates are registered only for given source.
176186
//
177187
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
178188
// This method is only exposed for more advanced use cases, most users should use higher level functions.
179-
func (blder *Builder) WatchesRawSource(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder {
180-
input := WatchesInput{src: src, eventhandler: eventhandler}
181-
for _, opt := range opts {
182-
opt.ApplyToWatches(&input)
183-
}
184-
185-
blder.watchesInput = append(blder.watchesInput, input)
189+
// This method does generally disregard all the global configuration set by the builder.
190+
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
191+
blder.watchesSourceInput = append(blder.watchesSourceInput, WatchesSourceInput{src: src})
186192
return blder
187193
}
188194

@@ -272,10 +278,15 @@ func (blder *Builder) doWatch() error {
272278
if err != nil {
273279
return err
274280
}
275-
src := source.Kind(blder.mgr.GetCache(), obj)
276-
hdler := &handler.EnqueueRequestForObject{}
277-
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
278-
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
281+
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
282+
allPredicates = append(allPredicates, blder.forInput.predicates...)
283+
src := source.Kind(
284+
blder.mgr.GetCache(),
285+
obj,
286+
&handler.EnqueueRequestForObject{},
287+
allPredicates...,
288+
)
289+
if err := blder.ctrl.Watch(src); err != nil {
279290
return err
280291
}
281292
}
@@ -289,7 +300,6 @@ func (blder *Builder) doWatch() error {
289300
if err != nil {
290301
return err
291302
}
292-
src := source.Kind(blder.mgr.GetCache(), obj)
293303
opts := []handler.OwnerOption{}
294304
if !own.matchEveryOwner {
295305
opts = append(opts, handler.OnlyControllerOwner())
@@ -301,32 +311,37 @@ func (blder *Builder) doWatch() error {
301311
)
302312
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
303313
allPredicates = append(allPredicates, own.predicates...)
304-
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
314+
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
315+
if err := blder.ctrl.Watch(src); err != nil {
305316
return err
306317
}
307318
}
308319

309320
// Do the watch requests
310-
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
321+
if len(blder.watchesObjectInput) == 0 && len(blder.watchesSourceInput) == 0 && blder.forInput.object == nil {
311322
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
312323
}
313-
for _, w := range blder.watchesInput {
324+
325+
for _, w := range blder.watchesObjectInput {
326+
obj, err := blder.project(w.object, w.objectProjection)
327+
if err != nil {
328+
return err
329+
}
330+
314331
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
315332
allPredicates = append(allPredicates, w.predicates...)
316-
317-
// If the source of this watch is of type Kind, project it.
318-
if srckind, ok := w.src.(*internalsource.Kind); ok {
319-
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
320-
if err != nil {
321-
return err
322-
}
323-
srckind.Type = typeForSrc
333+
src := source.Kind(blder.mgr.GetCache(), obj, w.eventhandler, allPredicates...)
334+
if err := blder.ctrl.Watch(src); err != nil {
335+
return err
324336
}
337+
}
325338

326-
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
339+
for _, w := range blder.watchesSourceInput {
340+
if err := blder.ctrl.Watch(w.src); err != nil {
327341
return err
328342
}
329343
}
344+
330345
return nil
331346
}
332347

pkg/builder/options.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ type OwnsOption interface {
3434
ApplyToOwns(*OwnsInput)
3535
}
3636

37-
// WatchesOption is some configuration that modifies options for a watches request.
38-
type WatchesOption interface {
37+
// WatchesObjectOption is some configuration that modifies options for a watches request.
38+
type WatchesObjectOption interface {
3939
// ApplyToWatches applies this configuration to the given watches options.
40-
ApplyToWatches(*WatchesInput)
40+
ApplyToObjectWatches(*WatchesObjectInput)
4141
}
4242

4343
// }}}
@@ -66,14 +66,14 @@ func (w Predicates) ApplyToOwns(opts *OwnsInput) {
6666
opts.predicates = w.predicates
6767
}
6868

69-
// ApplyToWatches applies this configuration to the given WatchesInput options.
70-
func (w Predicates) ApplyToWatches(opts *WatchesInput) {
69+
// ApplyToObjectWatches applies this configuration to the given WatchesInput options.
70+
func (w Predicates) ApplyToObjectWatches(opts *WatchesObjectInput) {
7171
opts.predicates = w.predicates
7272
}
7373

7474
var _ ForOption = &Predicates{}
7575
var _ OwnsOption = &Predicates{}
76-
var _ WatchesOption = &Predicates{}
76+
var _ WatchesObjectOption = &Predicates{}
7777

7878
// }}}
7979

@@ -94,8 +94,8 @@ func (p projectAs) ApplyToOwns(opts *OwnsInput) {
9494
opts.objectProjection = objectProjection(p)
9595
}
9696

97-
// ApplyToWatches applies this configuration to the given WatchesInput options.
98-
func (p projectAs) ApplyToWatches(opts *WatchesInput) {
97+
// ApplyToWatches applies this configuration to the given WatchesObjectInput options.
98+
func (p projectAs) ApplyToObjectWatches(opts *WatchesObjectInput) {
9999
opts.objectProjection = objectProjection(p)
100100
}
101101

@@ -132,9 +132,9 @@ var (
132132
// consumption and leads to race conditions as caches are not in sync.
133133
OnlyMetadata = projectAs(projectAsMetadata)
134134

135-
_ ForOption = OnlyMetadata
136-
_ OwnsOption = OnlyMetadata
137-
_ WatchesOption = OnlyMetadata
135+
_ ForOption = OnlyMetadata
136+
_ OwnsOption = OnlyMetadata
137+
_ WatchesObjectOption = OnlyMetadata
138138
)
139139

140140
// }}}

pkg/controller/controller.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
2727

28-
"sigs.k8s.io/controller-runtime/pkg/handler"
2928
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
3029
"sigs.k8s.io/controller-runtime/pkg/manager"
31-
"sigs.k8s.io/controller-runtime/pkg/predicate"
3230
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3331
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3432
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -72,13 +70,9 @@ type Controller interface {
7270
// Reconciler is called to reconcile an object by Namespace/Name
7371
reconcile.Reconciler
7472

75-
// Watch takes events provided by a Source and uses the EventHandler to
76-
// enqueue reconcile.Requests in response to the events.
77-
//
78-
// Watch may be provided one or more Predicates to filter events before
79-
// they are given to the EventHandler. Events will be passed to the
80-
// EventHandler if all provided Predicates evaluate to true.
81-
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
73+
// Watch takes events provided by a Source and enqueues reconcile.Requests
74+
// in response to the events.
75+
Watch(src source.Source) error
8276

8377
// Start starts the controller. Start blocks until the context is closed or a
8478
// controller has an error starting.

pkg/controller/controller_integration_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ var _ = Describe("controller", func() {
6464
Expect(err).NotTo(HaveOccurred())
6565

6666
By("Watching Resources")
67-
err = instance.Watch(
68-
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
67+
err = instance.Watch(source.Kind(
68+
cm.GetCache(),
69+
&appsv1.ReplicaSet{},
6970
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
70-
)
71+
))
7172
Expect(err).NotTo(HaveOccurred())
7273

73-
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
74+
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
7475
Expect(err).NotTo(HaveOccurred())
7576

7677
err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})

pkg/controller/controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() {
7777

7878
ctx, cancel := context.WithCancel(context.Background())
7979
watchChan := make(chan event.GenericEvent, 1)
80-
watch := &source.Channel{Source: watchChan}
80+
watch := source.Channel(&source.ChannelSource{Source: watchChan}, &handler.EnqueueRequestForObject{})
8181
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
8282

8383
reconcileStarted := make(chan struct{})
@@ -99,7 +99,7 @@ var _ = Describe("controller.Controller", func() {
9999
Expect(err).NotTo(HaveOccurred())
100100

101101
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
102-
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
102+
Expect(c.Watch(watch)).To(Succeed())
103103
Expect(err).NotTo(HaveOccurred())
104104

105105
go func() {

pkg/controller/example_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func ExampleController() {
7171
}
7272

7373
// Watch for Pod create / update / delete events and call Reconcile
74-
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
74+
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
7575
if err != nil {
7676
log.Error(err, "unable to watch pods")
7777
os.Exit(1)
@@ -108,7 +108,7 @@ func ExampleController_unstructured() {
108108
Version: "v1",
109109
})
110110
// Watch for Pod create / update / delete events and call Reconcile
111-
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
111+
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
112112
if err != nil {
113113
log.Error(err, "unable to watch pods")
114114
os.Exit(1)
@@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
139139
os.Exit(1)
140140
}
141141

142-
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
142+
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
143143
log.Error(err, "unable to watch pods")
144144
os.Exit(1)
145145
}

0 commit comments

Comments
 (0)