Skip to content

Commit 6cf984c

Browse files
committed
Make sure to only init watcher once, RESTConfig should have backoff
Signed-off-by: Vince Prignano <[email protected]>
1 parent ce4b76c commit 6cf984c

File tree

2 files changed

+60
-45
lines changed

2 files changed

+60
-45
lines changed

examples/fleet/main.go

Lines changed: 41 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,6 @@ func main() {
6262
For(&corev1.Pod{}).Complete(reconcile.Func(
6363
func(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
6464
log := log.FromContext(ctx)
65-
log.Info("Reconciling pod", "name", req.Name)
6665

6766
cluster, err := mgr.GetCluster(req.Cluster)
6867
if err != nil {
@@ -75,6 +74,7 @@ func main() {
7574
if err := client.Get(ctx, req.NamespacedName, pod); err != nil {
7675
return reconcile.Result{}, err
7776
}
77+
log.Info("Reconciling pod", "name", pod.Name, "uuid", pod.UID)
7878

7979
// Print any annotations that start with fleet.
8080
for k, v := range pod.Labels {
@@ -87,7 +87,7 @@ func main() {
8787
},
8888
))
8989

90-
entryLog.Info("starting manager")
90+
entryLog.Info("Starting manager")
9191
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
9292
entryLog.Error(err, "unable to run manager")
9393
os.Exit(1)
@@ -129,6 +129,7 @@ func (k *KindAdapter) Watch() (logical.Watcher, error) {
129129
}
130130

131131
type KindWatcher struct {
132+
init sync.Once
132133
wg sync.WaitGroup
133134
ch chan logical.Event
134135
cancel context.CancelFunc
@@ -142,47 +143,49 @@ func (k *KindWatcher) Stop() {
142143
close(k.ch)
143144
}
144145
func (k *KindWatcher) ResultChan() <-chan logical.Event {
145-
ctx, cancel := context.WithCancel(context.Background())
146-
k.cancel = cancel
147-
set := sets.New[string]()
148-
k.wg.Add(1)
149-
go func() {
150-
defer k.wg.Done()
151-
for {
152-
select {
153-
case <-time.After(2 * time.Second):
154-
provider := kind.NewProvider()
155-
list, err := provider.List()
156-
if err != nil {
157-
klog.Error(err)
158-
continue
159-
}
160-
newSet := sets.New(list...)
161-
// Check for new clusters.
162-
for _, cluster := range newSet.Difference(set).UnsortedList() {
163-
if !strings.HasPrefix(cluster, "fleet-") {
146+
k.init.Do(func() {
147+
ctx, cancel := context.WithCancel(context.Background())
148+
k.cancel = cancel
149+
set := sets.New[string]()
150+
k.wg.Add(1)
151+
go func() {
152+
defer k.wg.Done()
153+
for {
154+
select {
155+
case <-time.After(2 * time.Second):
156+
provider := kind.NewProvider()
157+
list, err := provider.List()
158+
if err != nil {
159+
klog.Error(err)
164160
continue
165161
}
166-
k.ch <- logical.Event{
167-
Type: watch.Added,
168-
Name: logical.Name(cluster),
169-
}
170-
}
171-
// Check for deleted clusters.
172-
for _, cluster := range set.Difference(newSet).UnsortedList() {
173-
if !strings.HasPrefix(cluster, "fleet-") {
174-
continue
162+
newSet := sets.New(list...)
163+
// Check for new clusters.
164+
for _, cluster := range newSet.Difference(set).UnsortedList() {
165+
if !strings.HasPrefix(cluster, "fleet-") {
166+
continue
167+
}
168+
k.ch <- logical.Event{
169+
Type: watch.Added,
170+
Name: logical.Name(cluster),
171+
}
175172
}
176-
k.ch <- logical.Event{
177-
Type: watch.Deleted,
178-
Name: logical.Name(cluster),
173+
// Check for deleted clusters.
174+
for _, cluster := range set.Difference(newSet).UnsortedList() {
175+
if !strings.HasPrefix(cluster, "fleet-") {
176+
continue
177+
}
178+
k.ch <- logical.Event{
179+
Type: watch.Deleted,
180+
Name: logical.Name(cluster),
181+
}
179182
}
183+
set = newSet
184+
case <-ctx.Done():
185+
return
180186
}
181-
set = newSet
182-
case <-ctx.Done():
183-
return
184187
}
185-
}
186-
}()
188+
}()
189+
})
187190
return k.ch
188191
}

pkg/manager/internal.go

Lines changed: 19 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -259,7 +259,7 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
259259
}
260260

261261
func (cm *controllerManager) GetCluster(name logical.Name) (cluster.Cluster, error) {
262-
return cm.getLogicalCluster(name)
262+
return cm.getLogicalCluster(context.TODO(), name)
263263
}
264264

265265
func (cm *controllerManager) GetHTTPClient() *http.Client {
@@ -337,7 +337,7 @@ func (cm *controllerManager) syncClusterAwareRunnables() {
337337
}
338338
}
339339

340-
func (cm *controllerManager) getLogicalCluster(name logical.Name) (c *logicalCluster, err error) {
340+
func (cm *controllerManager) getLogicalCluster(ctx context.Context, name logical.Name) (c *logicalCluster, err error) {
341341
// Check if the manager was configured with a logical adapter,
342342
// otherwise we cannot retrieve the cluster.
343343
if cm.logicalAdapter == nil {
@@ -363,10 +363,22 @@ func (cm *controllerManager) getLogicalCluster(name logical.Name) (c *logicalClu
363363
}
364364

365365
// Create a new cluster.
366-
cfg, err := cm.logicalAdapter.RESTConfig(name)
367-
if err != nil {
368-
return nil, fmt.Errorf("cannot find logical cluster %q from adapter: %w", name, err)
366+
var cfg *rest.Config
367+
{
368+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
369+
defer cancel()
370+
var watchErr error
371+
if err := wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (done bool, err error) {
372+
cfg, watchErr = cm.logicalAdapter.RESTConfig(name)
373+
if watchErr != nil {
374+
return false, nil // retry
375+
}
376+
return true, nil
377+
}); err != nil {
378+
return nil, fmt.Errorf("failed to retrieve RESTConfig: %w", kerrors.NewAggregate([]error{err, watchErr}))
379+
}
369380
}
381+
370382
cl, err := cluster.New(cfg, cm.defaultClusterOptions)
371383
if err != nil {
372384
return nil, fmt.Errorf("cannot create logical cluster %q: %w", name, err)
@@ -627,7 +639,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g
627639
return err
628640
}
629641
for _, name := range clusterList {
630-
if _, err := cm.getLogicalCluster(name); err != nil {
642+
if _, err := cm.getLogicalCluster(ctx, name); err != nil {
631643
return err
632644
}
633645
}
@@ -672,7 +684,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g
672684
case event := <-watcher.ResultChan():
673685
switch event.Type {
674686
case watch.Added, watch.Modified:
675-
if _, err := cm.getLogicalCluster(event.Name); err != nil {
687+
if _, err := cm.getLogicalCluster(ctx, event.Name); err != nil {
676688
return err
677689
}
678690
cm.syncClusterAwareRunnables()

0 commit comments

Comments
 (0)