Skip to content

Commit 9d51b73

Browse files
committed
Add cluster.Provider
Signed-off-by: Vince Prignano <[email protected]>
1 parent 8d3b8f7 commit 9d51b73

File tree

8 files changed

+117
-63
lines changed

8 files changed

+117
-63
lines changed

examples/fleet/main.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"k8s.io/klog/v2"
1616
ctrl "sigs.k8s.io/controller-runtime"
1717
"sigs.k8s.io/controller-runtime/pkg/builder"
18+
"sigs.k8s.io/controller-runtime/pkg/cluster"
1819
"sigs.k8s.io/controller-runtime/pkg/envtest"
1920
"sigs.k8s.io/controller-runtime/pkg/log"
2021
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -124,14 +125,14 @@ func (k *KindAdapter) List() ([]logical.Name, error) {
124125
return res, nil
125126
}
126127

127-
func (k *KindAdapter) Watch() (logical.Watcher, error) {
128-
return &KindWatcher{ch: make(chan logical.Event)}, nil
128+
func (k *KindAdapter) Watch() (cluster.Watcher, error) {
129+
return &KindWatcher{ch: make(chan cluster.WatchEvent)}, nil
129130
}
130131

131132
type KindWatcher struct {
132133
init sync.Once
133134
wg sync.WaitGroup
134-
ch chan logical.Event
135+
ch chan cluster.WatchEvent
135136
cancel context.CancelFunc
136137
}
137138

@@ -142,7 +143,7 @@ func (k *KindWatcher) Stop() {
142143
k.wg.Wait()
143144
close(k.ch)
144145
}
145-
func (k *KindWatcher) ResultChan() <-chan logical.Event {
146+
func (k *KindWatcher) ResultChan() <-chan cluster.WatchEvent {
146147
k.init.Do(func() {
147148
ctx, cancel := context.WithCancel(context.Background())
148149
k.cancel = cancel
@@ -161,23 +162,23 @@ func (k *KindWatcher) ResultChan() <-chan logical.Event {
161162
}
162163
newSet := sets.New(list...)
163164
// Check for new clusters.
164-
for _, cluster := range newSet.Difference(set).UnsortedList() {
165-
if !strings.HasPrefix(cluster, "fleet-") {
165+
for _, cl := range newSet.Difference(set).UnsortedList() {
166+
if !strings.HasPrefix(cl, "fleet-") {
166167
continue
167168
}
168-
k.ch <- logical.Event{
169+
k.ch <- cl.WatchEvent{
169170
Type: watch.Added,
170-
Name: logical.Name(cluster),
171+
Name: logical.Name(cl),
171172
}
172173
}
173174
// Check for deleted clusters.
174-
for _, cluster := range set.Difference(newSet).UnsortedList() {
175-
if !strings.HasPrefix(cluster, "fleet-") {
175+
for _, cl := range set.Difference(newSet).UnsortedList() {
176+
if !strings.HasPrefix(cl, "fleet-") {
176177
continue
177178
}
178-
k.ch <- logical.Event{
179+
k.ch <- cluster.WatchEvent{
179180
Type: watch.Deleted,
180-
Name: logical.Name(cluster),
181+
Name: logical.Name(cl),
181182
}
182183
}
183184
set = newSet

go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ require (
2424
k8s.io/component-base v0.26.1
2525
k8s.io/klog/v2 v2.90.0
2626
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448
27-
sigs.k8s.io/logical-cluster v0.0.0-00010101000000-000000000000
27+
sigs.k8s.io/logical-cluster v0.0.1-alpha.0
2828
sigs.k8s.io/yaml v1.3.0
2929
)
3030

@@ -74,5 +74,3 @@ require (
7474
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
7575
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
7676
)
77-
78-
replace sigs.k8s.io/logical-cluster => github.com/vincepri/logical-cluster v0.0.0-20230221175249-b5d99f705d5b

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,6 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
268268
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
269269
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
270270
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
271-
github.com/vincepri/logical-cluster v0.0.0-20230221175249-b5d99f705d5b h1:eGYbpQr7r86O5wqbYMgLIp+wUZ5DSJpICK6UKYXTwPk=
272-
github.com/vincepri/logical-cluster v0.0.0-20230221175249-b5d99f705d5b/go.mod h1:5keWHzDm2ppkz5PTL3upRzPeVeci8JLApFv1u+Q6uYE=
273271
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
274272
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
275273
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -619,6 +617,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
619617
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
620618
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
621619
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
620+
sigs.k8s.io/logical-cluster v0.0.1-alpha.0 h1:vigMG0I1fgDVn0hsTOeZB55AmplXC7D4iLa60qeyX70=
621+
sigs.k8s.io/logical-cluster v0.0.1-alpha.0/go.mod h1:7YymTkuUFI+tkwCRPMsk+TiyBQiPDKRArxVAAGpezZI=
622622
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
623623
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
624624
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=

pkg/builder/controller_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737

3838
"sigs.k8s.io/controller-runtime/pkg/cache"
3939
"sigs.k8s.io/controller-runtime/pkg/client"
40+
"sigs.k8s.io/controller-runtime/pkg/cluster"
4041
"sigs.k8s.io/controller-runtime/pkg/config"
4142
"sigs.k8s.io/controller-runtime/pkg/controller"
4243
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -560,14 +561,14 @@ var _ = Describe("application", func() {
560561

561562
Context("with logical adapter", func() {
562563
It("should support watching across clusters", func() {
563-
adapter := &fakeLogicalAdapter{
564+
adapter := &fakeClusterProvider{
564565
list: []logical.Name{
565566
"cluster1",
566567
"cluster2",
567568
},
568-
watch: make(chan logical.Event),
569+
watch: make(chan cluster.WatchEvent),
569570
}
570-
mgr, err := manager.New(cfg, manager.Options{}.WithExperimentalLogicalAdapter(adapter))
571+
mgr, err := manager.New(cfg, manager.Options{}.WithExperimentalClusterProvider(adapter))
571572
Expect(err).NotTo(HaveOccurred())
572573

573574
ctx, cancel := context.WithCancel(context.Background())
@@ -836,32 +837,32 @@ type fakeType struct {
836837
func (*fakeType) GetObjectKind() schema.ObjectKind { return nil }
837838
func (*fakeType) DeepCopyObject() runtime.Object { return nil }
838839

839-
type fakeLogicalAdapter struct {
840+
type fakeClusterProvider struct {
840841
list []logical.Name
841842
listErr error
842843

843-
watch chan logical.Event
844+
watch chan cluster.WatchEvent
844845
}
845846

846-
func (f *fakeLogicalAdapter) RESTConfig(name logical.Name) (*rest.Config, error) {
847-
return testenv.Config, nil
847+
func (f *fakeClusterProvider) Get(ctx context.Context, name logical.Name, opts ...cluster.Option) (cluster.Cluster, error) {
848+
return cluster.New(testenv.Config, opts...)
848849
}
849850

850-
func (f *fakeLogicalAdapter) List() ([]logical.Name, error) {
851+
func (f *fakeClusterProvider) List() ([]logical.Name, error) {
851852
return f.list, f.listErr
852853
}
853854

854-
func (f *fakeLogicalAdapter) Watch() (logical.Watcher, error) {
855+
func (f *fakeClusterProvider) Watch() (cluster.Watcher, error) {
855856
return &fakeLogicalWatcher{ch: f.watch}, nil
856857
}
857858

858859
type fakeLogicalWatcher struct {
859-
ch chan logical.Event
860+
ch chan cluster.WatchEvent
860861
}
861862

862863
func (f *fakeLogicalWatcher) Stop() {
863864
}
864865

865-
func (f *fakeLogicalWatcher) ResultChan() <-chan logical.Event {
866+
func (f *fakeLogicalWatcher) ResultChan() <-chan cluster.WatchEvent {
866867
return f.ch
867868
}

pkg/cluster/provider.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package cluster
2+
3+
import (
4+
"context"
5+
6+
"k8s.io/apimachinery/pkg/watch"
7+
"sigs.k8s.io/logical-cluster"
8+
)
9+
10+
// Provider defines methods to retrieve, list, and watch fleet of clusters.
11+
// The provider is responsible for discovering and managing the lifecycle of each
12+
// cluster.
13+
//
14+
// Example: A Cluster API provider would be responsible for discovering and managing
15+
// clusters that are backed by Cluster API resources, which can live
16+
// in multiple namespaces in a single management cluster.
17+
type Provider interface {
18+
Get(ctx context.Context, name logical.Name, opts ...Option) (Cluster, error)
19+
20+
// List returns a list of logical clusters.
21+
// This method is used to discover the initial set of logical clusters
22+
// and to refresh the list of logical clusters periodically.
23+
List() ([]logical.Name, error)
24+
25+
// Watch returns a Watcher that watches for changes to a list of logical clusters
26+
// and react to potential changes.
27+
Watch() (Watcher, error)
28+
}
29+
30+
// Watcher watches for changes to clusters and provides events to a channel
31+
// for the Manager to react to.
32+
type Watcher interface {
33+
// Stop stops watching. Will close the channel returned by ResultChan(). Releases
34+
// any resources used by the watch.
35+
Stop()
36+
37+
// ResultChan returns a chan which will receive all the events. If an error occurs
38+
// or Stop() is called, the implementation will close this channel and
39+
// release any resources used by the watch.
40+
ResultChan() <-chan WatchEvent
41+
}
42+
43+
// WatchEvent is an event that is sent when a cluster is added, modified, or deleted.
44+
type WatchEvent struct {
45+
// Type is the type of event that occurred.
46+
//
47+
// - ADDED or MODIFIED
48+
// The logical cluster was added or updated: a new RESTConfig is available, or needs to be refreshed.
49+
// - DELETED
50+
// The logical cluster was deleted: the cluster is removed.
51+
// - ERROR
52+
// An error occurred while watching the logical cluster: the cluster is removed.
53+
// - BOOKMARK
54+
// A periodic event is sent that contains no new data: ignored.
55+
Type watch.EventType
56+
57+
// Name is the name of the logical cluster related to the event.
58+
Name logical.Name
59+
}

pkg/manager/internal.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ type controllerManager struct {
8585
defaultCluster cluster.Cluster
8686
defaultClusterOptions cluster.Option
8787

88-
logicalAdapter logical.Adapter
88+
logicalProvider cluster.Provider
8989

9090
logicalLock sync.RWMutex // protects logicalClusters
9191
logicalClusters map[logical.Name]*logicalCluster
@@ -307,7 +307,7 @@ func (cm *controllerManager) engageClusterAwareRunnables() {
307307
defer cm.Unlock()
308308

309309
// If we don't have a logical adapter, we cannot sync the runnables.
310-
if cm.logicalAdapter == nil {
310+
if cm.logicalProvider == nil {
311311
return
312312
}
313313

@@ -326,7 +326,7 @@ func (cm *controllerManager) engageClusterAwareRunnables() {
326326
func (cm *controllerManager) getLogicalCluster(ctx context.Context, name logical.Name) (c *logicalCluster, err error) {
327327
// Check if the manager was configured with a logical adapter,
328328
// otherwise we cannot retrieve the cluster.
329-
if cm.logicalAdapter == nil {
329+
if cm.logicalProvider == nil {
330330
return nil, fmt.Errorf("manager was not configured with a logical adapter, cannot retrieve %q", name)
331331
}
332332

@@ -349,28 +349,23 @@ func (cm *controllerManager) getLogicalCluster(ctx context.Context, name logical
349349
}
350350

351351
// Create a new cluster.
352-
var cfg *rest.Config
352+
var cl cluster.Cluster
353353
{
354354
// TODO(vincepri): Make this timeout configurable.
355355
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
356356
defer cancel()
357357
var watchErr error
358358
if err := wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (done bool, err error) {
359-
cfg, watchErr = cm.logicalAdapter.RESTConfig(name)
359+
cl, watchErr = cm.logicalProvider.Get(ctx, name, cm.defaultClusterOptions, cluster.WithName(name))
360360
if watchErr != nil {
361361
return false, nil //nolint:nilerr // We want to keep trying.
362362
}
363363
return true, nil
364364
}); err != nil {
365-
return nil, fmt.Errorf("failed to retrieve RESTConfig: %w", kerrors.NewAggregate([]error{err, watchErr}))
365+
return nil, fmt.Errorf("failed create logical cluster %q: %w", name, kerrors.NewAggregate([]error{err, watchErr}))
366366
}
367367
}
368368

369-
cl, err := cluster.New(cfg, cm.defaultClusterOptions, cluster.WithName(name))
370-
if err != nil {
371-
return nil, fmt.Errorf("cannot create logical cluster %q: %w", name, err)
372-
}
373-
374369
// We add the Cluster to the manager as a Runnable, which is going to be categorized
375370
// as a Cache-backed Runnable.
376371
//
@@ -380,7 +375,7 @@ func (cm *controllerManager) getLogicalCluster(ctx context.Context, name logical
380375
return nil, fmt.Errorf("cannot add logical cluster %q to manager: %w", name, err)
381376
}
382377
// Create a new context for the Cluster, so that it can be stopped independently.
383-
ctx, cancel := context.WithCancel(logical.IntoContext(context.Background(), name))
378+
ctx, cancel := context.WithCancel(context.Background())
384379
c = &logicalCluster{
385380
Cluster: cl,
386381
ctx: ctx,
@@ -393,7 +388,7 @@ func (cm *controllerManager) getLogicalCluster(ctx context.Context, name logical
393388
func (cm *controllerManager) removeLogicalCluster(name logical.Name) error {
394389
// Check if the manager was configured with a logical adapter,
395390
// otherwise we cannot retrieve the cluster.
396-
if cm.logicalAdapter == nil {
391+
if cm.logicalProvider == nil {
397392
return fmt.Errorf("manager was not configured with a logical adapter, cannot retrieve %q", name)
398393
}
399394

@@ -613,10 +608,10 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g
613608
}
614609

615610
// If the manager has been configured with a logical adapter, start it.
616-
if cm.logicalAdapter != nil {
611+
if cm.logicalProvider != nil {
617612
if err := cm.add(RunnableFunc(func(ctx context.Context) error {
618613
resync := func() error {
619-
clusterList, err := cm.logicalAdapter.List()
614+
clusterList, err := cm.logicalProvider.List()
620615
if err != nil {
621616
return err
622617
}
@@ -644,7 +639,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g
644639
}
645640

646641
// Create a watcher and start watching for changes.
647-
watcher, err := cm.logicalAdapter.Watch()
642+
watcher, err := cm.logicalProvider.Watch()
648643
if err != nil {
649644
return err
650645
}

pkg/manager/manager.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,10 @@ type Options struct {
351351
newMetricsListener func(addr string) (net.Listener, error)
352352
newHealthProbeListener func(addr string) (net.Listener, error)
353353

354-
// logicalAdapter is an EXPERIMENTAL feature that allows the manager to
354+
// logicalClusterProvider is an EXPERIMENTAL feature that allows the manager to
355355
// operate against many Kubernetes clusters at once.
356356
// It can be used by invoking WithExperimentalLogicalAdapter(adapter) on Options.
357-
logicalAdapter logical.Adapter
357+
logicalClusterProvider cluster.Provider
358358
}
359359

360360
// BaseContextFunc is a function used to provide a base Context to Runnables
@@ -473,7 +473,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
473473
stopProcedureEngaged: pointer.Int64(0),
474474
defaultCluster: cluster,
475475
defaultClusterOptions: clusterOptions,
476-
logicalAdapter: options.logicalAdapter,
476+
logicalProvider: options.logicalClusterProvider,
477477
logicalClusters: make(map[logical.Name]*logicalCluster),
478478
runnables: newRunnables(options.BaseContext, errChan),
479479
errChan: errChan,
@@ -587,13 +587,13 @@ func (o Options) AndFromOrDie(loader config.ControllerManagerConfiguration) Opti
587587
return o
588588
}
589589

590-
// WithExperimentalLogicalAdapter sets the logical adapter to use for the manager.
590+
// WithExperimentalClusterProvider sets the logical adapter to use for the manager.
591591
// This is an EXPERIMENTAL feature that allows a Manager to be used against a fleet
592592
// of Kubernetes clusters.
593593
//
594594
// NOTE: The method signature may change or be removed in the future.
595-
func (o Options) WithExperimentalLogicalAdapter(adapter logical.Adapter) Options {
596-
o.logicalAdapter = adapter
595+
func (o Options) WithExperimentalClusterProvider(provider cluster.Provider) Options {
596+
o.logicalClusterProvider = provider
597597
return o
598598
}
599599

0 commit comments

Comments
 (0)