Skip to content

Commit ca25c1f

Browse files
authored
Merge pull request #941 from alvaroaleman/syncing-source
✨ Controller: Let sources sync even if they use a different cluster
2 parents c2dfe1a + 33c7bf8 commit ca25c1f

File tree

5 files changed

+82
-30
lines changed

5 files changed

+82
-30
lines changed

pkg/controller/controller.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
103103
// Create controller with dependencies set
104104
c := &controller.Controller{
105105
Do: options.Reconciler,
106-
Cache: mgr.GetCache(),
107106
Scheme: mgr.GetScheme(),
108107
Client: mgr.GetClient(),
109108
Recorder: mgr.GetEventRecorderFor(name),

pkg/internal/controller/controller.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"k8s.io/apimachinery/pkg/util/wait"
2727
"k8s.io/client-go/tools/record"
2828
"k8s.io/client-go/util/workqueue"
29-
"sigs.k8s.io/controller-runtime/pkg/cache"
3029
"sigs.k8s.io/controller-runtime/pkg/client"
3130
"sigs.k8s.io/controller-runtime/pkg/handler"
3231
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -60,9 +59,6 @@ type Controller struct {
6059
// Scheme is injected by the controllerManager when controllerManager.Start is called
6160
Scheme *runtime.Scheme
6261

63-
// informers are injected by the controllerManager when controllerManager.Start is called
64-
Cache cache.Cache
65-
6662
// MakeQueue constructs the queue for this controller once the controller is ready to start.
6763
// This exists because the standard Kubernetes workqueues start themselves immediately, which
6864
// leads to goroutine leaks if something calls controller.New repeatedly.
@@ -81,10 +77,6 @@ type Controller struct {
8177
// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
8278
JitterPeriod time.Duration
8379

84-
// WaitForCacheSync allows tests to mock out the WaitForCacheSync function to return an error
85-
// defaults to Cache.WaitForCacheSync
86-
WaitForCacheSync func(stopCh <-chan struct{}) bool
87-
8880
// Started is true if the Controller has been Started
8981
Started bool
9082

@@ -165,16 +157,18 @@ func (c *Controller) Start(stop <-chan struct{}) error {
165157
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
166158
log.Info("Starting Controller", "controller", c.Name)
167159

168-
// Wait for the caches to be synced before starting workers
169-
if c.WaitForCacheSync == nil {
170-
c.WaitForCacheSync = c.Cache.WaitForCacheSync
171-
}
172-
if ok := c.WaitForCacheSync(stop); !ok {
173-
// This code is unreachable right now since WaitForCacheSync will never return an error
174-
// Leaving it here because that could happen in the future
175-
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
176-
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
177-
return err
160+
for _, watch := range c.watches {
161+
syncingSource, ok := watch.src.(source.SyncingSource)
162+
if !ok {
163+
continue
164+
}
165+
if err := syncingSource.WaitForSync(stop); err != nil {
166+
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
167+
// Leaving it here because that could happen in the future
168+
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
169+
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
170+
return err
171+
}
178172
}
179173

180174
if c.JitterPeriod == 0 {

pkg/internal/controller/controller_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ var _ = Describe("controller", func() {
6666
MaxConcurrentReconciles: 1,
6767
Do: fakeReconcile,
6868
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
69-
Cache: informers,
7069
}
7170
Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed())
7271
})
@@ -89,7 +88,10 @@ var _ = Describe("controller", func() {
8988

9089
Describe("Start", func() {
9190
It("should return an error if there is an error waiting for the informers", func(done Done) {
92-
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return false }
91+
f := false
92+
ctrl.watches = []watchDescription{{
93+
src: source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}),
94+
}}
9395
ctrl.Name = "foo"
9496
err := ctrl.Start(stop)
9597
Expect(err).To(HaveOccurred())
@@ -111,8 +113,9 @@ var _ = Describe("controller", func() {
111113
Expect(err).NotTo(HaveOccurred())
112114
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
113115
Expect(err).NotTo(HaveOccurred())
114-
ctrl.Cache = c
115-
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return true }
116+
ctrl.watches = []watchDescription{{
117+
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
118+
}}
116119

117120
Expect(ctrl.Start(stopped)).NotTo(HaveOccurred())
118121

@@ -162,7 +165,7 @@ var _ = Describe("controller", func() {
162165
Describe("Watch", func() {
163166
It("should inject dependencies into the Source", func() {
164167
src := &source.Kind{Type: &corev1.Pod{}}
165-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
168+
Expect(src.InjectCache(informers)).To(Succeed())
166169
evthdl := &handler.EnqueueRequestForObject{}
167170
found := false
168171
ctrl.SetFields = func(i interface{}) error {
@@ -178,7 +181,7 @@ var _ = Describe("controller", func() {
178181

179182
It("should return an error if there is an error injecting into the Source", func() {
180183
src := &source.Kind{Type: &corev1.Pod{}}
181-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
184+
Expect(src.InjectCache(informers)).To(Succeed())
182185
evthdl := &handler.EnqueueRequestForObject{}
183186
expected := fmt.Errorf("expect fail source")
184187
ctrl.SetFields = func(i interface{}) error {
@@ -193,7 +196,7 @@ var _ = Describe("controller", func() {
193196

194197
It("should inject dependencies into the EventHandler", func() {
195198
src := &source.Kind{Type: &corev1.Pod{}}
196-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
199+
Expect(src.InjectCache(informers)).To(Succeed())
197200
evthdl := &handler.EnqueueRequestForObject{}
198201
found := false
199202
ctrl.SetFields = func(i interface{}) error {
@@ -231,7 +234,7 @@ var _ = Describe("controller", func() {
231234

232235
It("should inject dependencies into all of the Predicates", func() {
233236
src := &source.Kind{Type: &corev1.Pod{}}
234-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
237+
Expect(src.InjectCache(informers)).To(Succeed())
235238
evthdl := &handler.EnqueueRequestForObject{}
236239
pr1 := &predicate.Funcs{}
237240
pr2 := &predicate.Funcs{}
@@ -254,7 +257,7 @@ var _ = Describe("controller", func() {
254257

255258
It("should return an error if there is an error injecting into any of the Predicates", func() {
256259
src := &source.Kind{Type: &corev1.Pod{}}
257-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
260+
Expect(src.InjectCache(informers)).To(Succeed())
258261
evthdl := &handler.EnqueueRequestForObject{}
259262
pr1 := &predicate.Funcs{}
260263
pr2 := &predicate.Funcs{}

pkg/source/source.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package source
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"sync"
2324

@@ -56,10 +57,17 @@ type Source interface {
5657
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
5758
}
5859

60+
// SyncingSource is a source that needs syncing prior to being usable. The controller
61+
// will call its WaitForSync prior to starting workers.
62+
type SyncingSource interface {
63+
Source
64+
WaitForSync(stop <-chan struct{}) error
65+
}
66+
5967
// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
6068
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
6169
// from that other cluster
62-
func NewKindWithCache(object runtime.Object, cache cache.Cache) Source {
70+
func NewKindWithCache(object runtime.Object, cache cache.Cache) SyncingSource {
6371
return &kindWithCache{kind: Kind{Type: object, cache: cache}}
6472
}
6573

@@ -72,6 +80,10 @@ func (ks *kindWithCache) Start(handler handler.EventHandler, queue workqueue.Rat
7280
return ks.kind.Start(handler, queue, prct...)
7381
}
7482

83+
func (ks *kindWithCache) WaitForSync(stop <-chan struct{}) error {
84+
return ks.kind.WaitForSync(stop)
85+
}
86+
7587
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
7688
type Kind struct {
7789
// Type is the type of object to watch. e.g. &v1.Pod{}
@@ -81,7 +93,7 @@ type Kind struct {
8193
cache cache.Cache
8294
}
8395

84-
var _ Source = &Kind{}
96+
var _ SyncingSource = &Kind{}
8597

8698
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
8799
// to enqueue reconcile.Requests.
@@ -118,6 +130,16 @@ func (ks *Kind) String() string {
118130
return fmt.Sprintf("kind source: unknown GVK")
119131
}
120132

133+
// WaitForSync implements SyncingSource to allow controllers to wait with starting
134+
// workers until the cache is synced.
135+
func (ks *Kind) WaitForSync(stop <-chan struct{}) error {
136+
if !ks.cache.WaitForCacheSync(stop) {
137+
// Would be great to return something more informative here
138+
return errors.New("cache did not sync")
139+
}
140+
return nil
141+
}
142+
121143
var _ inject.Cache = &Kind{}
122144

123145
// InjectCache is internal should be called only by the Controller. InjectCache is used to inject
@@ -283,6 +305,8 @@ func (is *Informer) String() string {
283305
return fmt.Sprintf("informer source: %p", is.Informer)
284306
}
285307

308+
var _ Source = Func(nil)
309+
286310
// Func is a function that implements Source
287311
type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
288312

pkg/source/source_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,18 @@ var _ = Describe("Source", func() {
210210
close(done)
211211
})
212212

213+
It("should return an error if syncing fails", func(done Done) {
214+
instance := source.Kind{}
215+
f := false
216+
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
217+
err := instance.WaitForSync(nil)
218+
Expect(err).To(HaveOccurred())
219+
Expect(err.Error()).To(Equal("cache did not sync"))
220+
221+
close(done)
222+
223+
})
224+
213225
Context("for a Kind not in the cache", func() {
214226
It("should return an error when Start is called", func(done Done) {
215227
ic.Error = fmt.Errorf("test error")
@@ -227,6 +239,26 @@ var _ = Describe("Source", func() {
227239
})
228240
})
229241

242+
Describe("KindWithCache", func() {
243+
It("should not allow injecting a cache", func() {
244+
instance := source.NewKindWithCache(nil, nil)
245+
injected, err := inject.CacheInto(&informertest.FakeInformers{}, instance)
246+
Expect(err).To(BeNil())
247+
Expect(injected).To(BeFalse())
248+
})
249+
250+
It("should return an error if syncing fails", func(done Done) {
251+
f := false
252+
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
253+
err := instance.WaitForSync(nil)
254+
Expect(err).To(HaveOccurred())
255+
Expect(err.Error()).To(Equal("cache did not sync"))
256+
257+
close(done)
258+
259+
})
260+
})
261+
230262
Describe("Func", func() {
231263
It("should be called from Start", func(done Done) {
232264
run := false

0 commit comments

Comments
 (0)