Skip to content

Commit 2a448a7

Browse files
authored
Merge pull request #1428 from alvaroaleman/fix-cachesync
🐛 Fix cache sync timeout functionality
2 parents 82c68b9 + 4d059e8 commit 2a448a7

File tree

3 files changed

+68
-41
lines changed

3 files changed

+68
-41
lines changed

pkg/internal/controller/controller_test.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"sync"
2324
"time"
@@ -33,6 +34,7 @@ import (
3334
"k8s.io/client-go/util/workqueue"
3435
"sigs.k8s.io/controller-runtime/pkg/cache"
3536
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
37+
"sigs.k8s.io/controller-runtime/pkg/client"
3638
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
3739
"sigs.k8s.io/controller-runtime/pkg/event"
3840
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -104,40 +106,21 @@ var _ = Describe("controller", func() {
104106
close(done)
105107
})
106108

107-
It("should wait for each informer to sync", func(done Done) {
108-
// TODO(directxman12): this test doesn't do what it says it does
109-
110-
c, err := cache.New(cfg, cache.Options{})
111-
Expect(err).NotTo(HaveOccurred())
112-
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
113-
Expect(err).NotTo(HaveOccurred())
114-
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
115-
Expect(err).NotTo(HaveOccurred())
116-
ctrl.startWatches = []watchDescription{{
117-
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
118-
}}
119-
120-
// Use a cancelled context so Start doesn't block
121-
ctx, cancel := context.WithCancel(context.Background())
122-
cancel()
123-
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
124-
125-
close(done)
126-
})
127-
128109
It("should error when cache sync timeout occurs", func(done Done) {
129110
ctrl.CacheSyncTimeout = 10 * time.Nanosecond
130111

131112
c, err := cache.New(cfg, cache.Options{})
132113
Expect(err).NotTo(HaveOccurred())
114+
c = &cacheWithIndefinitelyBlockingGetInformer{c}
133115

134116
ctrl.startWatches = []watchDescription{{
135117
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
136118
}}
119+
ctrl.Name = "testcontroller"
137120

138121
err = ctrl.Start(context.TODO())
139122
Expect(err).To(HaveOccurred())
140-
Expect(err.Error()).To(ContainSubstring("cache did not sync"))
123+
Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))
141124

142125
close(done)
143126
})
@@ -944,3 +927,20 @@ func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
944927
}()
945928
return s.SyncingSource.WaitForSync(ctx)
946929
}
930+
931+
var _ cache.Cache = &cacheWithIndefinitelyBlockingGetInformer{}
932+
933+
// cacheWithIndefinitelyBlockingGetInformer has a GetInformer implementation that blocks indefinitely or until its
934+
// context is cancelled.
935+
// We need it as a workaround for testenvs lack of support for a secure apiserver, because the insecure port always
936+
// implies the allow all authorizer, so we can not simulate rbac issues with it. They are the usual cause of the real
937+
// caches GetInformer blocking showing this behavior.
938+
// TODO: Remove this once envtest supports a secure apiserver.
939+
type cacheWithIndefinitelyBlockingGetInformer struct {
940+
cache.Cache
941+
}
942+
943+
func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
944+
<-ctx.Done()
945+
return nil, errors.New("GetInformer timed out")
946+
}

pkg/source/source.go

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ type Kind struct {
9191

9292
// cache used to watch APIs
9393
cache cache.Cache
94+
95+
// started may contain an error if one was encountered during startup. If its closed and does not
96+
// contain an error, startup and syncing finished.
97+
started chan error
98+
startCancel func()
9499
}
95100

96101
var _ SyncingSource = &Kind{}
@@ -110,16 +115,30 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
110115
return fmt.Errorf("must call CacheInto on Kind before calling Start")
111116
}
112117

113-
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
114-
i, err := ks.cache.GetInformer(ctx, ks.Type)
115-
if err != nil {
116-
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
117-
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
118-
"kind", kindMatchErr.GroupKind)
118+
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
119+
// sync that informer (most commonly due to RBAC issues).
120+
ctx, ks.startCancel = context.WithCancel(ctx)
121+
ks.started = make(chan error)
122+
go func() {
123+
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
124+
i, err := ks.cache.GetInformer(ctx, ks.Type)
125+
if err != nil {
126+
kindMatchErr := &meta.NoKindMatchError{}
127+
if errors.As(err, &kindMatchErr) {
128+
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
129+
"kind", kindMatchErr.GroupKind)
130+
}
131+
ks.started <- err
132+
return
119133
}
120-
return err
121-
}
122-
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
134+
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
135+
if !ks.cache.WaitForCacheSync(ctx) {
136+
// Would be great to return something more informative here
137+
ks.started <- errors.New("cache did not sync")
138+
}
139+
close(ks.started)
140+
}()
141+
123142
return nil
124143
}
125144

@@ -133,11 +152,13 @@ func (ks *Kind) String() string {
133152
// WaitForSync implements SyncingSource to allow controllers to wait with starting
134153
// workers until the cache is synced.
135154
func (ks *Kind) WaitForSync(ctx context.Context) error {
136-
if !ks.cache.WaitForCacheSync(ctx) {
137-
// Would be great to return something more informative here
138-
return errors.New("cache did not sync")
155+
select {
156+
case err := <-ks.started:
157+
return err
158+
case <-ctx.Done():
159+
ks.startCancel()
160+
return errors.New("timed out waiting for cache to be synced")
139161
}
140-
return nil
141162
}
142163

143164
var _ inject.Cache = &Kind{}

pkg/source/source_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ var _ = Describe("Source", func() {
9090
},
9191
}, q)
9292
Expect(err).NotTo(HaveOccurred())
93+
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
9394

9495
i, err := ic.FakeInformerFor(&corev1.Pod{})
9596
Expect(err).NotTo(HaveOccurred())
@@ -133,6 +134,7 @@ var _ = Describe("Source", func() {
133134
},
134135
}, q)
135136
Expect(err).NotTo(HaveOccurred())
137+
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
136138

137139
i, err := ic.FakeInformerFor(&corev1.Pod{})
138140
Expect(err).NotTo(HaveOccurred())
@@ -178,6 +180,7 @@ var _ = Describe("Source", func() {
178180
},
179181
}, q)
180182
Expect(err).NotTo(HaveOccurred())
183+
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
181184

182185
i, err := ic.FakeInformerFor(&corev1.Pod{})
183186
Expect(err).NotTo(HaveOccurred())
@@ -208,10 +211,11 @@ var _ = Describe("Source", func() {
208211
})
209212

210213
It("should return an error if syncing fails", func(done Done) {
211-
instance := source.Kind{}
214+
instance := source.Kind{Type: &corev1.Pod{}}
212215
f := false
213216
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
214-
err := instance.WaitForSync(nil)
217+
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
218+
err := instance.WaitForSync(context.Background())
215219
Expect(err).To(HaveOccurred())
216220
Expect(err.Error()).To(Equal("cache did not sync"))
217221

@@ -220,7 +224,7 @@ var _ = Describe("Source", func() {
220224
})
221225

222226
Context("for a Kind not in the cache", func() {
223-
It("should return an error when Start is called", func(done Done) {
227+
It("should return an error when WaitForSync is called", func(done Done) {
224228
ic.Error = fmt.Errorf("test error")
225229
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
226230

@@ -229,7 +233,8 @@ var _ = Describe("Source", func() {
229233
}
230234
Expect(instance.InjectCache(ic)).To(Succeed())
231235
err := instance.Start(ctx, handler.Funcs{}, q)
232-
Expect(err).To(HaveOccurred())
236+
Expect(err).NotTo(HaveOccurred())
237+
Expect(instance.WaitForSync(context.Background())).To(HaveOccurred())
233238

234239
close(done)
235240
})
@@ -246,8 +251,9 @@ var _ = Describe("Source", func() {
246251

247252
It("should return an error if syncing fails", func(done Done) {
248253
f := false
249-
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
250-
err := instance.WaitForSync(nil)
254+
instance := source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f})
255+
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
256+
err := instance.WaitForSync(context.Background())
251257
Expect(err).To(HaveOccurred())
252258
Expect(err.Error()).To(Equal("cache did not sync"))
253259

0 commit comments

Comments
 (0)