Skip to content

Commit 3886222

Browse files
✨ Allow configuring cache sync timeouts
This PR allows users to configure timeout for cache syncs while starting the controller.
1 parent af24f3b commit 3886222

File tree

4 files changed

+43
-3
lines changed

4 files changed

+43
-3
lines changed

pkg/controller/controller.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package controller
1919
import (
2020
"context"
2121
"fmt"
22+
"time"
2223

2324
"github.com/go-logr/logr"
2425
"k8s.io/client-go/util/workqueue"
@@ -47,6 +48,10 @@ type Options struct {
4748
// Log is the logger used for this controller and passed to each reconciliation
4849
// request via the context field.
4950
Log logr.Logger
51+
52+
// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
53+
// Defaults to 10 seconds if not set.
54+
CacheSyncTimeout time.Duration
5055
}
5156

5257
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -112,6 +117,10 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
112117
options.Log = mgr.GetLogger()
113118
}
114119

120+
if options.CacheSyncTimeout == 0 {
121+
options.CacheSyncTimeout = 10 * time.Second
122+
}
123+
115124
// Inject dependencies into Reconciler
116125
if err := mgr.SetFields(options.Reconciler); err != nil {
117126
return nil, err
@@ -124,6 +133,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
124133
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
125134
},
126135
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
136+
CacheSyncTimeout: options.CacheSyncTimeout,
127137
SetFields: mgr.SetFields,
128138
Name: name,
129139
Log: options.Log.WithName("controller").WithName(name),

pkg/internal/controller/controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ type Controller struct {
7979
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
8080
ctx context.Context
8181

82+
// CacheSyncTimeout refers to the time limit set on waiting cache to sync
83+
// Defaults to 10 seconds if not set.
84+
CacheSyncTimeout time.Duration
85+
8286
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
8387
startWatches []watchDescription
8488

@@ -169,7 +173,10 @@ func (c *Controller) Start(ctx context.Context) error {
169173
if !ok {
170174
continue
171175
}
172-
if err := syncingSource.WaitForSync(ctx); err != nil {
176+
ct, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
177+
defer cancel()
178+
if err := syncingSource.WaitForSync(ct); err != nil {
179+
c.Log.Info("skipping it")
173180
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
174181
// Leaving it here because that could happen in the future
175182
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)

pkg/internal/controller/controller_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,29 @@ var _ = Describe("controller", func() {
122122
close(done)
123123
})
124124

125+
It("should wait for each informer to sync", func(done Done) {
126+
// TODO(directxman12): this test doesn't do what it says it does
127+
ctrl.CacheSyncTimeout = 1 * time.Nanosecond
128+
c, err := cache.New(cfg, cache.Options{})
129+
Expect(err).NotTo(HaveOccurred())
130+
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
131+
Expect(err).NotTo(HaveOccurred())
132+
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
133+
Expect(err).NotTo(HaveOccurred())
134+
ctrl.startWatches = []watchDescription{{
135+
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
136+
}, {
137+
src: source.NewKindWithCache(&appsv1.ReplicaSet{}, &informertest.FakeInformers{}),
138+
}}
139+
140+
// Use a cancelled context so Start doesn't block
141+
// ctx, cancel := context.WithCancel(context.Background())
142+
// cancel()
143+
Expect(ctrl.Start(context.Background())).To(HaveOccurred())
144+
145+
close(done)
146+
})
147+
125148
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
126149
pr1 := &predicate.Funcs{}
127150
pr2 := &predicate.Funcs{}

pkg/source/source_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ var _ = Describe("Source", func() {
211211
instance := source.Kind{}
212212
f := false
213213
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
214-
err := instance.WaitForSync(nil)
214+
err := instance.WaitForSync(context.Background())
215215
Expect(err).To(HaveOccurred())
216216
Expect(err.Error()).To(Equal("cache did not sync"))
217217

@@ -247,7 +247,7 @@ var _ = Describe("Source", func() {
247247
It("should return an error if syncing fails", func(done Done) {
248248
f := false
249249
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
250-
err := instance.WaitForSync(nil)
250+
err := instance.WaitForSync(context.Background())
251251
Expect(err).To(HaveOccurred())
252252
Expect(err.Error()).To(Equal("cache did not sync"))
253253

0 commit comments

Comments
 (0)