Skip to content

Commit 5dc10a2

Browse files
committed
SQUASH: flatten DeepCopyFor types
Signed-off-by: Dr. Stefan Schimanski <[email protected]>
1 parent 3f85096 commit 5dc10a2

File tree

9 files changed

+69
-72
lines changed

9 files changed

+69
-72
lines changed

pkg/builder/controller.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,8 @@ func (blder *Builder) doWatch() error {
288288
return err
289289
}
290290
src := clusterAwareSource{
291-
Source: source.Kind(blder.cluster.GetCache(), obj),
292-
forceDefaultCluster: blder.forInput.forceDefaultCluster,
291+
DeepCopyableSyncingSource: source.Kind(blder.cluster.GetCache(), obj),
292+
forceDefaultCluster: blder.forInput.forceDefaultCluster,
293293
}
294294
hdler := &handler.EnqueueRequestForObject{}
295295
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
@@ -309,8 +309,8 @@ func (blder *Builder) doWatch() error {
309309
return err
310310
}
311311
src := clusterAwareSource{
312-
Source: source.Kind(blder.cluster.GetCache(), obj),
313-
forceDefaultCluster: own.forceDefaultCluster,
312+
DeepCopyableSyncingSource: source.Kind(blder.cluster.GetCache(), obj),
313+
forceDefaultCluster: own.forceDefaultCluster,
314314
}
315315
opts := []handler.OwnerOption{}
316316
if !own.matchEveryOwner {
@@ -334,12 +334,17 @@ func (blder *Builder) doWatch() error {
334334
}
335335
for _, w := range blder.watchesInput {
336336
// If the source of this watch is of type Kind, project it.
337+
src := w.src
337338
if srcKind, ok := w.src.(*internalsource.Kind); ok {
338339
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
339340
if err != nil {
340341
return err
341342
}
342343
srcKind.Type = typeForSrc
344+
src = clusterAwareSource{
345+
DeepCopyableSyncingSource: srcKind,
346+
forceDefaultCluster: w.forceDefaultCluster,
347+
}
343348
} else if !ok {
344349
// If we're building a cluster-aware controller, raw watches are not allowed
345350
// given that the cache cannot be validated to be coming from the same cluster.
@@ -351,10 +356,7 @@ func (blder *Builder) doWatch() error {
351356
}
352357
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
353358
allPredicates = append(allPredicates, w.predicates...)
354-
if err := blder.ctrl.Watch(
355-
clusterAwareSource{Source: w.src, forceDefaultCluster: w.forceDefaultCluster},
356-
w.eventHandler, allPredicates...,
357-
); err != nil {
359+
if err := blder.ctrl.Watch(src, w.eventHandler, allPredicates...); err != nil {
358360
return err
359361
}
360362
}
@@ -445,7 +447,7 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
445447
}
446448

447449
type clusterAwareSource struct {
448-
source.Source
450+
source.DeepCopyableSyncingSource
449451
forceDefaultCluster bool
450452
}
451453

pkg/cluster/cluster.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,6 @@ type AwareRunnable interface {
5757
Disengage(context.Context, Cluster) error
5858
}
5959

60-
// AwareDeepCopy is an interface that can be implemented by types
61-
// that are cluster-aware, and can return a copy of themselves
62-
// for a given cluster.
63-
type AwareDeepCopy[T any] interface {
64-
DeepCopyFor(Cluster) T
65-
}
66-
6760
// ByNameGetterFunc is a function that returns a cluster for a given identifying cluster name.
6861
type ByNameGetterFunc func(ctx context.Context, clusterName string) (Cluster, error)
6962

pkg/handler/enqueue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,6 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
127127
}
128128

129129
// DeepCopyFor implements cluster.AwareDeepCopy[EventHandler].
130-
func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) EventHandler {
130+
func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler {
131131
return &EnqueueRequestForObject{cluster: c}
132132
}

pkg/handler/enqueue_mapped.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqu
9797
}
9898
}
9999

100-
func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) EventHandler {
100+
func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler {
101101
return &enqueueRequestsFromMapFunc{
102102
cluster: c,
103103
toRequests: e.toRequests,

pkg/handler/enqueue_owner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func (e *enqueueRequestForOwner) getOwnersReferences(object metav1.Object) []met
207207
return nil
208208
}
209209

210-
func (e *enqueueRequestForOwner) DeepCopyFor(c cluster.Cluster) EventHandler {
210+
func (e *enqueueRequestForOwner) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler {
211211
copy := &enqueueRequestForOwner{
212212
cluster: c,
213213
ownerType: e.ownerType,

pkg/handler/eventhandler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121

2222
"k8s.io/client-go/util/workqueue"
23+
"sigs.k8s.io/controller-runtime/pkg/cluster"
2324
"sigs.k8s.io/controller-runtime/pkg/event"
2425
)
2526

@@ -56,6 +57,13 @@ type EventHandler interface {
5657
Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface)
5758
}
5859

60+
// DeepCopyableEventHandler is an EventHandler that can be deep copied for use
61+
// in a different Cluster. This is used if a cluster provider is set in a manager.
62+
type DeepCopyableEventHandler interface {
63+
EventHandler
64+
DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler
65+
}
66+
5967
var _ EventHandler = Funcs{}
6068

6169
// Funcs implements EventHandler.

pkg/internal/controller/controller.go

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"sigs.k8s.io/controller-runtime/pkg/cluster"
3333
"sigs.k8s.io/controller-runtime/pkg/handler"
3434
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
35-
internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
3635
logf "sigs.k8s.io/controller-runtime/pkg/log"
3736
"sigs.k8s.io/controller-runtime/pkg/predicate"
3837
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -82,7 +81,7 @@ type Controller struct {
8281
startWatches []*watchDescription
8382

8483
// clusterAwareWatches maintains a list of cluster aware sources, handlers, and predicates to start when the controller is started.
85-
clusterAwareWatches []*watchDescription
84+
clusterAwareWatches []*deepcopyableWatchDescription
8685

8786
// clustersByName is used to manage the fleet of clusters.
8887
clustersByName map[string]*clusterDescription
@@ -124,37 +123,13 @@ type watchDescription struct {
124123
predicates []predicate.Predicate
125124
}
126125

127-
func (w *watchDescription) IsClusterAware() bool {
128-
if _, ok := w.src.(cluster.AwareDeepCopy[*internal.Kind]); !ok {
129-
if _, ok := w.src.(cluster.AwareDeepCopy[source.Source]); !ok {
130-
return false
131-
}
132-
}
133-
if _, ok := w.handler.(cluster.AwareDeepCopy[handler.EventHandler]); !ok {
134-
return false
135-
}
136-
return true
137-
}
138-
139-
func (w *watchDescription) DeepCopyFor(c cluster.Cluster) *watchDescription {
140-
copy := &watchDescription{
141-
predicates: w.predicates,
142-
}
143-
if clusterAwareSource, ok := w.src.(cluster.AwareDeepCopy[*internal.Kind]); ok {
144-
copy.src = clusterAwareSource.DeepCopyFor(c)
145-
} else if clusterAwareSource, ok := w.src.(cluster.AwareDeepCopy[source.Source]); ok {
146-
copy.src = clusterAwareSource.DeepCopyFor(c)
147-
} else {
148-
return nil
149-
}
150-
151-
if clusterAwareHandler, ok := w.handler.(cluster.AwareDeepCopy[handler.EventHandler]); ok {
152-
copy.handler = clusterAwareHandler.DeepCopyFor(c)
153-
} else {
154-
return nil
155-
}
156-
157-
return copy
126+
// deepcopyableWatchDescription contains all the information necessary to start
127+
// a watch. In addition to watchDescription it also contains the DeepCopyFor
128+
// method to adapt it to a different cluster.
129+
type deepcopyableWatchDescription struct {
130+
src source.DeepCopyableSyncingSource
131+
handler handler.DeepCopyableEventHandler
132+
predicates []predicate.Predicate
158133
}
159134

160135
// Reconcile implements reconcile.Reconciler.
@@ -182,14 +157,22 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
182157
c.mu.Lock()
183158
defer c.mu.Unlock()
184159

185-
watchDesc := &watchDescription{src: src, handler: evthdler, predicates: prct}
186-
187160
// If the source is cluster aware, store it in a separate list.
188-
_, forceDefaultClsuter := src.(ClusterAwareSource)
189-
if c.WatchProviderClusters && !forceDefaultClsuter {
190-
if !watchDesc.IsClusterAware() {
191-
return fmt.Errorf("source %s is not cluster aware, but WatchProviderClusters is true", src)
161+
var forceDefaultCluster bool
162+
if src, ok := src.(ClusterAwareSource); ok {
163+
forceDefaultCluster = src.ForceDefaultCluster()
164+
}
165+
if c.WatchProviderClusters && !forceDefaultCluster {
166+
src, ok := src.(source.DeepCopyableSyncingSource)
167+
if !ok {
168+
return fmt.Errorf("source %T is not cluster aware, but WatchProviderClusters is true", src)
192169
}
170+
evthdler, ok := evthdler.(handler.DeepCopyableEventHandler)
171+
if !ok {
172+
return fmt.Errorf("handler %T is not cluster aware, but WatchProviderClusters is true", evthdler)
173+
}
174+
175+
watchDesc := &deepcopyableWatchDescription{src: src, handler: evthdler, predicates: prct}
193176
c.clusterAwareWatches = append(c.clusterAwareWatches, watchDesc)
194177

195178
// If the watch is cluster aware, start it for all the clusters
@@ -208,7 +191,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
208191
//
209192
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
210193
if !c.Started {
211-
c.startWatches = append(c.startWatches, watchDesc)
194+
c.startWatches = append(c.startWatches, &watchDescription{src: src, handler: evthdler, predicates: prct})
212195
return nil
213196
}
214197

@@ -260,8 +243,8 @@ func (c *Controller) Disengage(ctx context.Context, cluster cluster.Cluster) err
260243
return nil
261244
}
262245

263-
func (c *Controller) startClusterAwareWatchLocked(cldesc *clusterDescription, watchDesc *watchDescription) error {
264-
watch := watchDesc.DeepCopyFor(cldesc)
246+
func (c *Controller) startClusterAwareWatchLocked(cldesc *clusterDescription, watchDesc *deepcopyableWatchDescription) error {
247+
watch := &deepcopyableWatchDescription{src: watchDesc.src.DeepCopyFor(cldesc.Cluster), handler: watchDesc.handler.DeepCopyFor(cldesc.Cluster), predicates: watchDesc.predicates}
265248
if watch == nil {
266249
return nil
267250
}

pkg/internal/source/kind.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,20 @@ import (
1717
"sigs.k8s.io/controller-runtime/pkg/predicate"
1818
)
1919

20+
type Source interface {
21+
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
22+
}
23+
24+
type SyncingSource interface {
25+
Source
26+
WaitForSync(ctx context.Context) error
27+
}
28+
29+
type DeepCopyableSyncingSource interface {
30+
SyncingSource
31+
DeepCopyFor(cluster cluster.Cluster) DeepCopyableSyncingSource
32+
}
33+
2034
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
2135
type Kind struct {
2236
// Type is the type of object to watch. e.g. &v1.Pod{}
@@ -118,7 +132,7 @@ func (ks *Kind) WaitForSync(ctx context.Context) error {
118132
}
119133

120134
// DeepCopyFor implements cluster.AwareDeepCopy[Source].
121-
func (ks *Kind) DeepCopyFor(c cluster.Cluster) *Kind {
135+
func (ks *Kind) DeepCopyFor(c cluster.Cluster) DeepCopyableSyncingSource {
122136
return &Kind{
123137
Type: ks.Type,
124138
Cache: c.GetCache(),

pkg/source/source.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,18 @@ const (
4444
// * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls).
4545
//
4646
// Users may build their own Source implementations.
47-
type Source interface {
48-
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
49-
// to enqueue reconcile.Requests.
50-
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
51-
}
47+
type Source = internal.Source
5248

5349
// SyncingSource is a source that needs syncing prior to being usable. The controller
5450
// will call its WaitForSync prior to starting workers.
55-
type SyncingSource interface {
56-
Source
57-
WaitForSync(ctx context.Context) error
58-
}
51+
type SyncingSource = internal.SyncingSource
52+
53+
// DeepCopyableSyncingSource is a source that can be deep copied for a specific cluster.
54+
// It is used in setups with a cluster provider set in the manager.
55+
type DeepCopyableSyncingSource = internal.DeepCopyableSyncingSource
5956

6057
// Kind creates a KindSource with the given cache provider.
61-
func Kind(cache cache.Cache, object client.Object) SyncingSource {
58+
func Kind(cache cache.Cache, object client.Object) DeepCopyableSyncingSource {
6259
return &internal.Kind{Type: object, Cache: cache}
6360
}
6461

0 commit comments

Comments
 (0)