Skip to content

Commit 8d3b8f7

Browse files
committed
Rework controllers to be multi-cluster aware
Signed-off-by: Vince Prignano <[email protected]>
1 parent ebfcabf commit 8d3b8f7

File tree

19 files changed

+394
-203
lines changed

19 files changed

+394
-203
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module sigs.k8s.io/controller-runtime
33
go 1.19
44

55
require (
6-
github.com/davecgh/go-spew v1.1.1
76
github.com/evanphx/json-patch/v5 v5.6.0
87
github.com/fsnotify/fsnotify v1.6.0
98
github.com/go-logr/logr v1.2.3
@@ -32,6 +31,7 @@ require (
3231
require (
3332
github.com/beorn7/perks v1.0.1 // indirect
3433
github.com/cespare/xxhash/v2 v2.1.2 // indirect
34+
github.com/davecgh/go-spew v1.1.1 // indirect
3535
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
3636
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
3737
github.com/go-openapi/jsonpointer v0.19.5 // indirect

pkg/builder/controller.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,6 @@ type Builder struct {
6868
name string
6969
}
7070

71-
func (blder *Builder) clone() *Builder {
72-
clone := *blder
73-
clone.cluster = nil
74-
clone.logicalName = ""
75-
clone.ctrl = nil
76-
return &clone
77-
}
78-
7971
// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
8072
func ControllerManagedBy(m manager.Manager) *Builder {
8173
return &Builder{cluster: m, mgr: m}
@@ -246,19 +238,6 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
246238
if blder.forInput.err != nil {
247239
return nil, blder.forInput.err
248240
}
249-
250-
if err := blder.mgr.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (manager.Runnable, error) {
251-
cloned := blder.clone()
252-
cloned.cluster = cl
253-
cloned.logicalName = name
254-
if err := cloned.do(r); err != nil {
255-
return nil, err
256-
}
257-
return cloned.ctrl, nil
258-
}); err != nil {
259-
return nil, err
260-
}
261-
262241
if err := blder.do(r); err != nil {
263242
return nil, err
264243
}

pkg/builder/controller_test.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"strings"
2323
"sync/atomic"
2424

25-
"github.com/davecgh/go-spew/spew"
2625
"github.com/go-logr/logr"
2726
. "github.com/onsi/ginkgo/v2"
2827
. "github.com/onsi/gomega"
@@ -571,14 +570,36 @@ var _ = Describe("application", func() {
571570
mgr, err := manager.New(cfg, manager.Options{}.WithExperimentalLogicalAdapter(adapter))
572571
Expect(err).NotTo(HaveOccurred())
573572

574-
ch1 := make(chan reconcile.Request)
575-
ch2 := make(chan reconcile.Request)
573+
ctx, cancel := context.WithCancel(context.Background())
574+
defer cancel()
575+
By("Starting the manager")
576+
go func() {
577+
defer GinkgoRecover()
578+
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
579+
}()
580+
581+
cluster1, err := mgr.GetCluster(ctx, "cluster1")
582+
Expect(err).NotTo(HaveOccurred())
583+
584+
By("Creating a custom namespace")
585+
ns := &corev1.Namespace{
586+
ObjectMeta: metav1.ObjectMeta{
587+
GenerateName: "test-multi-cluster-",
588+
},
589+
}
590+
Expect(cluster1.GetClient().Create(ctx, ns)).To(Succeed())
591+
592+
ch1 := make(chan reconcile.Request, 1)
593+
ch2 := make(chan reconcile.Request, 1)
576594
Expect(
577595
ControllerManagedBy(mgr).
578596
For(&appsv1.Deployment{}).
579597
Owns(&appsv1.ReplicaSet{}).
580598
Complete(reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
581-
spew.Dump(req)
599+
if req.Namespace != ns.Name {
600+
return reconcile.Result{}, nil
601+
}
602+
582603
defer GinkgoRecover()
583604
switch req.Cluster {
584605
case "cluster1":
@@ -592,19 +613,11 @@ var _ = Describe("application", func() {
592613
})),
593614
).To(Succeed())
594615

595-
ctx, cancel := context.WithCancel(context.Background())
596-
defer cancel()
597-
By("Starting the manager")
598-
go func() {
599-
defer GinkgoRecover()
600-
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
601-
}()
602-
603616
By("Creating a deployment")
604617
dep := &appsv1.Deployment{
605618
ObjectMeta: metav1.ObjectMeta{
606619
Name: "deploy-multi-cluster",
607-
Namespace: "default",
620+
Namespace: ns.Name,
608621
},
609622
Spec: appsv1.DeploymentSpec{
610623
Selector: &metav1.LabelSelector{
@@ -623,8 +636,6 @@ var _ = Describe("application", func() {
623636
},
624637
},
625638
}
626-
cluster1, err := mgr.GetCluster(ctx, "cluster1")
627-
Expect(err).NotTo(HaveOccurred())
628639
Expect(cluster1.GetClient().Create(ctx, dep)).To(Succeed())
629640

630641
By("Waiting for the Deployment Reconcile on both clusters")
@@ -647,7 +658,7 @@ var _ = Describe("application", func() {
647658
// Expect a Reconcile when an Owned object is managedObjects.
648659
rs := &appsv1.ReplicaSet{
649660
ObjectMeta: metav1.ObjectMeta{
650-
Namespace: "default",
661+
Namespace: dep.Namespace,
651662
Name: "rs-multi-cluster",
652663
Labels: dep.Spec.Selector.MatchLabels,
653664
OwnerReferences: []metav1.OwnerReference{

pkg/cluster/cluster.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,43 @@ import (
3838
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
3939
)
4040

41+
// AwareRunnable is an interface that can be implemented by runnable types
42+
// that are cluster-aware.
43+
type AwareRunnable interface {
44+
// Engage gets called when the runnable should start operations for the given Cluster.
45+
// The given context is tied to the Cluster's lifecycle and will be cancelled when the
46+
// Cluster is removed or an error occurs.
47+
//
48+
// Implementers should return an error if they cannot start operations for the given Cluster,
49+
// and should ensure this operation is re-entrant and non-blocking.
50+
//
51+
// \_________________|)____.---'--`---.____
52+
// || \----.________.----/
53+
// || / / `--'
54+
// __||____/ /_
55+
// |___ \
56+
// `--------'
57+
Engage(context.Context, Cluster) error
58+
59+
// Disengage gets called when the runnable should stop operations for the given Cluster.
60+
Disengage(context.Context, Cluster) error
61+
}
62+
63+
// AwareDeepCopy is an interface that can be implemented by types
64+
// that are cluster-aware, and can return a copy of themselves
65+
// for a given cluster.
66+
type AwareDeepCopy[T any] interface {
67+
DeepCopyFor(Cluster) T
68+
}
69+
4170
// LogicalGetterFunc is a function that returns a cluster for a given logical cluster name.
4271
type LogicalGetterFunc func(context.Context, logical.Name) (Cluster, error)
4372

4473
// Cluster provides various methods to interact with a cluster.
4574
type Cluster interface {
75+
// Name returns the unique logical name of the cluster.
76+
Name() logical.Name
77+
4678
// GetHTTPClient returns an HTTP client that can be used to talk to the apiserver
4779
GetHTTPClient() *http.Client
4880

@@ -81,6 +113,9 @@ type Cluster interface {
81113

82114
// Options are the possible options that can be configured for a Cluster.
83115
type Options struct {
116+
// Name is the unique name of the cluster.
117+
Name logical.Name
118+
84119
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
85120
// Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better
86121
// idea to pass your own scheme in. See the documentation in pkg/scheme for more information.
@@ -279,6 +314,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
279314
}
280315

281316
return &cluster{
317+
name: options.Name,
282318
config: config,
283319
httpClient: options.HTTPClient,
284320
scheme: options.Scheme,
@@ -347,3 +383,13 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
347383

348384
return options, nil
349385
}
386+
387+
// WithName sets the name of the cluster.
388+
func WithName(name logical.Name) Option {
389+
return func(o *Options) {
390+
if o.Name != "" {
391+
panic("cluster name cannot be set more than once")
392+
}
393+
o.Name = name
394+
}
395+
}

pkg/cluster/internal.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@ import (
2929
"sigs.k8s.io/controller-runtime/pkg/cache"
3030
"sigs.k8s.io/controller-runtime/pkg/client"
3131
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
32+
"sigs.k8s.io/logical-cluster"
3233
)
3334

3435
type cluster struct {
36+
name logical.Name
37+
3538
// config is the rest.config used to talk to the apiserver. Required.
3639
config *rest.Config
3740

@@ -59,6 +62,10 @@ type cluster struct {
5962
logger logr.Logger
6063
}
6164

65+
func (c *cluster) Name() logical.Name {
66+
return c.name
67+
}
68+
6269
func (c *cluster) GetConfig() *rest.Config {
6370
return c.config
6471
}

pkg/controller/controller.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
138138

139139
// Create controller with dependencies set
140140
return &controller.Controller{
141-
Cluster: options.LogicalCluster,
142-
Do: options.Reconciler,
141+
Do: options.Reconciler,
143142
MakeQueue: func() workqueue.RateLimitingInterface {
144143
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
145144
},

pkg/handler/enqueue.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"k8s.io/apimachinery/pkg/types"
2323
"k8s.io/client-go/util/workqueue"
24+
"sigs.k8s.io/controller-runtime/pkg/cluster"
2425
"sigs.k8s.io/controller-runtime/pkg/event"
2526
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2627
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -36,16 +37,22 @@ var _ EventHandler = &EnqueueRequestForObject{}
3637
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
3738
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
3839
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
39-
type EnqueueRequestForObject struct{}
40+
type EnqueueRequestForObject struct {
41+
cluster cluster.Cluster
42+
}
4043

4144
// Create implements EventHandler.
4245
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
4346
if evt.Object == nil {
4447
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
4548
return
4649
}
50+
var logicalClusterName logical.Name
51+
if e.cluster != nil {
52+
logicalClusterName = e.cluster.Name()
53+
}
4754
q.Add(reconcile.Request{
48-
Cluster: logical.FromContext(ctx),
55+
Cluster: logicalClusterName,
4956
NamespacedName: types.NamespacedName{
5057
Name: evt.Object.GetName(),
5158
Namespace: evt.Object.GetNamespace(),
@@ -55,20 +62,23 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv
5562

5663
// Update implements EventHandler.
5764
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
58-
logicalName := logical.FromContext(ctx)
65+
var logicalClusterName logical.Name
66+
if e.cluster != nil {
67+
logicalClusterName = e.cluster.Name()
68+
}
5969

6070
switch {
6171
case evt.ObjectNew != nil:
6272
q.Add(reconcile.Request{
63-
Cluster: logicalName,
73+
Cluster: logicalClusterName,
6474
NamespacedName: types.NamespacedName{
6575
Name: evt.ObjectNew.GetName(),
6676
Namespace: evt.ObjectNew.GetNamespace(),
6777
},
6878
})
6979
case evt.ObjectOld != nil:
7080
q.Add(reconcile.Request{
71-
Cluster: logicalName,
81+
Cluster: logicalClusterName,
7282
NamespacedName: types.NamespacedName{
7383
Name: evt.ObjectOld.GetName(),
7484
Namespace: evt.ObjectOld.GetNamespace(),
@@ -85,8 +95,12 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv
8595
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
8696
return
8797
}
98+
var logicalClusterName logical.Name
99+
if e.cluster != nil {
100+
logicalClusterName = e.cluster.Name()
101+
}
88102
q.Add(reconcile.Request{
89-
Cluster: logical.FromContext(ctx),
103+
Cluster: logicalClusterName,
90104
NamespacedName: types.NamespacedName{
91105
Name: evt.Object.GetName(),
92106
Namespace: evt.Object.GetNamespace(),
@@ -100,11 +114,20 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
100114
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
101115
return
102116
}
117+
var logicalClusterName logical.Name
118+
if e.cluster != nil {
119+
logicalClusterName = e.cluster.Name()
120+
}
103121
q.Add(reconcile.Request{
104-
Cluster: logical.FromContext(ctx),
122+
Cluster: logicalClusterName,
105123
NamespacedName: types.NamespacedName{
106124
Name: evt.Object.GetName(),
107125
Namespace: evt.Object.GetNamespace(),
108126
},
109127
})
110128
}
129+
130+
// DeepCopyFor implements cluster.AwareDeepCopy[EventHandler].
131+
func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) EventHandler {
132+
return &EnqueueRequestForObject{cluster: c}
133+
}

pkg/handler/enqueue_mapped.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ import (
2121

2222
"k8s.io/client-go/util/workqueue"
2323
"sigs.k8s.io/controller-runtime/pkg/client"
24+
"sigs.k8s.io/controller-runtime/pkg/cluster"
2425
"sigs.k8s.io/controller-runtime/pkg/event"
2526
"sigs.k8s.io/controller-runtime/pkg/reconcile"
26-
"sigs.k8s.io/logical-cluster"
2727
)
2828

2929
// MapFunc is the signature required for enqueueing requests from a generic function.
@@ -49,6 +49,8 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
4949
var _ EventHandler = &enqueueRequestsFromMapFunc{}
5050

5151
type enqueueRequestsFromMapFunc struct {
52+
cluster cluster.Cluster
53+
5254
// Mapper transforms the argument into a slice of keys to be reconciled
5355
toRequests MapFunc
5456
}
@@ -85,11 +87,18 @@ func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqu
8587
continue
8688
}
8789
// If the request doesn't specify a cluster, use the cluster from the context.
88-
if req.Cluster == "" {
89-
req.Cluster = logical.FromContext(ctx)
90+
if req.Cluster == "" && e.cluster != nil {
91+
req.Cluster = e.cluster.Name()
9092
}
9193
// Enqueue the request and track it.
9294
q.Add(req)
9395
reqs[req] = empty{}
9496
}
9597
}
98+
99+
func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) EventHandler {
100+
return &enqueueRequestsFromMapFunc{
101+
cluster: c,
102+
toRequests: e.toRequests,
103+
}
104+
}

0 commit comments

Comments
 (0)