Skip to content

Commit b627bce

Browse files
committed
🌱 Source should retry to get informers until timeout expires
Signed-off-by: Vince Prignano <[email protected]>
1 parent 13f1400 commit b627bce

File tree

1 file changed

+26
-7
lines changed

1 file changed

+26
-7
lines changed

pkg/source/source.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"errors"
2222
"fmt"
2323
"sync"
24+
"time"
2425

2526
"k8s.io/apimachinery/pkg/api/meta"
27+
"k8s.io/apimachinery/pkg/util/wait"
2628
"k8s.io/client-go/util/workqueue"
2729
"sigs.k8s.io/controller-runtime/pkg/client"
2830
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -119,17 +121,34 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
119121
ctx, ks.startCancel = context.WithCancel(ctx)
120122
ks.started = make(chan error)
121123
go func() {
122-
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
123-
i, err := ks.cache.GetInformer(ctx, ks.Type)
124-
if err != nil {
125-
kindMatchErr := &meta.NoKindMatchError{}
126-
if errors.As(err, &kindMatchErr) {
127-
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
128-
"kind", kindMatchErr.GroupKind)
124+
var (
125+
i cache.Informer
126+
lastErr error
127+
)
128+
129+
// Tries to get an informer until it returns true,
130+
// an error or the specified context is cancelled or expired.
131+
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
132+
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
133+
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
134+
if lastErr != nil {
135+
kindMatchErr := &meta.NoKindMatchError{}
136+
if errors.As(lastErr, &kindMatchErr) {
137+
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
138+
"kind", kindMatchErr.GroupKind)
139+
}
140+
return false, nil // Retry.
141+
}
142+
return true, nil
143+
}); err != nil {
144+
if lastErr != nil {
145+
ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
146+
return
129147
}
130148
ks.started <- err
131149
return
132150
}
151+
133152
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
134153
if !ks.cache.WaitForCacheSync(ctx) {
135154
// Would be great to return something more informative here

0 commit comments

Comments
 (0)