Skip to content

Commit 489224f

Browse files
committed
SQUASH: add ClusterAwareSource
Signed-off-by: Dr. Stefan Schimanski <[email protected]>
1 parent fc48477 commit 489224f

File tree

3 files changed

+55
-8
lines changed

3 files changed

+55
-8
lines changed

pkg/builder/controller.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,12 @@ var _ controller.ClusterWatcher = &clusterWatcher{}
5858

5959
// clusterWatcher sets up watches between a cluster and a controller.
6060
type clusterWatcher struct {
61-
ctrl controller.Controller
62-
forInput ForInput
63-
ownsInput []OwnsInput
64-
watchesInput []WatchesInput
65-
globalPredicates []predicate.Predicate
61+
ctrl controller.Controller
62+
forInput ForInput
63+
ownsInput []OwnsInput
64+
watchesInput []WatchesInput
65+
globalPredicates []predicate.Predicate
66+
clusterAwareRawSources []source.ClusterAwareSource
6667
}
6768

6869
// Builder builds a Controller.
@@ -197,8 +198,12 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
197198
//
198199
// WatchesRawSource does not respect predicates configured through WithEventFilter.
199200
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
200-
blder.rawSources = append(blder.rawSources, src)
201+
if src, ok := src.(source.ClusterAwareSource); ok {
202+
blder.clusterAwareRawSources = append(blder.clusterAwareRawSources, src)
203+
return blder
204+
}
201205

206+
blder.rawSources = append(blder.rawSources, src)
202207
return blder
203208
}
204209

@@ -344,6 +349,12 @@ func (cc *clusterWatcher) Watch(ctx context.Context, cl cluster.Cluster) error {
344349
}
345350
}
346351

352+
for _, src := range cc.clusterAwareRawSources {
353+
if err := cc.ctrl.Watch(src); err != nil {
354+
return err
355+
}
356+
}
357+
347358
return nil
348359
}
349360

pkg/controller/multicluster.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ type MultiClusterController interface {
3232
Controller
3333
}
3434

35+
// MultiClusterOption is a functional option for MultiClusterController.
36+
type MultiClusterOption func(*multiClusterController)
37+
3538
// ClusterWatcher starts watches for a given Cluster. The ctx should be
3639
// used to cancel the watch when the Cluster is disengaged.
3740
type ClusterWatcher interface {
@@ -40,12 +43,25 @@ type ClusterWatcher interface {
4043

4144
// NewMultiClusterController creates a new MultiClusterController for the given
4245
// controller with the given ClusterWatcher.
43-
func NewMultiClusterController(c Controller, watcher ClusterWatcher) MultiClusterController {
44-
return &multiClusterController{
46+
func NewMultiClusterController(c Controller, watcher ClusterWatcher, opts ...MultiClusterOption) MultiClusterController {
47+
mcc := &multiClusterController{
4548
Controller: c,
4649
watcher: watcher,
4750
clusters: map[string]struct{}{},
4851
}
52+
for _, opt := range opts {
53+
opt(mcc)
54+
}
55+
56+
return mcc
57+
}
58+
59+
// WithClusterAware adds the given cluster.Aware instances to the MultiClusterController,
60+
// being engaged and disengaged when the clusters are added or removed.
61+
func WithClusterAware(awares ...cluster.Aware) MultiClusterOption {
62+
return func(c *multiClusterController) {
63+
c.awares = append(c.awares, awares...)
64+
}
4965
}
5066

5167
type multiClusterController struct {
@@ -54,6 +70,7 @@ type multiClusterController struct {
5470

5571
lock sync.Mutex
5672
clusters map[string]struct{}
73+
awares []cluster.Aware
5774
}
5875

5976
// Engage gets called when the runnable should start operations for the given Cluster.
@@ -84,6 +101,17 @@ func (c *multiClusterController) Engage(clusterCtx context.Context, cl cluster.C
84101
engaged = append(engaged, ctrl)
85102
}
86103

104+
// engage cluster aware instances
105+
for _, aware := range c.awares {
106+
if err := aware.Engage(clusterCtx, cl); err != nil {
107+
if err := disengage(); err != nil {
108+
return err
109+
}
110+
return err
111+
}
112+
engaged = append(engaged, aware)
113+
}
114+
87115
// start watches on the cluster
88116
if err := c.watcher.Watch(clusterCtx, cl); err != nil {
89117
if err := disengage(); err != nil {

pkg/source/source.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/utils/ptr"
2727
"sigs.k8s.io/controller-runtime/pkg/client"
28+
"sigs.k8s.io/controller-runtime/pkg/cluster"
2829
"sigs.k8s.io/controller-runtime/pkg/event"
2930
"sigs.k8s.io/controller-runtime/pkg/handler"
3031
internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
@@ -54,6 +55,13 @@ type SyncingSource interface {
5455
WaitForSync(ctx context.Context) error
5556
}
5657

58+
// ClusterAwareSource is a source that can be engaged and disengaged when
59+
// clusters are added or removed from the manager.
60+
type ClusterAwareSource interface {
61+
Source
62+
cluster.Aware
63+
}
64+
5765
// Kind creates a KindSource with the given cache provider.
5866
func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource {
5967
return &internal.Kind[T]{

0 commit comments

Comments
 (0)