Skip to content

Commit 0c0cb76

Browse files
embikvincepristtts
committed
Add experimental support for pluggable cluster providers to manager
On-behalf-of: SAP [email protected] Co-authored-by: Vince Prignano <[email protected]> Co-authored-by: Dr. Stefan Schimanski <[email protected]> Signed-off-by: Marvin Beckers <[email protected]>
1 parent 31b222f commit 0c0cb76

File tree

6 files changed

+200
-27
lines changed

6 files changed

+200
-27
lines changed

pkg/cluster/multicluster.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,17 @@ type Aware interface {
4242
Disengage(context.Context, Cluster) error
4343
}
4444

45-
// Provider defines methods to retrieve clusters by name. The provider is
46-
// responsible for discovering and managing the lifecycle of each cluster.
45+
// Provider allows to retrieve clusters by name. The provider is responsible for discovering
46+
// and managing the lifecycle of each cluster, calling `Engage` and `Disengage` on the manager
47+
// it is run against whenever a new cluster is discovered or a cluster is unregistered.
4748
//
4849
// Example: A Cluster API provider would be responsible for discovering and
4950
// managing clusters that are backed by Cluster API resources, which can live
5051
// in multiple namespaces in a single management cluster.
5152
type Provider interface {
5253
// Get returns a cluster for the given identifying cluster name. Get
5354
// returns an existing cluster if it has been created before.
55+
// If no cluster is known to the provider under the given cluster name,
56+
// an error should be returned.
5457
Get(ctx context.Context, clusterName string) (Cluster, error)
5558
}

pkg/config/controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,16 @@ type Controller struct {
5959
//
6060
// Note: This flag is disabled by default until a future version. It's currently in beta.
6161
UsePriorityQueue bool
62+
63+
// EngageWithDefaultCluster indicates whether the controller should engage
64+
// with the default cluster. This default to false if a cluster provider
65+
// is configured, and to true otherwise.
66+
//
67+
// This is an experimental feature and is subject to change.
68+
EngageWithDefaultCluster *bool
69+
70+
// EngageWithProvidedClusters indicates whether the controller should engage
71+
// with the provided clusters of the manager. This defaults to true if a
72+
// cluster provider is set, and to false otherwise.
73+
EngageWithProviderClusters *bool
6274
}

pkg/manager/internal.go

Lines changed: 139 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ const (
5959
)
6060

6161
var _ Runnable = &controllerManager{}
62+
var _ cluster.Aware = &controllerManager{}
63+
var _ Manager = &controllerManager{}
6264

6365
type controllerManager struct {
6466
sync.Mutex
@@ -68,8 +70,14 @@ type controllerManager struct {
6870
errChan chan error
6971
runnables *runnables
7072

71-
// cluster holds a variety of methods to interact with a cluster. Required.
72-
cluster cluster.Cluster
73+
// defaultCluster holds a variety of methods to interact with a defaultCluster. Required.
74+
defaultCluster cluster.Cluster
75+
defaultClusterOptions cluster.Option
76+
77+
// engagedCluster is a map of engaged clusters. The can come and go as the manager is running.
78+
engagedClustersLock sync.RWMutex
79+
engagedClusters map[string]cluster.Cluster
80+
clusterAwareRunnables []cluster.Aware
7381

7482
// recorderProvider is used to generate event recorders that will be injected into Controllers
7583
// (and EventHandlers, Sources and Predicates).
@@ -161,6 +169,9 @@ type controllerManager struct {
161169
// internalProceduresStop channel is used internally to the manager when coordinating
162170
// the proper shutdown of servers. This channel is also used for dependency injection.
163171
internalProceduresStop chan struct{}
172+
173+
// clusterProvider is used to get clusters by name, beyond the default cluster.
174+
clusterProvider cluster.Provider
164175
}
165176

166177
type hasCache interface {
@@ -176,7 +187,40 @@ func (cm *controllerManager) Add(r Runnable) error {
176187
}
177188

178189
func (cm *controllerManager) add(r Runnable) error {
179-
return cm.runnables.Add(r)
190+
var engaged []cluster.Aware
191+
var errs []error
192+
disengage := func() {
193+
for _, aware := range engaged {
194+
if err := aware.Disengage(cm.internalCtx, cm.defaultCluster); err != nil {
195+
errs = append(errs, err)
196+
}
197+
}
198+
}
199+
200+
// engage with existing clusters (this is reversible)
201+
if aware, ok := r.(cluster.Aware); ok {
202+
cm.engagedClustersLock.RLock()
203+
defer cm.engagedClustersLock.RUnlock()
204+
for _, cl := range cm.engagedClusters {
205+
if err := aware.Engage(cm.internalCtx, cl); err != nil {
206+
errs = append(errs, err)
207+
break
208+
}
209+
engaged = append(engaged, aware)
210+
}
211+
if len(errs) > 0 {
212+
disengage()
213+
return kerrors.NewAggregate(errs)
214+
}
215+
cm.clusterAwareRunnables = append(cm.clusterAwareRunnables, aware)
216+
} else {
217+
if err := cm.runnables.Add(r); err != nil {
218+
disengage()
219+
return err
220+
}
221+
}
222+
223+
return nil
180224
}
181225

182226
// AddMetricsServerExtraHandler adds extra handler served on path to the http server that serves metrics.
@@ -231,40 +275,58 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
231275
return nil
232276
}
233277

278+
func (cm *controllerManager) Name() string {
279+
return cm.defaultCluster.Name()
280+
}
281+
282+
func (cm *controllerManager) GetCluster(ctx context.Context, clusterName string) (cluster.Cluster, error) {
283+
if clusterName == "" || clusterName == cm.defaultCluster.Name() {
284+
return cm.defaultCluster, nil
285+
}
286+
287+
if cm.clusterProvider == nil {
288+
return nil, fmt.Errorf("cluster %q not found, cluster provider is not set", clusterName)
289+
}
290+
291+
// intentionally not returning from engaged clusters. This can be used
292+
// without engaging clusters.
293+
return cm.clusterProvider.Get(ctx, clusterName)
294+
}
295+
234296
func (cm *controllerManager) GetHTTPClient() *http.Client {
235-
return cm.cluster.GetHTTPClient()
297+
return cm.defaultCluster.GetHTTPClient()
236298
}
237299

238300
func (cm *controllerManager) GetConfig() *rest.Config {
239-
return cm.cluster.GetConfig()
301+
return cm.defaultCluster.GetConfig()
240302
}
241303

242304
func (cm *controllerManager) GetClient() client.Client {
243-
return cm.cluster.GetClient()
305+
return cm.defaultCluster.GetClient()
244306
}
245307

246308
func (cm *controllerManager) GetScheme() *runtime.Scheme {
247-
return cm.cluster.GetScheme()
309+
return cm.defaultCluster.GetScheme()
248310
}
249311

250312
func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer {
251-
return cm.cluster.GetFieldIndexer()
313+
return cm.defaultCluster.GetFieldIndexer()
252314
}
253315

254316
func (cm *controllerManager) GetCache() cache.Cache {
255-
return cm.cluster.GetCache()
317+
return cm.defaultCluster.GetCache()
256318
}
257319

258320
func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder {
259-
return cm.cluster.GetEventRecorderFor(name)
321+
return cm.defaultCluster.GetEventRecorderFor(name)
260322
}
261323

262324
func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
263-
return cm.cluster.GetRESTMapper()
325+
return cm.defaultCluster.GetRESTMapper()
264326
}
265327

266328
func (cm *controllerManager) GetAPIReader() client.Reader {
267-
return cm.cluster.GetAPIReader()
329+
return cm.defaultCluster.GetAPIReader()
268330
}
269331

270332
func (cm *controllerManager) GetWebhookServer() webhook.Server {
@@ -381,7 +443,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
381443
}()
382444

383445
// Add the cluster runnable.
384-
if err := cm.add(cm.cluster); err != nil {
446+
if err := cm.add(cm.defaultCluster); err != nil {
385447
return fmt.Errorf("failed to add cluster to runnables: %w", err)
386448
}
387449

@@ -614,6 +676,70 @@ func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector,
614676
return leaderElector, nil
615677
}
616678

679+
func (cm *controllerManager) Engage(ctx context.Context, cl cluster.Cluster) error {
680+
cm.Lock()
681+
defer cm.Unlock()
682+
683+
// be reentrant via noop
684+
cm.engagedClustersLock.RLock()
685+
if _, ok := cm.engagedClusters[cl.Name()]; ok {
686+
cm.engagedClustersLock.RUnlock()
687+
return nil
688+
}
689+
cm.engagedClustersLock.RUnlock()
690+
691+
// add early because any engaged runnable could access it
692+
cm.engagedClustersLock.Lock()
693+
cm.engagedClusters[cl.Name()] = cl
694+
cm.engagedClustersLock.Unlock()
695+
696+
// engage known runnables
697+
var errs []error
698+
engaged := []cluster.Aware{}
699+
for _, r := range cm.clusterAwareRunnables {
700+
if err := r.Engage(ctx, cl); err != nil {
701+
errs = append(errs, err)
702+
break
703+
}
704+
engaged = append(engaged, r)
705+
}
706+
707+
// clean-up
708+
if len(errs) > 0 {
709+
for _, aware := range engaged {
710+
if err := aware.Disengage(ctx, cl); err != nil {
711+
errs = append(errs, err)
712+
}
713+
}
714+
715+
cm.engagedClustersLock.Lock()
716+
delete(cm.engagedClusters, cl.Name())
717+
cm.engagedClustersLock.Unlock()
718+
719+
return kerrors.NewAggregate(errs)
720+
}
721+
722+
return nil
723+
}
724+
725+
func (cm *controllerManager) Disengage(ctx context.Context, cl cluster.Cluster) error {
726+
cm.Lock()
727+
defer cm.Unlock()
728+
729+
var errs []error
730+
for _, r := range cm.clusterAwareRunnables {
731+
if err := r.Disengage(ctx, cl); err != nil {
732+
errs = append(errs, err)
733+
}
734+
}
735+
736+
cm.engagedClustersLock.Lock()
737+
delete(cm.engagedClusters, cl.Name())
738+
cm.engagedClustersLock.Unlock()
739+
740+
return kerrors.NewAggregate(errs)
741+
}
742+
617743
func (cm *controllerManager) startLeaderElectionRunnables() error {
618744
return cm.runnables.LeaderElection.Start(cm.internalCtx)
619745
}

pkg/manager/manager.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ type Manager interface {
5353
// Cluster holds a variety of methods to interact with a cluster.
5454
cluster.Cluster
5555

56+
// Aware is an interface for dynamic cluster addition and removal. The
57+
// Manager will call Engage and Disengage on cluster-aware runnables like
58+
// controllers to e.g. watch multiple clusters.
59+
cluster.Aware
60+
5661
// Add will set requested dependencies on the component, and cause the component to be
5762
// started when Start is called.
5863
// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
@@ -87,6 +92,10 @@ type Manager interface {
8792
// lock was lost.
8893
Start(ctx context.Context) error
8994

95+
// GetCluster retrieves a Cluster from a given identifying cluster name. An
96+
// empty string will return the default cluster of the manager.
97+
GetCluster(ctx context.Context, clusterName string) (cluster.Cluster, error)
98+
9099
// GetWebhookServer returns a webhook.Server
91100
GetWebhookServer() webhook.Server
92101

@@ -281,6 +290,11 @@ type Options struct {
281290
newMetricsServer func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error)
282291
newHealthProbeListener func(addr string) (net.Listener, error)
283292
newPprofListener func(addr string) (net.Listener, error)
293+
294+
// ExperimentalClusterProvider is an EXPERIMENTAL feature that allows the manager to
295+
// operate against many Kubernetes clusters at once. Individual clusters can
296+
// be accessed by calling GetCluster on the Manager.
297+
ExperimentalClusterProvider cluster.Provider
284298
}
285299

286300
// BaseContextFunc is a function used to provide a base Context to Runnables
@@ -325,7 +339,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
325339
// Set default values for options fields
326340
options = setOptionsDefaults(options)
327341

328-
cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
342+
clusterOptions := func(clusterOptions *cluster.Options) {
329343
clusterOptions.Scheme = options.Scheme
330344
clusterOptions.MapperProvider = options.MapperProvider
331345
clusterOptions.Logger = options.Logger
@@ -334,7 +348,9 @@ func New(config *rest.Config, options Options) (Manager, error) {
334348
clusterOptions.Cache = options.Cache
335349
clusterOptions.Client = options.Client
336350
clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
337-
})
351+
}
352+
353+
cl, err := cluster.New(config, clusterOptions)
338354
if err != nil {
339355
return nil, err
340356
}
@@ -347,7 +363,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
347363
// Create the recorder provider to inject event recorders for the components.
348364
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
349365
// to the particular controller that it's being injected into, rather than a generic one like is here.
350-
recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
366+
recorderProvider, err := options.newRecorderProvider(config, cl.GetHTTPClient(), cl.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
351367
if err != nil {
352368
return nil, err
353369
}
@@ -361,7 +377,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
361377
leaderRecorderProvider = recorderProvider
362378
} else {
363379
leaderConfig = rest.CopyConfig(options.LeaderElectionConfig)
364-
scheme := cluster.GetScheme()
380+
scheme := cl.GetScheme()
365381
err := corev1.AddToScheme(scheme)
366382
if err != nil {
367383
return nil, err
@@ -397,7 +413,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
397413
}
398414

399415
// Create the metrics server.
400-
metricsServer, err := options.newMetricsServer(options.Metrics, config, cluster.GetHTTPClient())
416+
metricsServer, err := options.newMetricsServer(options.Metrics, config, cl.GetHTTPClient())
401417
if err != nil {
402418
return nil, err
403419
}
@@ -417,11 +433,13 @@ func New(config *rest.Config, options Options) (Manager, error) {
417433
}
418434

419435
errChan := make(chan error, 1)
420-
runnables := newRunnables(options.BaseContext, errChan)
421436
return &controllerManager{
422437
stopProcedureEngaged: ptr.To(int64(0)),
423-
cluster: cluster,
424-
runnables: runnables,
438+
defaultCluster: cl,
439+
defaultClusterOptions: clusterOptions,
440+
clusterProvider: options.ExperimentalClusterProvider,
441+
engagedClusters: make(map[string]cluster.Cluster),
442+
runnables: newRunnables(options.BaseContext, errChan),
425443
errChan: errChan,
426444
recorderProvider: recorderProvider,
427445
resourceLock: resourceLock,
@@ -552,5 +570,10 @@ func setOptionsDefaults(options Options) Options {
552570
options.WebhookServer = webhook.NewServer(webhook.Options{})
553571
}
554572

573+
if options.Controller.EngageWithDefaultCluster == nil {
574+
options.Controller.EngageWithDefaultCluster = ptr.To(options.ExperimentalClusterProvider == nil)
575+
options.Controller.EngageWithProviderClusters = ptr.To(options.ExperimentalClusterProvider != nil)
576+
}
577+
555578
return options
556579
}

0 commit comments

Comments
 (0)