Skip to content

Commit 3f85096

Browse files
committed
SQUASH: either watch default cluster or providers clusters, not both
Signed-off-by: Dr. Stefan Schimanski <[email protected]>
1 parent 7c02ed9 commit 3f85096

File tree

6 files changed

+105
-31
lines changed

6 files changed

+105
-31
lines changed

pkg/builder/controller.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,11 @@ func ControllerManagedBy(m manager.Manager) *Builder {
7474

7575
// ForInput represents the information set by the For method.
7676
type ForInput struct {
77-
object client.Object
78-
predicates []predicate.Predicate
79-
objectProjection objectProjection
80-
err error
77+
object client.Object
78+
forceDefaultCluster bool // in cluster-aware mode, force the object to be watched in the default cluster
79+
predicates []predicate.Predicate
80+
objectProjection objectProjection
81+
err error
8182
}
8283

8384
// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
@@ -100,10 +101,11 @@ func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
100101

101102
// OwnsInput represents the information set by Owns method.
102103
type OwnsInput struct {
103-
matchEveryOwner bool
104-
object client.Object
105-
predicates []predicate.Predicate
106-
objectProjection objectProjection
104+
matchEveryOwner bool
105+
object client.Object
106+
forceDefaultCluster bool // in cluster-aware mode, force the object to be watched in the default cluster
107+
predicates []predicate.Predicate
108+
objectProjection objectProjection
107109
}
108110

109111
// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
@@ -126,10 +128,11 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
126128

127129
// WatchesInput represents the information set by Watches method.
128130
type WatchesInput struct {
129-
src source.Source
130-
eventHandler handler.EventHandler
131-
predicates []predicate.Predicate
132-
objectProjection objectProjection
131+
src source.Source
132+
forceDefaultCluster bool // in cluster-aware mode, force the object to be watched in the default cluster
133+
eventHandler handler.EventHandler
134+
predicates []predicate.Predicate
135+
objectProjection objectProjection
133136
}
134137

135138
// Watches defines the type of Object to watch, and configures the ControllerManagedBy to respond to create / delete /
@@ -284,7 +287,10 @@ func (blder *Builder) doWatch() error {
284287
if err != nil {
285288
return err
286289
}
287-
src := source.Kind(blder.cluster.GetCache(), obj)
290+
src := clusterAwareSource{
291+
Source: source.Kind(blder.cluster.GetCache(), obj),
292+
forceDefaultCluster: blder.forInput.forceDefaultCluster,
293+
}
288294
hdler := &handler.EnqueueRequestForObject{}
289295
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
290296
allPredicates = append(allPredicates, blder.forInput.predicates...)
@@ -302,7 +308,10 @@ func (blder *Builder) doWatch() error {
302308
if err != nil {
303309
return err
304310
}
305-
src := source.Kind(blder.cluster.GetCache(), obj)
311+
src := clusterAwareSource{
312+
Source: source.Kind(blder.cluster.GetCache(), obj),
313+
forceDefaultCluster: own.forceDefaultCluster,
314+
}
306315
opts := []handler.OwnerOption{}
307316
if !own.matchEveryOwner {
308317
opts = append(opts, handler.OnlyControllerOwner())
@@ -342,7 +351,10 @@ func (blder *Builder) doWatch() error {
342351
}
343352
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
344353
allPredicates = append(allPredicates, w.predicates...)
345-
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
354+
if err := blder.ctrl.Watch(
355+
clusterAwareSource{Source: w.src, forceDefaultCluster: w.forceDefaultCluster},
356+
w.eventHandler, allPredicates...,
357+
); err != nil {
346358
return err
347359
}
348360
}
@@ -431,3 +443,12 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
431443
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
432444
return err
433445
}
446+
447+
type clusterAwareSource struct {
448+
source.Source
449+
forceDefaultCluster bool
450+
}
451+
452+
func (s clusterAwareSource) ForceDefaultCluster() bool {
453+
return s.forceDefaultCluster
454+
}

pkg/builder/options.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,21 @@ type matchEveryOwner struct{}
154154
func (o matchEveryOwner) ApplyToOwns(opts *OwnsInput) {
155155
opts.matchEveryOwner = true
156156
}
157+
158+
// InDefaultCluster configures the input to only watch objects on the default
159+
// cluster, even if a cluster provider is set.
160+
var InDefaultCluster = inDefaultCluster{}
161+
162+
type inDefaultCluster struct{}
163+
164+
func (f inDefaultCluster) ApplyToFor(opts *ForInput) {
165+
opts.forceDefaultCluster = true
166+
}
167+
168+
func (f inDefaultCluster) ApplyToOwns(opts *OwnsInput) {
169+
opts.forceDefaultCluster = true
170+
}
171+
172+
func (f inDefaultCluster) ApplyToWatches(opts *WatchesInput) {
173+
opts.forceDefaultCluster = true
174+
}

pkg/config/controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,9 @@ type Controller struct {
4646
// NeedLeaderElection indicates whether the controller needs to use leader election.
4747
// Defaults to true, which means the controller will use leader election.
4848
NeedLeaderElection *bool
49+
50+
// WatchProviderClusters indicates whether the controller should
51+
// only watch clusters that are engaged by the cluster provider. Defaults to false
52+
// if no provider is set, and to true if a provider is set.
53+
WatchProviderClusters *bool
4954
}

pkg/controller/controller.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/go-logr/logr"
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
27+
"k8s.io/utils/pointer"
2728

2829
"sigs.k8s.io/controller-runtime/pkg/handler"
2930
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
@@ -62,6 +63,11 @@ type Options struct {
6263
// LogConstructor is used to construct a logger used for this controller and passed
6364
// to each reconciliation via the context field.
6465
LogConstructor func(request *reconcile.Request) logr.Logger
66+
67+
// WatchProviderClusters indicates whether the controller should
68+
// only watch clusters that are engaged by the cluster provider. Defaults to false
69+
// if no provider is set, and to true if a provider is set.
70+
WatchProviderClusters *bool
6571
}
6672

6773
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -155,6 +161,13 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
155161
options.NeedLeaderElection = mgr.GetControllerOptions().NeedLeaderElection
156162
}
157163

164+
if options.WatchProviderClusters == nil {
165+
options.WatchProviderClusters = mgr.GetControllerOptions().WatchProviderClusters
166+
if options.WatchProviderClusters == nil { // should never happen
167+
options.WatchProviderClusters = pointer.Bool(false)
168+
}
169+
}
170+
158171
// Create controller with dependencies set
159172
return &controller.Controller{
160173
Do: options.Reconciler,
@@ -169,6 +182,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
169182
LogConstructor: options.LogConstructor,
170183
RecoverPanic: options.RecoverPanic,
171184
LeaderElected: options.NeedLeaderElection,
185+
WatchProviderClusters: *options.WatchProviderClusters,
172186
}, nil
173187
}
174188

pkg/internal/controller/controller.go

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ type Controller struct {
8686

8787
// clustersByName is used to manage the fleet of clusters.
8888
clustersByName map[string]*clusterDescription
89+
// WatchProviderClusters indicates whether the controller should
90+
// only watch clusters that are engaged by the cluster provider. Defaults to false
91+
// if no provider is set, and to true if a provider is set.
92+
WatchProviderClusters bool
8993

9094
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
9195
// or for example when a watch is started.
@@ -100,6 +104,13 @@ type Controller struct {
100104
LeaderElected *bool
101105
}
102106

107+
// ClusterAwareSource is a source that knows whether to watch in the default cluster
108+
// in the clusters engaged by the cluster provider.
109+
type ClusterAwareSource interface {
110+
source.Source
111+
ForceDefaultCluster() bool
112+
}
113+
103114
type clusterDescription struct {
104115
cluster.Cluster
105116
ctx context.Context
@@ -174,8 +185,23 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
174185
watchDesc := &watchDescription{src: src, handler: evthdler, predicates: prct}
175186

176187
// If the source is cluster aware, store it in a separate list.
177-
if watchDesc.IsClusterAware() {
188+
_, forceDefaultClsuter := src.(ClusterAwareSource)
189+
if c.WatchProviderClusters && !forceDefaultClsuter {
190+
if !watchDesc.IsClusterAware() {
191+
return fmt.Errorf("source %s is not cluster aware, but WatchProviderClusters is true", src)
192+
}
178193
c.clusterAwareWatches = append(c.clusterAwareWatches, watchDesc)
194+
195+
// If the watch is cluster aware, start it for all the clusters
196+
// This covers the case where a Watch was added later to the controller.
197+
var errs []error
198+
for _, cldesc := range c.clustersByName {
199+
if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil {
200+
errs = append(errs, err)
201+
}
202+
}
203+
204+
return kerrors.NewAggregate(errs)
179205
}
180206

181207
// Controller hasn't started yet, store the watches locally and return.
@@ -186,22 +212,8 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
186212
return nil
187213
}
188214

189-
var errs []error
190215
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
191-
if err := src.Start(c.ctx, evthdler, c.Queue, prct...); err != nil {
192-
errs = append(errs, err)
193-
}
194-
195-
// If the watch is cluster aware, start it for all the clusters
196-
// This covers the case where a Watch was added later to the controller.
197-
if watchDesc.IsClusterAware() {
198-
for _, cldesc := range c.clustersByName {
199-
if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil {
200-
errs = append(errs, err)
201-
}
202-
}
203-
}
204-
return kerrors.NewAggregate(errs)
216+
return src.Start(c.ctx, evthdler, c.Queue, prct...)
205217
}
206218

207219
// Engage implements cluster.AwareRunnable.

pkg/manager/manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,5 +686,9 @@ func setOptionsDefaults(options Options) Options {
686686
options.WebhookServer = webhook.NewServer(webhook.Options{})
687687
}
688688

689+
if options.Controller.WatchProviderClusters == nil {
690+
options.Controller.WatchProviderClusters = pointer.Bool(options.clusterProvider != nil)
691+
}
692+
689693
return options
690694
}

0 commit comments

Comments
 (0)