Skip to content

Commit 8c4d3df

Browse files
Separate source into several interfaces
- Add controller adapters Signed-off-by: Danil Grigorev <[email protected]>
1 parent 3d2371d commit 8c4d3df

File tree

16 files changed

+267
-207
lines changed

16 files changed

+267
-207
lines changed

examples/builtins/main.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,13 @@ func main() {
5959
}
6060

6161
// Watch ReplicaSets and enqueue ReplicaSet object key
62-
src := source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{})
63-
src.Prepare(&handler.EnqueueRequestForObject{})
64-
if err := c.Watch(src); err != nil {
62+
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}).Prepare(&handler.EnqueueRequestForObject{})); err != nil {
6563
entryLog.Error(err, "unable to watch ReplicaSets")
6664
os.Exit(1)
6765
}
6866

6967
// Watch Pods and enqueue owning ReplicaSet key
70-
src = source.Kind(mgr.GetCache(), &corev1.Pod{})
71-
src.Prepare(handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))
72-
if err := c.Watch(src); err != nil {
68+
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}).Prepare(handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
7369
entryLog.Error(err, "unable to watch Pods")
7470
os.Exit(1)
7571
}

pkg/builder/controller.go

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
124124

125125
// WatchesInput represents the information set by Watches method.
126126
type WatchesInput struct {
127-
src source.SourcePrepare
127+
src source.PrepareSyncing
128128
eventHandler handler.EventHandler
129129
predicates []predicate.Predicate
130130
objectProjection objectProjection
@@ -177,7 +177,7 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
177177
//
178178
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
179179
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
180-
func (blder *Builder) WatchesRawSource(src source.SourcePrepare, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
180+
func (blder *Builder) WatchesRawSource(src source.PrepareSyncing, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
181181
input := WatchesInput{src: src, eventHandler: eventHandler}
182182
for _, opt := range opts {
183183
opt.ApplyToWatches(&input)
@@ -188,29 +188,24 @@ func (blder *Builder) WatchesRawSource(src source.SourcePrepare, eventHandler ha
188188
}
189189

190190
func For[T client.Object](blder *Builder, object T, prct ...predicate.ObjectPredicate[T]) source.Source {
191-
src := source.ObjectKind(blder.mgr.GetCache(), object)
192-
src.PrepareObject(&handler.EnqueueRequest[T]{}, prct...)
191+
blder.forInput = ForInput{object: object}
193192

194-
return src
193+
return source.ObjectKind(blder.mgr.GetCache(), object).PrepareObject(&handler.EnqueueRequest[T]{}, prct...)
195194
}
196195

197-
func Owns[F, T client.Object](blder *Builder, owner F, owned T) source.Source {
196+
func Owns[F, T client.Object](blder *Builder, owner F, owned T, prct ...predicate.ObjectPredicate[T]) source.Source {
198197
src := source.ObjectKind(blder.mgr.GetCache(), owned)
199198

200199
hdler := handler.EnqueueRequestForOwner(
201200
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
202201
owner,
203202
)
204-
src.PrepareObject(handler.ObjectFuncAdapter[T](hdler))
205203

206-
return src
204+
return src.PrepareObject(handler.ObjectFuncAdapter[T](hdler), prct...)
207205
}
208206

209207
func Watches[T client.Object](blder *Builder, object T, eventHandler handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) source.Source {
210-
src := source.ObjectKind(blder.mgr.GetCache(), object)
211-
src.PrepareObject(eventHandler, prct...)
212-
213-
return src
208+
return source.ObjectKind(blder.mgr.GetCache(), object).PrepareObject(eventHandler, prct...)
214209
}
215210

216211
func (blder *Builder) Add(src source.Source) *Builder {
@@ -308,8 +303,7 @@ func (blder *Builder) doWatch() error {
308303
hdler := &handler.EnqueueRequestForObject{}
309304
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
310305
allPredicates = append(allPredicates, blder.forInput.predicates...)
311-
src.Prepare(hdler, allPredicates...)
312-
if err := blder.ctrl.Watch(src); err != nil {
306+
if err := blder.ctrl.Watch(src.Prepare(hdler, allPredicates...)); err != nil {
313307
return err
314308
}
315309
}
@@ -335,8 +329,7 @@ func (blder *Builder) doWatch() error {
335329
)
336330
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
337331
allPredicates = append(allPredicates, own.predicates...)
338-
src.Prepare(hdler, allPredicates...)
339-
if err := blder.ctrl.Watch(src); err != nil {
332+
if err := blder.ctrl.Watch(src.Prepare(hdler, allPredicates...)); err != nil {
340333
return err
341334
}
342335
}
@@ -356,8 +349,7 @@ func (blder *Builder) doWatch() error {
356349
}
357350
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
358351
allPredicates = append(allPredicates, w.predicates...)
359-
w.src.Prepare(w.eventHandler, allPredicates...)
360-
if err := blder.ctrl.Watch(w.src); err != nil {
352+
if err := blder.ctrl.Watch(w.src.Prepare(w.eventHandler, allPredicates...)); err != nil {
361353
return err
362354
}
363355
}

pkg/controller/controller.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
2727

28+
"sigs.k8s.io/controller-runtime/pkg/handler"
2829
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
2930
"sigs.k8s.io/controller-runtime/pkg/manager"
31+
"sigs.k8s.io/controller-runtime/pkg/predicate"
3032
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3133
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3234
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -189,3 +191,18 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
189191

190192
// ReconcileIDFromContext gets the reconcileID from the current context.
191193
var ReconcileIDFromContext = controller.ReconcileIDFromContext
194+
195+
// ControllerAdapter is an adapter for old controller implementations
196+
type ControllerAdapter struct {
197+
Controller
198+
}
199+
200+
// Watch implements old controller Watch interface
201+
func (c *ControllerAdapter) Watch(src source.Source, handler handler.EventHandler, predicates ...predicate.Predicate) error {
202+
source, ok := src.(source.PrepareSource)
203+
if !ok {
204+
return fmt.Errorf("expected source to fulfill SourcePrepare interface")
205+
}
206+
207+
return c.Controller.Watch(source.Prepare(handler, predicates...))
208+
}

pkg/controller/controller_integration_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,10 @@ var _ = Describe("controller", func() {
6464
Expect(err).NotTo(HaveOccurred())
6565

6666
By("Watching Resources")
67-
src := source.Kind(cm.GetCache(), &appsv1.ReplicaSet{})
68-
src.Prepare(handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}))
69-
err = instance.Watch(src)
67+
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}).Prepare(handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{})))
7068
Expect(err).NotTo(HaveOccurred())
7169

72-
src = source.Kind(cm.GetCache(), &appsv1.Deployment{})
73-
src.Prepare(&handler.EnqueueRequestForObject{})
74-
err = instance.Watch(src)
70+
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}).Prepare(&handler.EnqueueRequestForObject{}))
7571
Expect(err).NotTo(HaveOccurred())
7672

7773
err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})

pkg/controller/controller_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,7 @@ var _ = Describe("controller.Controller", func() {
101101
Expect(err).NotTo(HaveOccurred())
102102

103103
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
104-
watch.Prepare(&handler.EnqueueRequestForObject{})
105-
Expect(c.Watch(watch)).To(Succeed())
104+
Expect(c.Watch(watch.Prepare(&handler.EnqueueRequestForObject{}))).To(Succeed())
106105
Expect(err).NotTo(HaveOccurred())
107106

108107
go func() {

pkg/controller/example_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,7 @@ func ExampleController() {
7171
}
7272

7373
// Watch for Pod create / update / delete events and call Reconcile
74-
src := source.Kind(mgr.GetCache(), &corev1.Pod{})
75-
src.Prepare(&handler.EnqueueRequestForObject{})
76-
err = c.Watch(src)
74+
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}).Prepare(&handler.EnqueueRequestForObject{}))
7775
if err != nil {
7876
log.Error(err, "unable to watch pods")
7977
os.Exit(1)
@@ -111,8 +109,7 @@ func ExampleController_unstructured() {
111109
})
112110
// Watch for Pod create / update / delete events and call Reconcile
113111
src := source.Kind(mgr.GetCache(), u)
114-
src.Prepare(&handler.EnqueueRequestForObject{})
115-
err = c.Watch(src)
112+
err = c.Watch(src.Prepare(&handler.EnqueueRequestForObject{}))
116113
if err != nil {
117114
log.Error(err, "unable to watch pods")
118115
os.Exit(1)
@@ -144,8 +141,7 @@ func ExampleNewUnmanaged() {
144141
}
145142

146143
src := source.Kind(mgr.GetCache(), &corev1.Pod{})
147-
src.Prepare(&handler.EnqueueRequestForObject{})
148-
if err := c.Watch(src); err != nil {
144+
if err := c.Watch(src.Prepare(&handler.EnqueueRequestForObject{})); err != nil {
149145
log.Error(err, "unable to watch pods")
150146
os.Exit(1)
151147
}

pkg/handler/enqueue_mapped_typed.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,22 @@ func EnqueueRequestsFromObjectMapFunc[T any](fn ObjectMapFunc[T]) EventHandler {
5757
}
5858
}
5959

60+
// EnqueueRequestsFromObjectMap enqueues Requests by running a transformation function that outputs a collection
61+
// of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects
62+
// defined by some user specified transformation of the source Event. (e.g. trigger Reconciler for a set of objects
63+
// in response to a cluster resize event caused by adding or deleting a Node)
64+
//
65+
// EnqueueRequestsFromObjectMap is frequently used to fan-out updates from one object to one or more other
66+
// objects of a differing type.
67+
//
68+
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
69+
// objects and both sets of Requests are enqueue.
70+
func EnqueueRequestsFromObjectMap[T any](fn ObjectMapFunc[T]) ObjectHandler[T] {
71+
return &enqueueRequestsFromObjectMapFunc[T]{
72+
toRequests: fn,
73+
}
74+
}
75+
6076
var _ EventHandler = &enqueueRequestsFromObjectMapFunc[any]{}
6177
var _ ObjectHandler[any] = &enqueueRequestsFromObjectMapFunc[any]{}
6278

pkg/handler/example_test.go

Lines changed: 41 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ var (
4242
func ExampleEnqueueRequestForObject() {
4343
// controller is a controller.controller
4444
src := source.Kind(mgr.GetCache(), &corev1.Pod{})
45-
src.Prepare(&handler.EnqueueRequestForObject{})
46-
err := c.Watch(src)
45+
err := c.Watch(src.Prepare(&handler.EnqueueRequestForObject{}))
4746
if err != nil {
4847
// handle it
4948
}
@@ -65,19 +64,19 @@ func ExampleEnqueueRequestForOwner() {
6564
// objects (of Type: MyKind) using a mapping function defined by the user.
6665
func ExampleEnqueueRequestsFromMapFunc() {
6766
// controller is a controller.controller
68-
src := source.Kind(mgr.GetCache(), &appsv1.Deployment{})
69-
src.Prepare(handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
70-
return []reconcile.Request{
71-
{NamespacedName: types.NamespacedName{
72-
Name: a.GetName() + "-1",
73-
Namespace: a.GetNamespace(),
74-
}},
75-
{NamespacedName: types.NamespacedName{
76-
Name: a.GetName() + "-2",
77-
Namespace: a.GetNamespace(),
78-
}},
79-
}
80-
}))
67+
src := source.Kind(mgr.GetCache(), &appsv1.Deployment{}).
68+
Prepare(handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
69+
return []reconcile.Request{
70+
{NamespacedName: types.NamespacedName{
71+
Name: a.GetName() + "-1",
72+
Namespace: a.GetNamespace(),
73+
}},
74+
{NamespacedName: types.NamespacedName{
75+
Name: a.GetName() + "-2",
76+
Namespace: a.GetNamespace(),
77+
}},
78+
}
79+
}))
8180
err := c.Watch(src)
8281
if err != nil {
8382
// handle it
@@ -87,33 +86,33 @@ func ExampleEnqueueRequestsFromMapFunc() {
8786
// This example implements handler.EnqueueRequestForObject.
8887
func ExampleFuncs() {
8988
// controller is a controller.controller
90-
src := source.Kind(mgr.GetCache(), &corev1.Pod{})
91-
src.Prepare(handler.Funcs{
92-
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
93-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
94-
Name: e.Object.GetName(),
95-
Namespace: e.Object.GetNamespace(),
96-
}})
97-
},
98-
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
99-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
100-
Name: e.ObjectNew.GetName(),
101-
Namespace: e.ObjectNew.GetNamespace(),
102-
}})
103-
},
104-
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
105-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
106-
Name: e.Object.GetName(),
107-
Namespace: e.Object.GetNamespace(),
108-
}})
109-
},
110-
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
111-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
112-
Name: e.Object.GetName(),
113-
Namespace: e.Object.GetNamespace(),
114-
}})
115-
},
116-
})
89+
src := source.Kind(mgr.GetCache(), &corev1.Pod{}).
90+
Prepare(handler.Funcs{
91+
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
92+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
93+
Name: e.Object.GetName(),
94+
Namespace: e.Object.GetNamespace(),
95+
}})
96+
},
97+
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
98+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
99+
Name: e.ObjectNew.GetName(),
100+
Namespace: e.ObjectNew.GetNamespace(),
101+
}})
102+
},
103+
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
104+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
105+
Name: e.Object.GetName(),
106+
Namespace: e.Object.GetNamespace(),
107+
}})
108+
},
109+
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
110+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
111+
Name: e.Object.GetName(),
112+
Namespace: e.Object.GetNamespace(),
113+
}})
114+
},
115+
})
117116

118117
err := c.Watch(src)
119118
if err != nil {

pkg/interfaces/source.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package interfaces
17+
18+
import (
19+
"context"
20+
21+
"k8s.io/client-go/util/workqueue"
22+
"sigs.k8s.io/controller-runtime/pkg/handler"
23+
"sigs.k8s.io/controller-runtime/pkg/predicate"
24+
)
25+
26+
// Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
27+
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
28+
//
29+
// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
30+
//
31+
// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls).
32+
//
33+
// Users may build their own Source implementations.
34+
type Source interface {
35+
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
36+
// to enqueue reconcile.Requests.
37+
Start(context.Context, workqueue.RateLimitingInterface) error
38+
}
39+
40+
// PrepareSource: Prepares a Source to be used with EventHandler and predicates
41+
type PrepareSource interface {
42+
Prepare(handler.EventHandler, ...predicate.Predicate) SyncingSource
43+
}
44+
45+
// PrepareSourceObject[T]: Prepares a Source preserving the object type
46+
type PrepareSourceObject[T any] interface {
47+
PrepareObject(handler.ObjectHandler[T], ...predicate.ObjectPredicate[T]) SyncingSource
48+
}
49+
50+
// Syncing allows to wait for synchronization with context
51+
type Syncing interface {
52+
WaitForSync(ctx context.Context) error
53+
}
54+
55+
// SyncingSource is a source that needs syncing prior to being usable. The controller
56+
// will call its WaitForSync prior to starting workers.
57+
type SyncingSource interface {
58+
Source
59+
Syncing
60+
}
61+
62+
// PrepareSyncing: A SyncingSource that also implements SourcePrepare and has WaitForSync method
63+
type PrepareSyncing interface {
64+
SyncingSource
65+
PrepareSource
66+
}
67+
68+
// PrepareSyncingObject[T]: A SyncingSource that also implements PrepareSourceObject[T] and has WaitForSync method
69+
type PrepareSyncingObject[T any] interface {
70+
SyncingSource
71+
PrepareSourceObject[T]
72+
}

0 commit comments

Comments
 (0)