Skip to content

Commit c9b52c6

Browse files
committed
✨ Manager: Start all caches before other Runnables
This pr implements the second half of the ["Move cluster-specific code out of the manager" proposal][0]: Making it start all caches before other Runnables and not just the one of the main Cluster. [0]: https://github.com/kubernetes-sigs/controller-runtime/blob/66537ca5b7439b06f2f3b08901640f934834c9a1/designs/move-cluster-specific-code-out-of-manager.md#L131-L132
1 parent 66537ca commit c9b52c6

File tree

3 files changed

+133
-11
lines changed

3 files changed

+133
-11
lines changed

pkg/cache/informertest/fake_cache.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package informertest
1818

1919
import (
2020
"context"
21+
"sync"
2122

2223
"k8s.io/apimachinery/pkg/runtime"
2324
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -36,6 +37,17 @@ type FakeInformers struct {
3637
Scheme *runtime.Scheme
3738
Error error
3839
Synced *bool
40+
startedLock sync.Mutex
41+
started bool
42+
// WaitForCacheSyncCalled gets set to true after both Start() and WaitForCacheSync(
43+
// were called.
44+
WaitForCacheSyncCalled bool
45+
}
46+
47+
func (c *FakeInformers) Started() bool {
48+
c.startedLock.Lock()
49+
defer c.startedLock.Unlock()
50+
return c.started
3951
}
4052

4153
// GetInformerForKind implements Informers
@@ -81,6 +93,12 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac
8193

8294
// WaitForCacheSync implements Informers
8395
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
96+
defer func() {
97+
for !c.Started() {
98+
continue
99+
}
100+
c.WaitForCacheSyncCalled = true
101+
}()
84102
if c.Synced == nil {
85103
return true
86104
}
@@ -122,6 +140,11 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
122140

123141
// Start implements Informers
124142
func (c *FakeInformers) Start(ctx context.Context) error {
143+
defer func() {
144+
c.startedLock.Lock()
145+
defer c.startedLock.Unlock()
146+
c.started = true
147+
}()
125148
return c.Error
126149
}
127150

pkg/manager/internal.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ type controllerManager struct {
125125
// election was configured.
126126
elected chan struct{}
127127

128-
startCache func(ctx context.Context) error
128+
caches []hasCache
129129

130130
// port is the port that the webhook server serves at.
131131
port int
@@ -173,6 +173,11 @@ type controllerManager struct {
173173
internalProceduresStop chan struct{}
174174
}
175175

176+
type hasCache interface {
177+
Runnable
178+
GetCache() cache.Cache
179+
}
180+
176181
// Add sets dependencies on i, and adds it to the list of Runnables to start.
177182
func (cm *controllerManager) Add(r Runnable) error {
178183
cm.mu.Lock()
@@ -192,6 +197,8 @@ func (cm *controllerManager) Add(r Runnable) error {
192197
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
193198
shouldStart = cm.started
194199
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
200+
} else if hasCache, ok := r.(hasCache); ok {
201+
cm.caches = append(cm.caches, hasCache)
195202
} else {
196203
shouldStart = cm.startedLeader
197204
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
@@ -423,6 +430,9 @@ func (cm *controllerManager) serveHealthProbes() {
423430
}
424431

425432
func (cm *controllerManager) Start(ctx context.Context) (err error) {
433+
if err := cm.Add(cm.cluster); err != nil {
434+
return fmt.Errorf("failed to add cluster to runnables: %w", err)
435+
}
426436
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
427437

428438
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
@@ -590,17 +600,15 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
590600
return
591601
}
592602

593-
// Start the Cache. Allow the function to start the cache to be mocked out for testing
594-
if cm.startCache == nil {
595-
cm.startCache = cm.cluster.Start
603+
for _, cache := range cm.caches {
604+
cm.startRunnable(cache)
596605
}
597-
cm.startRunnable(RunnableFunc(func(ctx context.Context) error {
598-
return cm.startCache(ctx)
599-
}))
600606

601607
// Wait for the caches to sync.
602608
// TODO(community): Check the return value and write a test
603-
cm.cluster.GetCache().WaitForCacheSync(ctx)
609+
for _, cache := range cm.caches {
610+
cache.GetCache().WaitForCacheSync(ctx)
611+
}
604612
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
605613
// cm.started as check if we already started the cache so it must always become true.
606614
// Making sure that the cache doesn't get started twice is needed to not get a "close

pkg/manager/manager_test.go

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"sigs.k8s.io/controller-runtime/pkg/cache"
4444
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
4545
"sigs.k8s.io/controller-runtime/pkg/client"
46+
"sigs.k8s.io/controller-runtime/pkg/cluster"
4647
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
4748
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
4849
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
@@ -612,9 +613,7 @@ var _ = Describe("manger.Manager", func() {
612613
}
613614
mgr, ok := m.(*controllerManager)
614615
Expect(ok).To(BeTrue())
615-
mgr.startCache = func(context.Context) error {
616-
return fmt.Errorf("expected error")
617-
}
616+
mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}}
618617

619618
ctx, cancel := context.WithCancel(context.Background())
620619
defer cancel()
@@ -623,6 +622,84 @@ var _ = Describe("manger.Manager", func() {
623622
close(done)
624623
})
625624

625+
It("should start the cache before starting anything else", func(done Done) {
626+
fakeCache := &informertest.FakeInformers{}
627+
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
628+
return fakeCache, nil
629+
}
630+
m, err := New(cfg, options)
631+
Expect(err).NotTo(HaveOccurred())
632+
for _, cb := range callbacks {
633+
cb(m)
634+
}
635+
636+
runnableWasStarted := make(chan struct{})
637+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
638+
defer GinkgoRecover()
639+
if !fakeCache.WaitForCacheSyncCalled {
640+
return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
641+
}
642+
643+
close(runnableWasStarted)
644+
return nil
645+
}))).To(Succeed())
646+
647+
ctx, cancel := context.WithCancel(context.Background())
648+
defer cancel()
649+
go func() {
650+
defer GinkgoRecover()
651+
Expect(m.Start(ctx)).ToNot(HaveOccurred())
652+
}()
653+
654+
<-runnableWasStarted
655+
close(done)
656+
})
657+
658+
It("should start additional clusters before anything else", func(done Done) {
659+
fakeCache := &informertest.FakeInformers{}
660+
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
661+
return fakeCache, nil
662+
}
663+
m, err := New(cfg, options)
664+
Expect(err).NotTo(HaveOccurred())
665+
for _, cb := range callbacks {
666+
cb(m)
667+
}
668+
669+
additionalClusterCache := &informertest.FakeInformers{}
670+
additionalCluster, err := cluster.New(cfg, func(o *cluster.Options) {
671+
o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
672+
return additionalClusterCache, nil
673+
}
674+
})
675+
Expect(err).NotTo(HaveOccurred())
676+
Expect(m.Add(additionalCluster)).NotTo(HaveOccurred())
677+
678+
runnableWasStarted := make(chan struct{})
679+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
680+
defer GinkgoRecover()
681+
if !fakeCache.WaitForCacheSyncCalled {
682+
return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
683+
}
684+
if !additionalClusterCache.WaitForCacheSyncCalled {
685+
return errors.New("the additional clusters WaitForCacheSync wasn't called before Runnable got started")
686+
}
687+
688+
close(runnableWasStarted)
689+
return nil
690+
}))).To(Succeed())
691+
692+
ctx, cancel := context.WithCancel(context.Background())
693+
defer cancel()
694+
go func() {
695+
defer GinkgoRecover()
696+
Expect(m.Start(ctx)).ToNot(HaveOccurred())
697+
}()
698+
699+
<-runnableWasStarted
700+
close(done)
701+
})
702+
626703
It("should return an error if any Components fail to Start", func(done Done) {
627704
m, err := New(cfg, options)
628705
Expect(err).NotTo(HaveOccurred())
@@ -1625,3 +1702,17 @@ func (f *fakeDeferredLoader) Complete() (v1alpha1.ControllerManagerConfiguration
16251702
func (f *fakeDeferredLoader) InjectScheme(scheme *runtime.Scheme) error {
16261703
return nil
16271704
}
1705+
1706+
var _ Runnable = &cacheProvider{}
1707+
1708+
type cacheProvider struct {
1709+
cache cache.Cache
1710+
}
1711+
1712+
func (c *cacheProvider) GetCache() cache.Cache {
1713+
return c.cache
1714+
}
1715+
1716+
func (c *cacheProvider) Start(ctx context.Context) error {
1717+
return c.cache.Start(ctx)
1718+
}

0 commit comments

Comments
 (0)