Skip to content

Commit 33c7bf8

Browse files
committed
✨ Controller: Let sources sync even if they use a different cluster
Currently, the controller gets a cache injected and waits for that cache to sync before starting workers. This model works well for single cluster usecases, but doesn't work at all for multi-cluster usecases, as the source may be using a completely different cache. This PR adds a new source.SyncingSource interface and makes the controller wait for all sources that implement it. Apart from solving the issue mentioned in the title, this also allows to implement custom sources that need syncing.
1 parent c2dfe1a commit 33c7bf8

File tree

5 files changed

+82
-30
lines changed

5 files changed

+82
-30
lines changed

pkg/controller/controller.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
103103
// Create controller with dependencies set
104104
c := &controller.Controller{
105105
Do: options.Reconciler,
106-
Cache: mgr.GetCache(),
107106
Scheme: mgr.GetScheme(),
108107
Client: mgr.GetClient(),
109108
Recorder: mgr.GetEventRecorderFor(name),

pkg/internal/controller/controller.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"k8s.io/apimachinery/pkg/util/wait"
2727
"k8s.io/client-go/tools/record"
2828
"k8s.io/client-go/util/workqueue"
29-
"sigs.k8s.io/controller-runtime/pkg/cache"
3029
"sigs.k8s.io/controller-runtime/pkg/client"
3130
"sigs.k8s.io/controller-runtime/pkg/handler"
3231
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -60,9 +59,6 @@ type Controller struct {
6059
// Scheme is injected by the controllerManager when controllerManager.Start is called
6160
Scheme *runtime.Scheme
6261

63-
// informers are injected by the controllerManager when controllerManager.Start is called
64-
Cache cache.Cache
65-
6662
// MakeQueue constructs the queue for this controller once the controller is ready to start.
6763
// This exists because the standard Kubernetes workqueues start themselves immediately, which
6864
// leads to goroutine leaks if something calls controller.New repeatedly.
@@ -81,10 +77,6 @@ type Controller struct {
8177
// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
8278
JitterPeriod time.Duration
8379

84-
// WaitForCacheSync allows tests to mock out the WaitForCacheSync function to return an error
85-
// defaults to Cache.WaitForCacheSync
86-
WaitForCacheSync func(stopCh <-chan struct{}) bool
87-
8880
// Started is true if the Controller has been Started
8981
Started bool
9082

@@ -165,16 +157,18 @@ func (c *Controller) Start(stop <-chan struct{}) error {
165157
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
166158
log.Info("Starting Controller", "controller", c.Name)
167159

168-
// Wait for the caches to be synced before starting workers
169-
if c.WaitForCacheSync == nil {
170-
c.WaitForCacheSync = c.Cache.WaitForCacheSync
171-
}
172-
if ok := c.WaitForCacheSync(stop); !ok {
173-
// This code is unreachable right now since WaitForCacheSync will never return an error
174-
// Leaving it here because that could happen in the future
175-
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
176-
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
177-
return err
160+
for _, watch := range c.watches {
161+
syncingSource, ok := watch.src.(source.SyncingSource)
162+
if !ok {
163+
continue
164+
}
165+
if err := syncingSource.WaitForSync(stop); err != nil {
166+
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
167+
// Leaving it here because that could happen in the future
168+
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
169+
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
170+
return err
171+
}
178172
}
179173

180174
if c.JitterPeriod == 0 {

pkg/internal/controller/controller_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ var _ = Describe("controller", func() {
6666
MaxConcurrentReconciles: 1,
6767
Do: fakeReconcile,
6868
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
69-
Cache: informers,
7069
}
7170
Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed())
7271
})
@@ -89,7 +88,10 @@ var _ = Describe("controller", func() {
8988

9089
Describe("Start", func() {
9190
It("should return an error if there is an error waiting for the informers", func(done Done) {
92-
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return false }
91+
f := false
92+
ctrl.watches = []watchDescription{{
93+
src: source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}),
94+
}}
9395
ctrl.Name = "foo"
9496
err := ctrl.Start(stop)
9597
Expect(err).To(HaveOccurred())
@@ -111,8 +113,9 @@ var _ = Describe("controller", func() {
111113
Expect(err).NotTo(HaveOccurred())
112114
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
113115
Expect(err).NotTo(HaveOccurred())
114-
ctrl.Cache = c
115-
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return true }
116+
ctrl.watches = []watchDescription{{
117+
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
118+
}}
116119

117120
Expect(ctrl.Start(stopped)).NotTo(HaveOccurred())
118121

@@ -162,7 +165,7 @@ var _ = Describe("controller", func() {
162165
Describe("Watch", func() {
163166
It("should inject dependencies into the Source", func() {
164167
src := &source.Kind{Type: &corev1.Pod{}}
165-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
168+
Expect(src.InjectCache(informers)).To(Succeed())
166169
evthdl := &handler.EnqueueRequestForObject{}
167170
found := false
168171
ctrl.SetFields = func(i interface{}) error {
@@ -178,7 +181,7 @@ var _ = Describe("controller", func() {
178181

179182
It("should return an error if there is an error injecting into the Source", func() {
180183
src := &source.Kind{Type: &corev1.Pod{}}
181-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
184+
Expect(src.InjectCache(informers)).To(Succeed())
182185
evthdl := &handler.EnqueueRequestForObject{}
183186
expected := fmt.Errorf("expect fail source")
184187
ctrl.SetFields = func(i interface{}) error {
@@ -193,7 +196,7 @@ var _ = Describe("controller", func() {
193196

194197
It("should inject dependencies into the EventHandler", func() {
195198
src := &source.Kind{Type: &corev1.Pod{}}
196-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
199+
Expect(src.InjectCache(informers)).To(Succeed())
197200
evthdl := &handler.EnqueueRequestForObject{}
198201
found := false
199202
ctrl.SetFields = func(i interface{}) error {
@@ -231,7 +234,7 @@ var _ = Describe("controller", func() {
231234

232235
It("should inject dependencies into all of the Predicates", func() {
233236
src := &source.Kind{Type: &corev1.Pod{}}
234-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
237+
Expect(src.InjectCache(informers)).To(Succeed())
235238
evthdl := &handler.EnqueueRequestForObject{}
236239
pr1 := &predicate.Funcs{}
237240
pr2 := &predicate.Funcs{}
@@ -254,7 +257,7 @@ var _ = Describe("controller", func() {
254257

255258
It("should return an error if there is an error injecting into any of the Predicates", func() {
256259
src := &source.Kind{Type: &corev1.Pod{}}
257-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
260+
Expect(src.InjectCache(informers)).To(Succeed())
258261
evthdl := &handler.EnqueueRequestForObject{}
259262
pr1 := &predicate.Funcs{}
260263
pr2 := &predicate.Funcs{}

pkg/source/source.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package source
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"sync"
2324

@@ -56,10 +57,17 @@ type Source interface {
5657
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
5758
}
5859

60+
// SyncingSource is a source that needs syncing prior to being usable. The controller
61+
// will call its WaitForSync prior to starting workers.
62+
type SyncingSource interface {
63+
Source
64+
WaitForSync(stop <-chan struct{}) error
65+
}
66+
5967
// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
6068
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
6169
// from that other cluster
62-
func NewKindWithCache(object runtime.Object, cache cache.Cache) Source {
70+
func NewKindWithCache(object runtime.Object, cache cache.Cache) SyncingSource {
6371
return &kindWithCache{kind: Kind{Type: object, cache: cache}}
6472
}
6573

@@ -72,6 +80,10 @@ func (ks *kindWithCache) Start(handler handler.EventHandler, queue workqueue.Rat
7280
return ks.kind.Start(handler, queue, prct...)
7381
}
7482

83+
func (ks *kindWithCache) WaitForSync(stop <-chan struct{}) error {
84+
return ks.kind.WaitForSync(stop)
85+
}
86+
7587
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
7688
type Kind struct {
7789
// Type is the type of object to watch. e.g. &v1.Pod{}
@@ -81,7 +93,7 @@ type Kind struct {
8193
cache cache.Cache
8294
}
8395

84-
var _ Source = &Kind{}
96+
var _ SyncingSource = &Kind{}
8597

8698
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
8799
// to enqueue reconcile.Requests.
@@ -118,6 +130,16 @@ func (ks *Kind) String() string {
118130
return fmt.Sprintf("kind source: unknown GVK")
119131
}
120132

133+
// WaitForSync implements SyncingSource to allow controllers to wait with starting
134+
// workers until the cache is synced.
135+
func (ks *Kind) WaitForSync(stop <-chan struct{}) error {
136+
if !ks.cache.WaitForCacheSync(stop) {
137+
// Would be great to return something more informative here
138+
return errors.New("cache did not sync")
139+
}
140+
return nil
141+
}
142+
121143
var _ inject.Cache = &Kind{}
122144

123145
// InjectCache is internal should be called only by the Controller. InjectCache is used to inject
@@ -283,6 +305,8 @@ func (is *Informer) String() string {
283305
return fmt.Sprintf("informer source: %p", is.Informer)
284306
}
285307

308+
var _ Source = Func(nil)
309+
286310
// Func is a function that implements Source
287311
type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
288312

pkg/source/source_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,18 @@ var _ = Describe("Source", func() {
210210
close(done)
211211
})
212212

213+
It("should return an error if syncing fails", func(done Done) {
214+
instance := source.Kind{}
215+
f := false
216+
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
217+
err := instance.WaitForSync(nil)
218+
Expect(err).To(HaveOccurred())
219+
Expect(err.Error()).To(Equal("cache did not sync"))
220+
221+
close(done)
222+
223+
})
224+
213225
Context("for a Kind not in the cache", func() {
214226
It("should return an error when Start is called", func(done Done) {
215227
ic.Error = fmt.Errorf("test error")
@@ -227,6 +239,26 @@ var _ = Describe("Source", func() {
227239
})
228240
})
229241

242+
Describe("KindWithCache", func() {
243+
It("should not allow injecting a cache", func() {
244+
instance := source.NewKindWithCache(nil, nil)
245+
injected, err := inject.CacheInto(&informertest.FakeInformers{}, instance)
246+
Expect(err).To(BeNil())
247+
Expect(injected).To(BeFalse())
248+
})
249+
250+
It("should return an error if syncing fails", func(done Done) {
251+
f := false
252+
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
253+
err := instance.WaitForSync(nil)
254+
Expect(err).To(HaveOccurred())
255+
Expect(err.Error()).To(Equal("cache did not sync"))
256+
257+
close(done)
258+
259+
})
260+
})
261+
230262
Describe("Func", func() {
231263
It("should be called from Start", func(done Done) {
232264
run := false

0 commit comments

Comments
 (0)