Skip to content

Commit 66537ca

Browse files
authored
Merge pull request #1247 from varshaprasad96/add/cache-timeout
✨ Allow configuring cache sync timeouts
2 parents 6bcef8a + bbfc18c commit 66537ca

File tree

3 files changed

+85
-4
lines changed

3 files changed

+85
-4
lines changed

pkg/controller/controller.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package controller
1919
import (
2020
"context"
2121
"fmt"
22+
"time"
2223

2324
"github.com/go-logr/logr"
2425
"k8s.io/client-go/util/workqueue"
@@ -47,6 +48,10 @@ type Options struct {
4748
// Log is the logger used for this controller and passed to each reconciliation
4849
// request via the context field.
4950
Log logr.Logger
51+
52+
// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
53+
// Defaults to 2 minutes if not set.
54+
CacheSyncTimeout time.Duration
5055
}
5156

5257
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -104,6 +109,10 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
104109
options.MaxConcurrentReconciles = 1
105110
}
106111

112+
if options.CacheSyncTimeout == 0 {
113+
options.CacheSyncTimeout = 2 * time.Minute
114+
}
115+
107116
if options.RateLimiter == nil {
108117
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
109118
}
@@ -120,6 +129,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
120129
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
121130
},
122131
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
132+
CacheSyncTimeout: options.CacheSyncTimeout,
123133
SetFields: mgr.SetFields,
124134
Name: name,
125135
Log: options.Log.WithName("controller").WithName(name),

pkg/internal/controller/controller.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ type Controller struct {
7979
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
8080
ctx context.Context
8181

82+
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
83+
// Defaults to 2 minutes if not set.
84+
CacheSyncTimeout time.Duration
85+
8286
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
8387
startWatches []watchDescription
8488

@@ -156,7 +160,10 @@ func (c *Controller) Start(ctx context.Context) error {
156160
// caches.
157161
for _, watch := range c.startWatches {
158162
c.Log.Info("Starting EventSource", "source", watch.src)
159-
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
163+
164+
watchStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
165+
defer cancel()
166+
if err := watch.src.Start(watchStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {
160167
return err
161168
}
162169
}
@@ -169,9 +176,14 @@ func (c *Controller) Start(ctx context.Context) error {
169176
if !ok {
170177
continue
171178
}
172-
if err := syncingSource.WaitForSync(ctx); err != nil {
173-
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
174-
// Leaving it here because that could happen in the future
179+
180+
// use a context with timeout for launching sources and syncing caches.
181+
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
182+
defer cancel()
183+
184+
// WaitForSync waits for a definitive timeout, and returns if there
185+
// is an error or a timeout
186+
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
175187
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
176188
c.Log.Error(err, "Could not wait for Cache to sync")
177189
return err

pkg/internal/controller/controller_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,53 @@ var _ = Describe("controller", func() {
122122
close(done)
123123
})
124124

125+
It("should error when cache sync timeout occurs", func(done Done) {
126+
ctrl.CacheSyncTimeout = 10 * time.Nanosecond
127+
128+
c, err := cache.New(cfg, cache.Options{})
129+
Expect(err).NotTo(HaveOccurred())
130+
131+
ctrl.startWatches = []watchDescription{{
132+
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
133+
}}
134+
135+
err = ctrl.Start(context.TODO())
136+
Expect(err).To(HaveOccurred())
137+
Expect(err.Error()).To(ContainSubstring("cache did not sync"))
138+
139+
close(done)
140+
})
141+
142+
It("should not error when cache sync timeout is of sufficiently high", func(done Done) {
143+
ctrl.CacheSyncTimeout = 1 * time.Second
144+
145+
ctx, cancel := context.WithCancel(context.Background())
146+
defer cancel()
147+
148+
sourceSynced := make(chan struct{})
149+
c, err := cache.New(cfg, cache.Options{})
150+
Expect(err).NotTo(HaveOccurred())
151+
ctrl.startWatches = []watchDescription{{
152+
src: &singnallingSourceWrapper{
153+
SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c),
154+
cacheSyncDone: sourceSynced,
155+
},
156+
}}
157+
158+
go func() {
159+
defer GinkgoRecover()
160+
Expect(c.Start(ctx)).To(Succeed())
161+
}()
162+
163+
go func() {
164+
defer GinkgoRecover()
165+
Expect(ctrl.Start(ctx)).To(Succeed())
166+
}()
167+
168+
<-sourceSynced
169+
close(done)
170+
}, 10.0)
171+
125172
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
126173
pr1 := &predicate.Funcs{}
127174
pr2 := &predicate.Funcs{}
@@ -811,3 +858,15 @@ func (f *fakeReconciler) Reconcile(_ context.Context, r reconcile.Request) (reco
811858
}
812859
return res.Result, res.Err
813860
}
861+
862+
type singnallingSourceWrapper struct {
863+
cacheSyncDone chan struct{}
864+
source.SyncingSource
865+
}
866+
867+
func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
868+
defer func() {
869+
close(s.cacheSyncDone)
870+
}()
871+
return s.SyncingSource.WaitForSync(ctx)
872+
}

0 commit comments

Comments
 (0)