Skip to content

Commit 56e0c2b

Browse files
committed
SQUASH: make manager cluster.Aware
Signed-off-by: Dr. Stefan Schimanski <[email protected]>
1 parent 489224f commit 56e0c2b

File tree

7 files changed

+376
-442
lines changed

7 files changed

+376
-442
lines changed

examples/fleet-namespace/main.go

Lines changed: 82 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
corev1 "k8s.io/api/core/v1"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929
"k8s.io/apimachinery/pkg/util/runtime"
30-
"k8s.io/apimachinery/pkg/watch"
3130
"k8s.io/client-go/rest"
3231
toolscache "k8s.io/client-go/tools/cache"
3332
"k8s.io/klog/v2"
@@ -99,10 +98,9 @@ func main() {
9998
entryLog.Error(err, "unable to set up provider")
10099
os.Exit(1)
101100
}
102-
provider := &NamespacedClusterProvider{Cluster: cl}
101+
provider := NewNamespacedClusterProvider(cl)
103102

104-
// Setup a cluster-aware Manager, watching the clusters (= namespaces) through
105-
// the cluster provider.
103+
// Setup a cluster-aware Manager, with the provider to lookup clusters.
106104
entryLog.Info("Setting up cluster-aware manager")
107105
mgr, err := manager.New(cfg, manager.Options{
108106
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
@@ -142,9 +140,15 @@ func main() {
142140
}
143141

144142
entryLog.Info("Starting provider")
143+
if err := provider.Start(ctx, mgr); err != nil { // does not block
144+
entryLog.Error(err, "unable to start provider")
145+
os.Exit(1)
146+
}
147+
148+
entryLog.Info("Starting cluster")
145149
g, ctx := errgroup.WithContext(ctx)
146150
g.Go(func() error {
147-
if err := ignoreCanceled(provider.Start(ctx)); err != nil {
151+
if err := ignoreCanceled(cl.Start(ctx)); err != nil {
148152
return fmt.Errorf("failed to start provider: %w", err)
149153
}
150154
return nil
@@ -169,80 +173,93 @@ func main() {
169173
// to "default" and vice versa, simulating a multi-cluster setup. It uses one
170174
// informer to watch objects for all namespaces.
171175
type NamespacedClusterProvider struct {
172-
cluster.Cluster
173-
}
176+
cluster cluster.Cluster
174177

175-
func (p *NamespacedClusterProvider) Get(ctx context.Context, clusterName string, opts ...cluster.Option) (cluster.Cluster, error) {
176-
ns := &corev1.Namespace{}
177-
if err := p.Cluster.GetCache().Get(ctx, client.ObjectKey{Name: clusterName}, ns); err != nil {
178-
return nil, err
179-
}
178+
mgr manager.Manager
180179

181-
return &NamespacedCluster{clusterName: clusterName, Cluster: p.Cluster}, nil
180+
lock sync.RWMutex
181+
clusters map[string]cluster.Cluster
182+
cancelFns map[string]context.CancelFunc
182183
}
183184

184-
func (p *NamespacedClusterProvider) List(ctx context.Context) ([]string, error) {
185-
nss := &corev1.NamespaceList{}
186-
if err := p.Cluster.GetCache().List(ctx, nss); err != nil {
187-
return nil, err
188-
}
189-
190-
res := make([]string, 0, len(nss.Items))
191-
for _, ns := range nss.Items {
192-
res = append(res, ns.Name)
185+
func NewNamespacedClusterProvider(cl cluster.Cluster) *NamespacedClusterProvider {
186+
return &NamespacedClusterProvider{
187+
cluster: cl,
188+
clusters: map[string]cluster.Cluster{},
189+
cancelFns: map[string]context.CancelFunc{},
193190
}
194-
return res, nil
195191
}
196192

197-
func (p *NamespacedClusterProvider) Watch(ctx context.Context) (cluster.Watcher, error) {
198-
inf, err := p.Cluster.GetCache().GetInformer(ctx, &corev1.Namespace{})
193+
func (p *NamespacedClusterProvider) Start(ctx context.Context, mgr manager.Manager) error {
194+
nsInf, err := p.cluster.GetCache().GetInformer(ctx, &corev1.Namespace{})
199195
if err != nil {
200-
return nil, err
196+
return err
201197
}
202-
return &NamespaceWatcher{inf: inf, ch: make(chan cluster.WatchEvent)}, nil
203-
}
204198

205-
type NamespaceWatcher struct {
206-
inf cache.Informer
207-
init sync.Once
208-
ch chan cluster.WatchEvent
209-
reg toolscache.ResourceEventHandlerRegistration
210-
}
199+
if _, err := nsInf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
200+
AddFunc: func(obj interface{}) {
201+
ns := obj.(*corev1.Namespace)
202+
203+
p.lock.RLock()
204+
if _, ok := p.clusters[ns.Name]; ok {
205+
defer p.lock.RUnlock()
206+
return
207+
}
208+
209+
// create new cluster
210+
p.lock.Lock()
211+
clusterCtx, cancel := context.WithCancel(ctx)
212+
cl := &NamespacedCluster{clusterName: ns.Name, Cluster: p.cluster}
213+
p.clusters[ns.Name] = cl
214+
p.cancelFns[ns.Name] = cancel
215+
p.lock.Unlock()
216+
217+
if err := mgr.Engage(clusterCtx, cl); err != nil {
218+
runtime.HandleError(fmt.Errorf("failed to engage manager with cluster %q: %w", ns.Name, err))
219+
220+
// cleanup
221+
p.lock.Lock()
222+
delete(p.clusters, ns.Name)
223+
delete(p.cancelFns, ns.Name)
224+
p.lock.Unlock()
225+
}
226+
},
227+
DeleteFunc: func(obj interface{}) {
228+
ns := obj.(*corev1.Namespace)
229+
230+
p.lock.RLock()
231+
cl, ok := p.clusters[ns.Name]
232+
if !ok {
233+
p.lock.RUnlock()
234+
return
235+
}
236+
p.lock.RUnlock()
237+
238+
if err := mgr.Disengage(ctx, cl); err != nil {
239+
runtime.HandleError(fmt.Errorf("failed to disengage manager with cluster %q: %w", ns.Name, err))
240+
}
211241

212-
func (w *NamespaceWatcher) Stop() {
213-
if w.reg != nil {
214-
_ = w.inf.RemoveEventHandler(w.reg)
242+
// stop and forget
243+
p.lock.Lock()
244+
p.cancelFns[ns.Name]()
245+
delete(p.clusters, ns.Name)
246+
delete(p.cancelFns, ns.Name)
247+
p.lock.Unlock()
248+
},
249+
}); err != nil {
250+
return err
215251
}
216-
close(w.ch)
252+
253+
return nil
217254
}
218255

219-
func (w *NamespaceWatcher) ResultChan() <-chan cluster.WatchEvent {
220-
w.init.Do(func() {
221-
w.reg, _ = w.inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
222-
AddFunc: func(obj interface{}) {
223-
ns := obj.(*corev1.Namespace)
224-
w.ch <- cluster.WatchEvent{
225-
Type: watch.Added,
226-
ClusterName: ns.Name,
227-
}
228-
},
229-
DeleteFunc: func(obj interface{}) {
230-
ns := obj.(*corev1.Namespace)
231-
w.ch <- cluster.WatchEvent{
232-
Type: watch.Deleted,
233-
ClusterName: ns.Name,
234-
}
235-
},
236-
UpdateFunc: func(oldObj, newObj interface{}) {
237-
ns := newObj.(*corev1.Namespace)
238-
w.ch <- cluster.WatchEvent{
239-
Type: watch.Modified,
240-
ClusterName: ns.Name,
241-
}
242-
},
243-
})
244-
})
245-
return w.ch
256+
func (p *NamespacedClusterProvider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
257+
p.lock.RLock()
258+
defer p.lock.RUnlock()
259+
if cl, ok := p.clusters[clusterName]; ok {
260+
return cl, nil
261+
}
262+
return nil, fmt.Errorf("cluster %s not found", clusterName)
246263
}
247264

248265
func ignoreCanceled(err error) error {

0 commit comments

Comments
 (0)