Skip to content

Commit 31b222f

Browse files
embikvincepristtts
committed
Add TypedMultiClusterController interface for multi-cluster controllers
On-behalf-of: SAP [email protected] Co-authored-by: Vince Prignano <[email protected]> Co-authored-by: Dr. Stefan Schimanski <[email protected]> Signed-off-by: Marvin Beckers <[email protected]>
1 parent bf4dd6f commit 31b222f

File tree

2 files changed

+155
-0
lines changed

2 files changed

+155
-0
lines changed

pkg/controller/controller.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,17 @@ type TypedOptions[request comparable] struct {
8282
// LogConstructor is used to construct a logger used for this controller and passed
8383
// to each reconciliation via the context field.
8484
LogConstructor func(request *request) logr.Logger
85+
86+
// EngageWithDefaultCluster indicates whether the controller should engage
87+
// with the default cluster of a manager. This defaults to false through the
88+
// global controller options of the manager if a cluster provider is set,
89+
// and to true otherwise. Here it can be overridden.
90+
EngageWithDefaultCluster *bool
91+
// EngageWithProvidedClusters indicates whether the controller should engage
92+
// with the provided clusters of a manager. This defaults to true through the
93+
// global controller options of the manager if a cluster provider is set,
94+
// and to false otherwise. Here it can be overridden.
95+
EngageWithProviderClusters *bool
8596
}
8697

8798
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests

pkg/controller/multicluster.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package controller
15+
16+
import (
17+
"context"
18+
"sync"
19+
20+
kerrors "k8s.io/apimachinery/pkg/util/errors"
21+
"sigs.k8s.io/controller-runtime/pkg/cluster"
22+
)
23+
24+
// TypedMultiClusterController is a Controller that is aware of the Cluster it is
25+
// running in. It engage and disengage clusters dynamically, starting the
26+
// watches and stopping them.
27+
type TypedMultiClusterController[request comparable] interface {
28+
cluster.Aware
29+
TypedController[request]
30+
}
31+
32+
// TypedMultiClusterOption is a functional option for TypedMultiClusterController.
33+
type TypedMultiClusterOption[request comparable] func(*typedMultiClusterController[request])
34+
35+
// ClusterWatcher starts watches for a given Cluster. The ctx should be
36+
// used to cancel the watch when the Cluster is disengaged.
37+
type ClusterWatcher interface {
38+
Watch(ctx context.Context, cl cluster.Cluster) error
39+
}
40+
41+
// NewTypedMultiClusterController creates a new TypedMultiClusterController for the given
42+
// controller with the given ClusterWatcher.
43+
func NewTypedMultiClusterController[request comparable](c TypedController[request], watcher ClusterWatcher, opts ...TypedMultiClusterOption[request]) TypedMultiClusterController[request] {
44+
mcc := &typedMultiClusterController[request]{
45+
TypedController: c,
46+
watcher: watcher,
47+
clusters: map[string]struct{}{},
48+
}
49+
for _, opt := range opts {
50+
opt(mcc)
51+
}
52+
53+
return mcc
54+
}
55+
56+
// WithClusterAware adds the given cluster.Aware instances to the MultiClusterController,
57+
// being engaged and disengaged when the clusters are added or removed.
58+
func WithClusterAware[request comparable](awares ...cluster.Aware) TypedMultiClusterOption[request] {
59+
return func(c *typedMultiClusterController[request]) {
60+
c.awares = append(c.awares, awares...)
61+
}
62+
}
63+
64+
type typedMultiClusterController[request comparable] struct {
65+
TypedController[request]
66+
watcher ClusterWatcher
67+
68+
lock sync.Mutex
69+
clusters map[string]struct{}
70+
awares []cluster.Aware
71+
}
72+
73+
// Engage gets called when the runnable should start operations for the given Cluster.
74+
func (c *typedMultiClusterController[request]) Engage(clusterCtx context.Context, cl cluster.Cluster) error {
75+
c.lock.Lock()
76+
defer c.lock.Unlock()
77+
78+
if _, ok := c.clusters[cl.Name()]; ok {
79+
return nil
80+
}
81+
82+
engaged := make([]cluster.Aware, 0, len(c.awares)+1)
83+
disengage := func() error {
84+
var errs []error
85+
for _, aware := range engaged {
86+
if err := aware.Disengage(clusterCtx, cl); err != nil {
87+
errs = append(errs, err)
88+
}
89+
}
90+
return kerrors.NewAggregate(errs)
91+
}
92+
93+
// pass through in case the controller itself is cluster aware
94+
if ctrl, ok := c.TypedController.(cluster.Aware); ok {
95+
if err := ctrl.Engage(clusterCtx, cl); err != nil {
96+
return err
97+
}
98+
engaged = append(engaged, ctrl)
99+
}
100+
101+
// engage cluster aware instances
102+
for _, aware := range c.awares {
103+
if err := aware.Engage(clusterCtx, cl); err != nil {
104+
if err := disengage(); err != nil {
105+
return err
106+
}
107+
return err
108+
}
109+
engaged = append(engaged, aware)
110+
}
111+
112+
// start watches on the cluster
113+
if err := c.watcher.Watch(clusterCtx, cl); err != nil {
114+
if err := disengage(); err != nil {
115+
return err
116+
}
117+
return err
118+
}
119+
120+
c.clusters[cl.Name()] = struct{}{}
121+
122+
return nil
123+
}
124+
125+
// Disengage gets called when the runnable should stop operations for the given Cluster.
126+
func (c *typedMultiClusterController[request]) Disengage(ctx context.Context, cl cluster.Cluster) error {
127+
c.lock.Lock()
128+
defer c.lock.Unlock()
129+
130+
if _, ok := c.clusters[cl.Name()]; !ok {
131+
return nil
132+
}
133+
delete(c.clusters, cl.Name())
134+
135+
// pass through in case the controller itself is cluster aware
136+
var errs []error
137+
if ctrl, ok := c.TypedController.(cluster.Aware); ok {
138+
if err := ctrl.Disengage(ctx, cl); err != nil {
139+
errs = append(errs, err)
140+
}
141+
}
142+
143+
return kerrors.NewAggregate(errs)
144+
}

0 commit comments

Comments
 (0)