Skip to content

Commit 2530566

Browse files
committed
wip: logical cluster
Signed-off-by: Vince Prignano <[email protected]>
1 parent f6f37e6 commit 2530566

File tree

16 files changed

+670
-96
lines changed

16 files changed

+670
-96
lines changed

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module sigs.k8s.io/controller-runtime
33
go 1.19
44

55
require (
6+
github.com/davecgh/go-spew v1.1.1
67
github.com/evanphx/json-patch/v5 v5.6.0
78
github.com/fsnotify/fsnotify v1.6.0
89
github.com/go-logr/logr v1.2.3
@@ -24,13 +25,13 @@ require (
2425
k8s.io/component-base v0.26.1
2526
k8s.io/klog/v2 v2.90.0
2627
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448
28+
sigs.k8s.io/logical-cluster v0.0.0-00010101000000-000000000000
2729
sigs.k8s.io/yaml v1.3.0
2830
)
2931

3032
require (
3133
github.com/beorn7/perks v1.0.1 // indirect
3234
github.com/cespare/xxhash/v2 v2.1.2 // indirect
33-
github.com/davecgh/go-spew v1.1.1 // indirect
3435
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
3536
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
3637
github.com/go-openapi/jsonpointer v0.19.5 // indirect
@@ -72,3 +73,5 @@ require (
7273
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
7374
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
7475
)
76+
77+
replace sigs.k8s.io/logical-cluster => github.com/vincepri/logical-cluster v0.0.0-20230221175249-b5d99f705d5b

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
268268
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
269269
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
270270
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
271+
github.com/vincepri/logical-cluster v0.0.0-20230221175249-b5d99f705d5b h1:eGYbpQr7r86O5wqbYMgLIp+wUZ5DSJpICK6UKYXTwPk=
272+
github.com/vincepri/logical-cluster v0.0.0-20230221175249-b5d99f705d5b/go.mod h1:5keWHzDm2ppkz5PTL3upRzPeVeci8JLApFv1u+Q6uYE=
271273
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
272274
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
273275
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

hack/test-all.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ if [[ -n ${ARTIFACTS:-} ]]; then
2525
fi
2626

2727
result=0
28-
go test -v -race ${P_FLAG} ${MOD_OPT} ./... --ginkgo.fail-fast ${GINKGO_ARGS} || result=$?
28+
go test -v -race ${P_FLAG} ${MOD_OPT} ./pkg/builder/... --ginkgo.fail-fast --ginkgo.v ${GINKGO_ARGS} || result=$?
2929

3030
if [[ -n ${ARTIFACTS:-} ]]; then
3131
mkdir -p ${ARTIFACTS}

pkg/builder/controller.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"sigs.k8s.io/controller-runtime/pkg/client"
3030
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
31+
"sigs.k8s.io/controller-runtime/pkg/cluster"
3132
"sigs.k8s.io/controller-runtime/pkg/controller"
3233
"sigs.k8s.io/controller-runtime/pkg/handler"
3334
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
@@ -58,15 +59,23 @@ type Builder struct {
5859
ownsInput []OwnsInput
5960
watchesInput []WatchesInput
6061
mgr manager.Manager
62+
cluster cluster.Cluster
6163
globalPredicates []predicate.Predicate
6264
ctrl controller.Controller
6365
ctrlOptions controller.Options
6466
name string
6567
}
6668

69+
func (blder *Builder) clone() *Builder {
70+
clone := *blder
71+
clone.cluster = nil
72+
clone.ctrl = nil
73+
return &clone
74+
}
75+
6776
// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
6877
func ControllerManagedBy(m manager.Manager) *Builder {
69-
return &Builder{mgr: m}
78+
return &Builder{cluster: m, mgr: m}
7079
}
7180

7281
// ForInput represents the information set by For method.
@@ -135,7 +144,7 @@ type WatchesInput struct {
135144
// This is the equivalent of calling
136145
// WatchesRawSource(source.Kind(scheme, object), eventhandler, opts...).
137146
func (blder *Builder) Watches(object client.Object, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder {
138-
src := source.Kind(blder.mgr.GetCache(), object)
147+
src := source.Kind(blder.cluster.GetCache(), object)
139148
return blder.WatchesRawSource(src, eventhandler, opts...)
140149
}
141150

@@ -228,24 +237,45 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
228237
if r == nil {
229238
return nil, fmt.Errorf("must provide a non-nil Reconciler")
230239
}
231-
if blder.mgr == nil {
240+
if blder.mgr == nil || blder.cluster == nil {
232241
return nil, fmt.Errorf("must provide a non-nil Manager")
233242
}
234243
if blder.forInput.err != nil {
235244
return nil, blder.forInput.err
236245
}
237246

247+
if err := blder.mgr.AddRunnableBuilder(func(cl cluster.Cluster) (manager.Runnable, error) {
248+
cloned := blder.clone()
249+
cloned.cluster = cl
250+
if err := cloned.do(r); err != nil {
251+
return nil, err
252+
}
253+
return cloned.ctrl, nil
254+
}); err != nil {
255+
return nil, err
256+
}
257+
258+
if err := blder.do(r); err != nil {
259+
return nil, err
260+
}
261+
if err := blder.mgr.Add(blder.ctrl); err != nil {
262+
return nil, err
263+
}
264+
return blder.ctrl, nil
265+
}
266+
267+
func (blder *Builder) do(r reconcile.Reconciler) error {
238268
// Set the ControllerManagedBy
239269
if err := blder.doController(r); err != nil {
240-
return nil, err
270+
return err
241271
}
242272

243273
// Set the Watch
244274
if err := blder.doWatch(); err != nil {
245-
return nil, err
275+
return err
246276
}
247277

248-
return blder.ctrl, nil
278+
return nil
249279
}
250280

251281
func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) {
@@ -254,7 +284,7 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client.
254284
return obj, nil
255285
case projectAsMetadata:
256286
metaObj := &metav1.PartialObjectMetadata{}
257-
gvk, err := getGvk(obj, blder.mgr.GetScheme())
287+
gvk, err := getGvk(obj, blder.cluster.GetScheme())
258288
if err != nil {
259289
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
260290
}
@@ -272,7 +302,7 @@ func (blder *Builder) doWatch() error {
272302
if err != nil {
273303
return err
274304
}
275-
src := source.Kind(blder.mgr.GetCache(), obj)
305+
src := source.Kind(blder.cluster.GetCache(), obj)
276306
hdler := &handler.EnqueueRequestForObject{}
277307
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
278308
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
@@ -289,13 +319,13 @@ func (blder *Builder) doWatch() error {
289319
if err != nil {
290320
return err
291321
}
292-
src := source.Kind(blder.mgr.GetCache(), obj)
322+
src := source.Kind(blder.cluster.GetCache(), obj)
293323
opts := []handler.OwnerOption{}
294324
if !own.matchEveryOwner {
295325
opts = append(opts, handler.OnlyControllerOwner())
296326
}
297327
hdler := handler.EnqueueRequestForOwner(
298-
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
328+
blder.cluster.GetScheme(), blder.cluster.GetRESTMapper(),
299329
blder.forInput.object,
300330
opts...,
301331
)
@@ -354,7 +384,7 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
354384
hasGVK := blder.forInput.object != nil
355385
if hasGVK {
356386
var err error
357-
gvk, err = getGvk(blder.forInput.object, blder.mgr.GetScheme())
387+
gvk, err = getGvk(blder.forInput.object, blder.cluster.GetScheme())
358388
if err != nil {
359389
return err
360390
}

pkg/builder/controller_test.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"strings"
2323
"sync/atomic"
2424

25+
"github.com/davecgh/go-spew/spew"
2526
"github.com/go-logr/logr"
2627
. "github.com/onsi/ginkgo/v2"
2728
. "github.com/onsi/gomega"
@@ -45,6 +46,7 @@ import (
4546
"sigs.k8s.io/controller-runtime/pkg/predicate"
4647
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4748
"sigs.k8s.io/controller-runtime/pkg/scheme"
49+
"sigs.k8s.io/logical-cluster"
4850
)
4951

5052
type typedNoop struct{}
@@ -556,6 +558,133 @@ var _ = Describe("application", func() {
556558
}).Should(BeTrue())
557559
})
558560
})
561+
562+
Context("with logical adapter", func() {
563+
It("should support watching across clusters", func() {
564+
adapter := &fakeLogicalAdapter{
565+
list: []logical.Name{
566+
"cluster1",
567+
"cluster2",
568+
},
569+
watch: make(chan logical.Event),
570+
}
571+
mgr, err := manager.New(cfg, manager.Options{}.WithExperimentalLogicalAdapter(adapter))
572+
Expect(err).NotTo(HaveOccurred())
573+
574+
ch1 := make(chan reconcile.Request)
575+
ch2 := make(chan reconcile.Request)
576+
Expect(
577+
ControllerManagedBy(mgr).
578+
For(&appsv1.Deployment{}).
579+
Owns(&appsv1.ReplicaSet{}).
580+
Complete(reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
581+
spew.Dump(req)
582+
defer GinkgoRecover()
583+
switch req.Cluster {
584+
case "cluster1":
585+
ch1 <- req
586+
case "cluster2":
587+
ch2 <- req
588+
default:
589+
// Do nothing.
590+
}
591+
return reconcile.Result{}, nil
592+
})),
593+
).To(Succeed())
594+
595+
ctx, cancel := context.WithCancel(context.Background())
596+
defer cancel()
597+
By("Starting the manager")
598+
go func() {
599+
defer GinkgoRecover()
600+
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
601+
}()
602+
603+
By("Creating a deployment")
604+
dep := &appsv1.Deployment{
605+
ObjectMeta: metav1.ObjectMeta{
606+
Name: "deploy-multi-cluster",
607+
Namespace: "default",
608+
},
609+
Spec: appsv1.DeploymentSpec{
610+
Selector: &metav1.LabelSelector{
611+
MatchLabels: map[string]string{"foo": "bar"},
612+
},
613+
Template: corev1.PodTemplateSpec{
614+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
615+
Spec: corev1.PodSpec{
616+
Containers: []corev1.Container{
617+
{
618+
Name: "nginx",
619+
Image: "nginx",
620+
},
621+
},
622+
},
623+
},
624+
},
625+
}
626+
cluster1, err := mgr.LogicalClusterGetter()("cluster1")
627+
Expect(err).NotTo(HaveOccurred())
628+
Expect(cluster1.GetClient().Create(ctx, dep)).To(Succeed())
629+
630+
By("Waiting for the Deployment Reconcile on both clusters")
631+
Eventually(ch1).Should(Receive(Equal(reconcile.Request{
632+
NamespacedName: types.NamespacedName{
633+
Name: dep.Name,
634+
Namespace: dep.Namespace,
635+
},
636+
Cluster: "cluster1",
637+
})))
638+
Eventually(ch2).Should(Receive(Equal(reconcile.Request{
639+
NamespacedName: types.NamespacedName{
640+
Name: dep.Name,
641+
Namespace: dep.Namespace,
642+
},
643+
Cluster: "cluster2",
644+
})))
645+
646+
By("Creating a ReplicaSet")
647+
// Expect a Reconcile when an Owned object is managedObjects.
648+
rs := &appsv1.ReplicaSet{
649+
ObjectMeta: metav1.ObjectMeta{
650+
Namespace: "default",
651+
Name: "rs-multi-cluster",
652+
Labels: dep.Spec.Selector.MatchLabels,
653+
OwnerReferences: []metav1.OwnerReference{
654+
{
655+
Name: dep.Name,
656+
Kind: "Deployment",
657+
APIVersion: "apps/v1",
658+
Controller: pointer.Bool(true),
659+
UID: dep.UID,
660+
},
661+
},
662+
},
663+
Spec: appsv1.ReplicaSetSpec{
664+
Selector: dep.Spec.Selector,
665+
Template: dep.Spec.Template,
666+
},
667+
}
668+
Expect(err).NotTo(HaveOccurred())
669+
Expect(cluster1.GetClient().Create(ctx, rs)).To(Succeed())
670+
671+
By("Waiting for the Deployment Reconcile on both clusters")
672+
Eventually(ch1).Should(Receive(Equal(reconcile.Request{
673+
NamespacedName: types.NamespacedName{
674+
Name: dep.Name,
675+
Namespace: dep.Namespace,
676+
},
677+
Cluster: "cluster1",
678+
})))
679+
Eventually(ch2).Should(Receive(Equal(reconcile.Request{
680+
NamespacedName: types.NamespacedName{
681+
Name: dep.Name,
682+
Namespace: dep.Namespace,
683+
},
684+
Cluster: "cluster2",
685+
})))
686+
})
687+
})
559688
})
560689

561690
// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
@@ -695,3 +824,33 @@ type fakeType struct {
695824

696825
func (*fakeType) GetObjectKind() schema.ObjectKind { return nil }
697826
func (*fakeType) DeepCopyObject() runtime.Object { return nil }
827+
828+
type fakeLogicalAdapter struct {
829+
list []logical.Name
830+
listErr error
831+
832+
watch chan logical.Event
833+
}
834+
835+
func (f *fakeLogicalAdapter) RESTConfig(name logical.Name) (*rest.Config, error) {
836+
return testenv.Config, nil
837+
}
838+
839+
func (f *fakeLogicalAdapter) List() ([]logical.Name, error) {
840+
return f.list, f.listErr
841+
}
842+
843+
func (f *fakeLogicalAdapter) Watch() (logical.Watcher, error) {
844+
return &fakeLogicalWatcher{ch: f.watch}, nil
845+
}
846+
847+
type fakeLogicalWatcher struct {
848+
ch chan logical.Event
849+
}
850+
851+
func (f *fakeLogicalWatcher) Stop() {
852+
}
853+
854+
func (f *fakeLogicalWatcher) ResultChan() <-chan logical.Event {
855+
return f.ch
856+
}

pkg/cluster/cluster.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@ import (
3131
"k8s.io/utils/pointer"
3232
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3333
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
34+
"sigs.k8s.io/logical-cluster"
3435

3536
"sigs.k8s.io/controller-runtime/pkg/cache"
3637
"sigs.k8s.io/controller-runtime/pkg/client"
3738
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
3839
)
3940

41+
// LogicalGetterFunc is a function that returns a client for a given logical cluster name.
42+
type LogicalGetterFunc func(logical.Name) (Cluster, error)
43+
4044
// Cluster provides various methods to interact with a cluster.
4145
type Cluster interface {
4246
// GetHTTPClient returns an HTTP client that can be used to talk to the apiserver

0 commit comments

Comments
 (0)