Skip to content

Commit 55bed9b

Browse files
alvaroalemanvincepri
authored andcommitted
✨ Controller: Let sources sync even if they use a different cluster
Currently, the controller gets a cache injected and waits for that cache to sync before starting workers. This model works well for single cluster usecases, but doesn't work at all for multi-cluster usecases, as the source may be using a completely different cache. This PR adds a new source.SyncingSource interface and makes the controller wait for all sources that implement it. Apart from solving the issue mentioned in the title, this also allows to implement custom sources that need syncing.
1 parent f31eaf7 commit 55bed9b

File tree

5 files changed

+82
-36
lines changed

5 files changed

+82
-36
lines changed

pkg/controller/controller.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
9191
// Create controller with dependencies set
9292
c := &controller.Controller{
9393
Do: options.Reconciler,
94-
Cache: mgr.GetCache(),
95-
Config: mgr.GetConfig(),
9694
Scheme: mgr.GetScheme(),
9795
Client: mgr.GetClient(),
9896
Recorder: mgr.GetEventRecorderFor(name),

pkg/internal/controller/controller.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@ import (
2424
"k8s.io/apimachinery/pkg/runtime"
2525
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2626
"k8s.io/apimachinery/pkg/util/wait"
27-
"k8s.io/client-go/rest"
2827
"k8s.io/client-go/tools/record"
2928
"k8s.io/client-go/util/workqueue"
30-
"sigs.k8s.io/controller-runtime/pkg/cache"
3129
"sigs.k8s.io/controller-runtime/pkg/client"
3230
"sigs.k8s.io/controller-runtime/pkg/handler"
3331
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -61,13 +59,6 @@ type Controller struct {
6159
// Scheme is injected by the controllerManager when controllerManager.Start is called
6260
Scheme *runtime.Scheme
6361

64-
// informers are injected by the controllerManager when controllerManager.Start is called
65-
Cache cache.Cache
66-
67-
// Config is the rest.Config used to talk to the apiserver. Defaults to one of in-cluster, environment variable
68-
// specified, or the ~/.kube/Config.
69-
Config *rest.Config
70-
7162
// MakeQueue constructs the queue for this controller once the controller is ready to start.
7263
// This exists because the standard Kubernetes workqueues start themselves immediately, which
7364
// leads to goroutine leaks if something calls controller.New repeatedly.
@@ -86,10 +77,6 @@ type Controller struct {
8677
// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
8778
JitterPeriod time.Duration
8879

89-
// WaitForCacheSync allows tests to mock out the WaitForCacheSync function to return an error
90-
// defaults to Cache.WaitForCacheSync
91-
WaitForCacheSync func(stopCh <-chan struct{}) bool
92-
9380
// Started is true if the Controller has been Started
9481
Started bool
9582

@@ -170,16 +157,18 @@ func (c *Controller) Start(stop <-chan struct{}) error {
170157
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
171158
log.Info("Starting Controller", "controller", c.Name)
172159

173-
// Wait for the caches to be synced before starting workers
174-
if c.WaitForCacheSync == nil {
175-
c.WaitForCacheSync = c.Cache.WaitForCacheSync
176-
}
177-
if ok := c.WaitForCacheSync(stop); !ok {
178-
// This code is unreachable right now since WaitForCacheSync will never return an error
179-
// Leaving it here because that could happen in the future
180-
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
181-
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
182-
return err
160+
for _, watch := range c.watches {
161+
syncingSource, ok := watch.src.(source.SyncingSource)
162+
if !ok {
163+
continue
164+
}
165+
if err := syncingSource.WaitForSync(stop); err != nil {
166+
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
167+
// Leaving it here because that could happen in the future
168+
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
169+
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
170+
return err
171+
}
183172
}
184173

185174
if c.JitterPeriod == 0 {

pkg/internal/controller/controller_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ var _ = Describe("controller", func() {
6565
MaxConcurrentReconciles: 1,
6666
Do: fakeReconcile,
6767
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
68-
Cache: informers,
6968
}
7069
Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed())
7170
})
@@ -88,7 +87,10 @@ var _ = Describe("controller", func() {
8887

8988
Describe("Start", func() {
9089
It("should return an error if there is an error waiting for the informers", func(done Done) {
91-
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return false }
90+
f := false
91+
ctrl.watches = []watchDescription{{
92+
src: source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}),
93+
}}
9294
ctrl.Name = "foo"
9395
err := ctrl.Start(stop)
9496
Expect(err).To(HaveOccurred())
@@ -110,8 +112,9 @@ var _ = Describe("controller", func() {
110112
Expect(err).NotTo(HaveOccurred())
111113
_, err = c.GetInformer(&appsv1.ReplicaSet{})
112114
Expect(err).NotTo(HaveOccurred())
113-
ctrl.Cache = c
114-
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return true }
115+
ctrl.watches = []watchDescription{{
116+
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
117+
}}
115118

116119
Expect(ctrl.Start(stopped)).NotTo(HaveOccurred())
117120

@@ -161,7 +164,7 @@ var _ = Describe("controller", func() {
161164
Describe("Watch", func() {
162165
It("should inject dependencies into the Source", func() {
163166
src := &source.Kind{Type: &corev1.Pod{}}
164-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
167+
Expect(src.InjectCache(informers)).To(Succeed())
165168
evthdl := &handler.EnqueueRequestForObject{}
166169
found := false
167170
ctrl.SetFields = func(i interface{}) error {
@@ -177,7 +180,7 @@ var _ = Describe("controller", func() {
177180

178181
It("should return an error if there is an error injecting into the Source", func() {
179182
src := &source.Kind{Type: &corev1.Pod{}}
180-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
183+
Expect(src.InjectCache(informers)).To(Succeed())
181184
evthdl := &handler.EnqueueRequestForObject{}
182185
expected := fmt.Errorf("expect fail source")
183186
ctrl.SetFields = func(i interface{}) error {
@@ -192,7 +195,7 @@ var _ = Describe("controller", func() {
192195

193196
It("should inject dependencies into the EventHandler", func() {
194197
src := &source.Kind{Type: &corev1.Pod{}}
195-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
198+
Expect(src.InjectCache(informers)).To(Succeed())
196199
evthdl := &handler.EnqueueRequestForObject{}
197200
found := false
198201
ctrl.SetFields = func(i interface{}) error {
@@ -230,7 +233,7 @@ var _ = Describe("controller", func() {
230233

231234
It("should inject dependencies into all of the Predicates", func() {
232235
src := &source.Kind{Type: &corev1.Pod{}}
233-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
236+
Expect(src.InjectCache(informers)).To(Succeed())
234237
evthdl := &handler.EnqueueRequestForObject{}
235238
pr1 := &predicate.Funcs{}
236239
pr2 := &predicate.Funcs{}
@@ -253,7 +256,7 @@ var _ = Describe("controller", func() {
253256

254257
It("should return an error if there is an error injecting into any of the Predicates", func() {
255258
src := &source.Kind{Type: &corev1.Pod{}}
256-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
259+
Expect(src.InjectCache(informers)).To(Succeed())
257260
evthdl := &handler.EnqueueRequestForObject{}
258261
pr1 := &predicate.Funcs{}
259262
pr2 := &predicate.Funcs{}

pkg/source/source.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package source
1818

1919
import (
20+
"errors"
2021
"fmt"
2122
"sync"
2223

@@ -55,10 +56,17 @@ type Source interface {
5556
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
5657
}
5758

59+
// SyncingSource is a source that needs syncing prior to being usable. The controller
60+
// will call its WaitForSync prior to starting workers.
61+
type SyncingSource interface {
62+
Source
63+
WaitForSync(stop <-chan struct{}) error
64+
}
65+
5866
// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
5967
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
6068
// from that other cluster
61-
func NewKindWithCache(object runtime.Object, cache cache.Cache) Source {
69+
func NewKindWithCache(object runtime.Object, cache cache.Cache) SyncingSource {
6270
return &kindWithCache{kind: Kind{Type: object, cache: cache}}
6371
}
6472

@@ -71,6 +79,10 @@ func (ks *kindWithCache) Start(handler handler.EventHandler, queue workqueue.Rat
7179
return ks.kind.Start(handler, queue, prct...)
7280
}
7381

82+
func (ks *kindWithCache) WaitForSync(stop <-chan struct{}) error {
83+
return ks.kind.WaitForSync(stop)
84+
}
85+
7486
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
7587
type Kind struct {
7688
// Type is the type of object to watch. e.g. &v1.Pod{}
@@ -80,7 +92,7 @@ type Kind struct {
8092
cache cache.Cache
8193
}
8294

83-
var _ Source = &Kind{}
95+
var _ SyncingSource = &Kind{}
8496

8597
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
8698
// to enqueue reconcile.Requests.
@@ -117,6 +129,16 @@ func (ks *Kind) String() string {
117129
return fmt.Sprintf("kind source: unknown GVK")
118130
}
119131

132+
// WaitForSync implements SyncingSource to allow controllers to wait with starting
133+
// workers until the cache is synced.
134+
func (ks *Kind) WaitForSync(stop <-chan struct{}) error {
135+
if !ks.cache.WaitForCacheSync(stop) {
136+
// Would be great to return something more informative here
137+
return errors.New("cache did not sync")
138+
}
139+
return nil
140+
}
141+
120142
var _ inject.Cache = &Kind{}
121143

122144
// InjectCache is internal should be called only by the Controller. InjectCache is used to inject
@@ -282,6 +304,8 @@ func (is *Informer) String() string {
282304
return fmt.Sprintf("informer source: %p", is.Informer)
283305
}
284306

307+
var _ Source = Func(nil)
308+
285309
// Func is a function that implements Source
286310
type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
287311

pkg/source/source_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,18 @@ var _ = Describe("Source", func() {
210210
close(done)
211211
})
212212

213+
It("should return an error if syncing fails", func(done Done) {
214+
instance := source.Kind{}
215+
f := false
216+
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
217+
err := instance.WaitForSync(nil)
218+
Expect(err).To(HaveOccurred())
219+
Expect(err.Error()).To(Equal("cache did not sync"))
220+
221+
close(done)
222+
223+
})
224+
213225
Context("for a Kind not in the cache", func() {
214226
It("should return an error when Start is called", func(done Done) {
215227
ic.Error = fmt.Errorf("test error")
@@ -227,6 +239,26 @@ var _ = Describe("Source", func() {
227239
})
228240
})
229241

242+
Describe("KindWithCache", func() {
243+
It("should not allow injecting a cache", func() {
244+
instance := source.NewKindWithCache(nil, nil)
245+
injected, err := inject.CacheInto(&informertest.FakeInformers{}, instance)
246+
Expect(err).To(BeNil())
247+
Expect(injected).To(BeFalse())
248+
})
249+
250+
It("should return an error if syncing fails", func(done Done) {
251+
f := false
252+
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
253+
err := instance.WaitForSync(nil)
254+
Expect(err).To(HaveOccurred())
255+
Expect(err.Error()).To(Equal("cache did not sync"))
256+
257+
close(done)
258+
259+
})
260+
})
261+
230262
Describe("Func", func() {
231263
It("should be called from Start", func(done Done) {
232264
run := false

0 commit comments

Comments
 (0)