Skip to content

Commit bbfc18c

Browse files
Add test cases to check cache sync timeouts
Co-authored-by: Alvaro Aleman <[email protected]>
1 parent cdc2e0e commit bbfc18c

File tree

2 files changed

+42
-33
lines changed

2 files changed

+42
-33
lines changed

pkg/internal/controller/controller.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type Controller struct {
7979
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
8080
ctx context.Context
8181

82-
// CacheSyncTimeout refers to the time limit set on waiting cache to sync
82+
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
8383
// Defaults to 2 minutes if not set.
8484
CacheSyncTimeout time.Duration
8585

@@ -146,10 +146,6 @@ func (c *Controller) Start(ctx context.Context) error {
146146
// Set the internal context.
147147
c.ctx = ctx
148148

149-
// use a context with timeout for launching sources and syncing caches.
150-
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
151-
defer cancel()
152-
153149
c.Queue = c.MakeQueue()
154150
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
155151

@@ -164,7 +160,10 @@ func (c *Controller) Start(ctx context.Context) error {
164160
// caches.
165161
for _, watch := range c.startWatches {
166162
c.Log.Info("Starting EventSource", "source", watch.src)
167-
if err := watch.src.Start(sourceStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {
163+
164+
watchStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
165+
defer cancel()
166+
if err := watch.src.Start(watchStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {
168167
return err
169168
}
170169
}
@@ -177,9 +176,14 @@ func (c *Controller) Start(ctx context.Context) error {
177176
if !ok {
178177
continue
179178
}
179+
180+
// use a context with timeout for launching sources and syncing caches.
181+
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
182+
defer cancel()
183+
184+
// WaitForSync waits for a definitive timeout, and returns if there
185+
// is an error or a timeout
180186
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
181-
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
182-
// Leaving it here because that could happen in the future
183187
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
184188
c.Log.Error(err, "Could not wait for Cache to sync")
185189
return err

pkg/internal/controller/controller_test.go

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,12 @@ var _ = Describe("controller", func() {
124124

125125
It("should error when cache sync timeout occurs", func(done Done) {
126126
ctrl.CacheSyncTimeout = 10 * time.Nanosecond
127+
127128
c, err := cache.New(cfg, cache.Options{})
128129
Expect(err).NotTo(HaveOccurred())
129-
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
130-
Expect(err).NotTo(HaveOccurred())
131-
sync := false
130+
132131
ctrl.startWatches = []watchDescription{{
133-
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{
134-
Synced: &sync,
135-
}),
132+
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
136133
}}
137134

138135
err = ctrl.Start(context.TODO())
@@ -142,39 +139,35 @@ var _ = Describe("controller", func() {
142139
close(done)
143140
})
144141

145-
It("should not error when cache sync time out is of reasonable value", func(done Done) {
142+
It("should not error when cache sync timeout is of sufficiently high", func(done Done) {
146143
ctrl.CacheSyncTimeout = 1 * time.Second
144+
145+
ctx, cancel := context.WithCancel(context.Background())
146+
defer cancel()
147+
148+
sourceSynced := make(chan struct{})
147149
c, err := cache.New(cfg, cache.Options{})
148150
Expect(err).NotTo(HaveOccurred())
149151
ctrl.startWatches = []watchDescription{{
150-
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
152+
src: &singnallingSourceWrapper{
153+
SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c),
154+
cacheSyncDone: sourceSynced,
155+
},
151156
}}
152157

153-
By("running the cache and waiting for it to sync")
154158
go func() {
155159
defer GinkgoRecover()
156-
Expect(c.Start(context.TODO())).To(Succeed())
160+
Expect(c.Start(ctx)).To(Succeed())
157161
}()
158-
close(done)
159-
})
160162

161-
It("should error when timeout is set to a very low value such that cache cannot sync", func(done Done) {
162-
ctrl.CacheSyncTimeout = 1 * time.Nanosecond
163-
c, err := cache.New(cfg, cache.Options{})
164-
Expect(err).NotTo(HaveOccurred())
165-
ctrl.startWatches = []watchDescription{{
166-
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
167-
}}
168-
169-
By("running the cache and waiting for it to sync")
170163
go func() {
171164
defer GinkgoRecover()
172-
err = ctrl.Start(context.TODO())
173-
Expect(err).To(HaveOccurred())
174-
Expect(err.Error()).To(ContainSubstring("cache did not sync"))
165+
Expect(ctrl.Start(ctx)).To(Succeed())
175166
}()
167+
168+
<-sourceSynced
176169
close(done)
177-
})
170+
}, 10.0)
178171

179172
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
180173
pr1 := &predicate.Funcs{}
@@ -865,3 +858,15 @@ func (f *fakeReconciler) Reconcile(_ context.Context, r reconcile.Request) (reco
865858
}
866859
return res.Result, res.Err
867860
}
861+
862+
type singnallingSourceWrapper struct {
863+
cacheSyncDone chan struct{}
864+
source.SyncingSource
865+
}
866+
867+
func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
868+
defer func() {
869+
close(s.cacheSyncDone)
870+
}()
871+
return s.SyncingSource.WaitForSync(ctx)
872+
}

0 commit comments

Comments
 (0)