Skip to content

Commit ad68b43

Browse files
authored
Merge pull request #1327 from alvaroaleman/start-clusters
✨ Manager: Start all caches before other Runnables
2 parents 73c52e8 + 70df377 commit ad68b43

File tree

2 files changed

+143
-11
lines changed

2 files changed

+143
-11
lines changed

pkg/manager/internal.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ type controllerManager struct {
125125
// election was configured.
126126
elected chan struct{}
127127

128-
startCache func(ctx context.Context) error
128+
caches []hasCache
129129

130130
// port is the port that the webhook server serves at.
131131
port int
@@ -173,6 +173,11 @@ type controllerManager struct {
173173
internalProceduresStop chan struct{}
174174
}
175175

176+
type hasCache interface {
177+
Runnable
178+
GetCache() cache.Cache
179+
}
180+
176181
// Add sets dependencies on i, and adds it to the list of Runnables to start.
177182
func (cm *controllerManager) Add(r Runnable) error {
178183
cm.mu.Lock()
@@ -192,6 +197,8 @@ func (cm *controllerManager) Add(r Runnable) error {
192197
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
193198
shouldStart = cm.started
194199
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
200+
} else if hasCache, ok := r.(hasCache); ok {
201+
cm.caches = append(cm.caches, hasCache)
195202
} else {
196203
shouldStart = cm.startedLeader
197204
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
@@ -423,6 +430,9 @@ func (cm *controllerManager) serveHealthProbes() {
423430
}
424431

425432
func (cm *controllerManager) Start(ctx context.Context) (err error) {
433+
if err := cm.Add(cm.cluster); err != nil {
434+
return fmt.Errorf("failed to add cluster to runnables: %w", err)
435+
}
426436
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
427437

428438
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
@@ -590,17 +600,15 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
590600
return
591601
}
592602

593-
// Start the Cache. Allow the function to start the cache to be mocked out for testing
594-
if cm.startCache == nil {
595-
cm.startCache = cm.cluster.Start
603+
for _, cache := range cm.caches {
604+
cm.startRunnable(cache)
596605
}
597-
cm.startRunnable(RunnableFunc(func(ctx context.Context) error {
598-
return cm.startCache(ctx)
599-
}))
600606

601607
// Wait for the caches to sync.
602608
// TODO(community): Check the return value and write a test
603-
cm.cluster.GetCache().WaitForCacheSync(ctx)
609+
for _, cache := range cm.caches {
610+
cache.GetCache().WaitForCacheSync(ctx)
611+
}
604612
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
605613
// cm.started as check if we already started the cache so it must always become true.
606614
// Making sure that the cache doesn't get started twice is needed to not get a "close

pkg/manager/manager_test.go

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"sigs.k8s.io/controller-runtime/pkg/cache"
4444
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
4545
"sigs.k8s.io/controller-runtime/pkg/client"
46+
"sigs.k8s.io/controller-runtime/pkg/cluster"
4647
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
4748
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
4849
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
@@ -612,9 +613,7 @@ var _ = Describe("manger.Manager", func() {
612613
}
613614
mgr, ok := m.(*controllerManager)
614615
Expect(ok).To(BeTrue())
615-
mgr.startCache = func(context.Context) error {
616-
return fmt.Errorf("expected error")
617-
}
616+
mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}}
618617

619618
ctx, cancel := context.WithCancel(context.Background())
620619
defer cancel()
@@ -623,6 +622,82 @@ var _ = Describe("manger.Manager", func() {
623622
close(done)
624623
})
625624

625+
It("should start the cache before starting anything else", func(done Done) {
626+
fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
627+
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
628+
return fakeCache, nil
629+
}
630+
m, err := New(cfg, options)
631+
Expect(err).NotTo(HaveOccurred())
632+
for _, cb := range callbacks {
633+
cb(m)
634+
}
635+
636+
runnableWasStarted := make(chan struct{})
637+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
638+
defer GinkgoRecover()
639+
if !fakeCache.wasSynced {
640+
return errors.New("runnable got started before cache was synced")
641+
}
642+
close(runnableWasStarted)
643+
return nil
644+
}))).To(Succeed())
645+
646+
ctx, cancel := context.WithCancel(context.Background())
647+
defer cancel()
648+
go func() {
649+
defer GinkgoRecover()
650+
Expect(m.Start(ctx)).ToNot(HaveOccurred())
651+
}()
652+
653+
<-runnableWasStarted
654+
close(done)
655+
})
656+
657+
It("should start additional clusters before anything else", func(done Done) {
658+
fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
659+
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
660+
return fakeCache, nil
661+
}
662+
m, err := New(cfg, options)
663+
Expect(err).NotTo(HaveOccurred())
664+
for _, cb := range callbacks {
665+
cb(m)
666+
}
667+
668+
additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
669+
additionalCluster, err := cluster.New(cfg, func(o *cluster.Options) {
670+
o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
671+
return additionalClusterCache, nil
672+
}
673+
})
674+
Expect(err).NotTo(HaveOccurred())
675+
Expect(m.Add(additionalCluster)).NotTo(HaveOccurred())
676+
677+
runnableWasStarted := make(chan struct{})
678+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
679+
defer GinkgoRecover()
680+
if !fakeCache.wasSynced {
681+
return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
682+
}
683+
if !additionalClusterCache.wasSynced {
684+
return errors.New("the additional clusters WaitForCacheSync wasn't called before Runnable got started")
685+
}
686+
close(runnableWasStarted)
687+
return nil
688+
}))).To(Succeed())
689+
690+
ctx, cancel := context.WithCancel(context.Background())
691+
defer cancel()
692+
go func() {
693+
defer GinkgoRecover()
694+
Expect(m.Start(ctx)).ToNot(HaveOccurred())
695+
}()
696+
697+
<-runnableWasStarted
698+
close(done)
699+
})
700+
626701
It("should return an error if any Components fail to Start", func(done Done) {
627702
m, err := New(cfg, options)
628703
Expect(err).NotTo(HaveOccurred())
@@ -1625,3 +1700,52 @@ func (f *fakeDeferredLoader) Complete() (v1alpha1.ControllerManagerConfiguration
16251700
func (f *fakeDeferredLoader) InjectScheme(scheme *runtime.Scheme) error {
16261701
return nil
16271702
}
1703+
1704+
var _ Runnable = &cacheProvider{}
1705+
1706+
type cacheProvider struct {
1707+
cache cache.Cache
1708+
}
1709+
1710+
func (c *cacheProvider) GetCache() cache.Cache {
1711+
return c.cache
1712+
}
1713+
1714+
func (c *cacheProvider) Start(ctx context.Context) error {
1715+
return c.cache.Start(ctx)
1716+
}
1717+
1718+
type startSignalingInformer struct {
1719+
// The manager calls Start and WaitForCacheSync in
1720+
// parallel, so we have to protect wasStarted with a Mutex
1721+
// and block in WaitForCacheSync until it is true.
1722+
wasStartedLock sync.Mutex
1723+
wasStarted bool
1724+
// was synced will be true once Start was called and
1725+
// WaitForCacheSync returned, just like a real cache.
1726+
wasSynced bool
1727+
cache.Cache
1728+
}
1729+
1730+
func (c *startSignalingInformer) started() bool {
1731+
c.wasStartedLock.Lock()
1732+
defer c.wasStartedLock.Unlock()
1733+
return c.wasStarted
1734+
}
1735+
1736+
func (c *startSignalingInformer) Start(ctx context.Context) error {
1737+
c.wasStartedLock.Lock()
1738+
c.wasStarted = true
1739+
c.wasStartedLock.Unlock()
1740+
return c.Cache.Start(ctx)
1741+
}
1742+
1743+
func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
1744+
defer func() {
1745+
for !c.started() {
1746+
continue
1747+
}
1748+
c.wasSynced = true
1749+
}()
1750+
return c.Cache.WaitForCacheSync(ctx)
1751+
}

0 commit comments

Comments
 (0)