Skip to content

Commit c45adcf

Browse files
authored
Merge pull request #960 from vincepri/backport-054
Backports for v0.5.4
2 parents f31eaf7 + 55bed9b commit c45adcf

File tree

5 files changed

+82
-36
lines changed

5 files changed

+82
-36
lines changed

pkg/controller/controller.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
9191
// Create controller with dependencies set
9292
c := &controller.Controller{
9393
Do: options.Reconciler,
94-
Cache: mgr.GetCache(),
95-
Config: mgr.GetConfig(),
9694
Scheme: mgr.GetScheme(),
9795
Client: mgr.GetClient(),
9896
Recorder: mgr.GetEventRecorderFor(name),

pkg/internal/controller/controller.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@ import (
2424
"k8s.io/apimachinery/pkg/runtime"
2525
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2626
"k8s.io/apimachinery/pkg/util/wait"
27-
"k8s.io/client-go/rest"
2827
"k8s.io/client-go/tools/record"
2928
"k8s.io/client-go/util/workqueue"
30-
"sigs.k8s.io/controller-runtime/pkg/cache"
3129
"sigs.k8s.io/controller-runtime/pkg/client"
3230
"sigs.k8s.io/controller-runtime/pkg/handler"
3331
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -61,13 +59,6 @@ type Controller struct {
6159
// Scheme is injected by the controllerManager when controllerManager.Start is called
6260
Scheme *runtime.Scheme
6361

64-
// informers are injected by the controllerManager when controllerManager.Start is called
65-
Cache cache.Cache
66-
67-
// Config is the rest.Config used to talk to the apiserver. Defaults to one of in-cluster, environment variable
68-
// specified, or the ~/.kube/Config.
69-
Config *rest.Config
70-
7162
// MakeQueue constructs the queue for this controller once the controller is ready to start.
7263
// This exists because the standard Kubernetes workqueues start themselves immediately, which
7364
// leads to goroutine leaks if something calls controller.New repeatedly.
@@ -86,10 +77,6 @@ type Controller struct {
8677
// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
8778
JitterPeriod time.Duration
8879

89-
// WaitForCacheSync allows tests to mock out the WaitForCacheSync function to return an error
90-
// defaults to Cache.WaitForCacheSync
91-
WaitForCacheSync func(stopCh <-chan struct{}) bool
92-
9380
// Started is true if the Controller has been Started
9481
Started bool
9582

@@ -170,16 +157,18 @@ func (c *Controller) Start(stop <-chan struct{}) error {
170157
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
171158
log.Info("Starting Controller", "controller", c.Name)
172159

173-
// Wait for the caches to be synced before starting workers
174-
if c.WaitForCacheSync == nil {
175-
c.WaitForCacheSync = c.Cache.WaitForCacheSync
176-
}
177-
if ok := c.WaitForCacheSync(stop); !ok {
178-
// This code is unreachable right now since WaitForCacheSync will never return an error
179-
// Leaving it here because that could happen in the future
180-
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
181-
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
182-
return err
160+
for _, watch := range c.watches {
161+
syncingSource, ok := watch.src.(source.SyncingSource)
162+
if !ok {
163+
continue
164+
}
165+
if err := syncingSource.WaitForSync(stop); err != nil {
166+
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
167+
// Leaving it here because that could happen in the future
168+
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
169+
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
170+
return err
171+
}
183172
}
184173

185174
if c.JitterPeriod == 0 {

pkg/internal/controller/controller_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ var _ = Describe("controller", func() {
6565
MaxConcurrentReconciles: 1,
6666
Do: fakeReconcile,
6767
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
68-
Cache: informers,
6968
}
7069
Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed())
7170
})
@@ -88,7 +87,10 @@ var _ = Describe("controller", func() {
8887

8988
Describe("Start", func() {
9089
It("should return an error if there is an error waiting for the informers", func(done Done) {
91-
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return false }
90+
f := false
91+
ctrl.watches = []watchDescription{{
92+
src: source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}),
93+
}}
9294
ctrl.Name = "foo"
9395
err := ctrl.Start(stop)
9496
Expect(err).To(HaveOccurred())
@@ -110,8 +112,9 @@ var _ = Describe("controller", func() {
110112
Expect(err).NotTo(HaveOccurred())
111113
_, err = c.GetInformer(&appsv1.ReplicaSet{})
112114
Expect(err).NotTo(HaveOccurred())
113-
ctrl.Cache = c
114-
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return true }
115+
ctrl.watches = []watchDescription{{
116+
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
117+
}}
115118

116119
Expect(ctrl.Start(stopped)).NotTo(HaveOccurred())
117120

@@ -161,7 +164,7 @@ var _ = Describe("controller", func() {
161164
Describe("Watch", func() {
162165
It("should inject dependencies into the Source", func() {
163166
src := &source.Kind{Type: &corev1.Pod{}}
164-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
167+
Expect(src.InjectCache(informers)).To(Succeed())
165168
evthdl := &handler.EnqueueRequestForObject{}
166169
found := false
167170
ctrl.SetFields = func(i interface{}) error {
@@ -177,7 +180,7 @@ var _ = Describe("controller", func() {
177180

178181
It("should return an error if there is an error injecting into the Source", func() {
179182
src := &source.Kind{Type: &corev1.Pod{}}
180-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
183+
Expect(src.InjectCache(informers)).To(Succeed())
181184
evthdl := &handler.EnqueueRequestForObject{}
182185
expected := fmt.Errorf("expect fail source")
183186
ctrl.SetFields = func(i interface{}) error {
@@ -192,7 +195,7 @@ var _ = Describe("controller", func() {
192195

193196
It("should inject dependencies into the EventHandler", func() {
194197
src := &source.Kind{Type: &corev1.Pod{}}
195-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
198+
Expect(src.InjectCache(informers)).To(Succeed())
196199
evthdl := &handler.EnqueueRequestForObject{}
197200
found := false
198201
ctrl.SetFields = func(i interface{}) error {
@@ -230,7 +233,7 @@ var _ = Describe("controller", func() {
230233

231234
It("should inject dependencies into all of the Predicates", func() {
232235
src := &source.Kind{Type: &corev1.Pod{}}
233-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
236+
Expect(src.InjectCache(informers)).To(Succeed())
234237
evthdl := &handler.EnqueueRequestForObject{}
235238
pr1 := &predicate.Funcs{}
236239
pr2 := &predicate.Funcs{}
@@ -253,7 +256,7 @@ var _ = Describe("controller", func() {
253256

254257
It("should return an error if there is an error injecting into any of the Predicates", func() {
255258
src := &source.Kind{Type: &corev1.Pod{}}
256-
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
259+
Expect(src.InjectCache(informers)).To(Succeed())
257260
evthdl := &handler.EnqueueRequestForObject{}
258261
pr1 := &predicate.Funcs{}
259262
pr2 := &predicate.Funcs{}

pkg/source/source.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package source
1818

1919
import (
20+
"errors"
2021
"fmt"
2122
"sync"
2223

@@ -55,10 +56,17 @@ type Source interface {
5556
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
5657
}
5758

59+
// SyncingSource is a source that needs syncing prior to being usable. The controller
60+
// will call its WaitForSync prior to starting workers.
61+
type SyncingSource interface {
62+
Source
63+
WaitForSync(stop <-chan struct{}) error
64+
}
65+
5866
// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
5967
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
6068
// from that other cluster
61-
func NewKindWithCache(object runtime.Object, cache cache.Cache) Source {
69+
func NewKindWithCache(object runtime.Object, cache cache.Cache) SyncingSource {
6270
return &kindWithCache{kind: Kind{Type: object, cache: cache}}
6371
}
6472

@@ -71,6 +79,10 @@ func (ks *kindWithCache) Start(handler handler.EventHandler, queue workqueue.Rat
7179
return ks.kind.Start(handler, queue, prct...)
7280
}
7381

82+
func (ks *kindWithCache) WaitForSync(stop <-chan struct{}) error {
83+
return ks.kind.WaitForSync(stop)
84+
}
85+
7486
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
7587
type Kind struct {
7688
// Type is the type of object to watch. e.g. &v1.Pod{}
@@ -80,7 +92,7 @@ type Kind struct {
8092
cache cache.Cache
8193
}
8294

83-
var _ Source = &Kind{}
95+
var _ SyncingSource = &Kind{}
8496

8597
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
8698
// to enqueue reconcile.Requests.
@@ -117,6 +129,16 @@ func (ks *Kind) String() string {
117129
return fmt.Sprintf("kind source: unknown GVK")
118130
}
119131

132+
// WaitForSync implements SyncingSource to allow controllers to wait with starting
133+
// workers until the cache is synced.
134+
func (ks *Kind) WaitForSync(stop <-chan struct{}) error {
135+
if !ks.cache.WaitForCacheSync(stop) {
136+
// Would be great to return something more informative here
137+
return errors.New("cache did not sync")
138+
}
139+
return nil
140+
}
141+
120142
var _ inject.Cache = &Kind{}
121143

122144
// InjectCache is internal should be called only by the Controller. InjectCache is used to inject
@@ -282,6 +304,8 @@ func (is *Informer) String() string {
282304
return fmt.Sprintf("informer source: %p", is.Informer)
283305
}
284306

307+
var _ Source = Func(nil)
308+
285309
// Func is a function that implements Source
286310
type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
287311

pkg/source/source_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,18 @@ var _ = Describe("Source", func() {
210210
close(done)
211211
})
212212

213+
It("should return an error if syncing fails", func(done Done) {
214+
instance := source.Kind{}
215+
f := false
216+
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
217+
err := instance.WaitForSync(nil)
218+
Expect(err).To(HaveOccurred())
219+
Expect(err.Error()).To(Equal("cache did not sync"))
220+
221+
close(done)
222+
223+
})
224+
213225
Context("for a Kind not in the cache", func() {
214226
It("should return an error when Start is called", func(done Done) {
215227
ic.Error = fmt.Errorf("test error")
@@ -227,6 +239,26 @@ var _ = Describe("Source", func() {
227239
})
228240
})
229241

242+
Describe("KindWithCache", func() {
243+
It("should not allow injecting a cache", func() {
244+
instance := source.NewKindWithCache(nil, nil)
245+
injected, err := inject.CacheInto(&informertest.FakeInformers{}, instance)
246+
Expect(err).To(BeNil())
247+
Expect(injected).To(BeFalse())
248+
})
249+
250+
It("should return an error if syncing fails", func(done Done) {
251+
f := false
252+
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
253+
err := instance.WaitForSync(nil)
254+
Expect(err).To(HaveOccurred())
255+
Expect(err.Error()).To(Equal("cache did not sync"))
256+
257+
close(done)
258+
259+
})
260+
})
261+
230262
Describe("Func", func() {
231263
It("should be called from Start", func(done Done) {
232264
run := false

0 commit comments

Comments
 (0)