@@ -43,6 +43,7 @@ import (
43
43
"sigs.k8s.io/controller-runtime/pkg/cache"
44
44
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
45
45
"sigs.k8s.io/controller-runtime/pkg/client"
46
+ "sigs.k8s.io/controller-runtime/pkg/cluster"
46
47
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
47
48
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
48
49
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
@@ -612,9 +613,7 @@ var _ = Describe("manger.Manager", func() {
612
613
}
613
614
mgr , ok := m .(* controllerManager )
614
615
Expect (ok ).To (BeTrue ())
615
- mgr .startCache = func (context.Context ) error {
616
- return fmt .Errorf ("expected error" )
617
- }
616
+ mgr .caches = []hasCache {& cacheProvider {cache : & informertest.FakeInformers {Error : fmt .Errorf ("expected error" )}}}
618
617
619
618
ctx , cancel := context .WithCancel (context .Background ())
620
619
defer cancel ()
@@ -623,6 +622,82 @@ var _ = Describe("manger.Manager", func() {
623
622
close (done )
624
623
})
625
624
625
+ It ("should start the cache before starting anything else" , func (done Done ) {
626
+ fakeCache := & startSignalingInformer {Cache : & informertest.FakeInformers {}}
627
+ options .NewCache = func (_ * rest.Config , _ cache.Options ) (cache.Cache , error ) {
628
+ return fakeCache , nil
629
+ }
630
+ m , err := New (cfg , options )
631
+ Expect (err ).NotTo (HaveOccurred ())
632
+ for _ , cb := range callbacks {
633
+ cb (m )
634
+ }
635
+
636
+ runnableWasStarted := make (chan struct {})
637
+ Expect (m .Add (RunnableFunc (func (ctx context.Context ) error {
638
+ defer GinkgoRecover ()
639
+ if ! fakeCache .wasSynced {
640
+ return errors .New ("runnable got started before cache was synced" )
641
+ }
642
+ close (runnableWasStarted )
643
+ return nil
644
+ }))).To (Succeed ())
645
+
646
+ ctx , cancel := context .WithCancel (context .Background ())
647
+ defer cancel ()
648
+ go func () {
649
+ defer GinkgoRecover ()
650
+ Expect (m .Start (ctx )).ToNot (HaveOccurred ())
651
+ }()
652
+
653
+ <- runnableWasStarted
654
+ close (done )
655
+ })
656
+
657
+ It ("should start additional clusters before anything else" , func (done Done ) {
658
+ fakeCache := & startSignalingInformer {Cache : & informertest.FakeInformers {}}
659
+ options .NewCache = func (_ * rest.Config , _ cache.Options ) (cache.Cache , error ) {
660
+ return fakeCache , nil
661
+ }
662
+ m , err := New (cfg , options )
663
+ Expect (err ).NotTo (HaveOccurred ())
664
+ for _ , cb := range callbacks {
665
+ cb (m )
666
+ }
667
+
668
+ additionalClusterCache := & startSignalingInformer {Cache : & informertest.FakeInformers {}}
669
+ additionalCluster , err := cluster .New (cfg , func (o * cluster.Options ) {
670
+ o .NewCache = func (_ * rest.Config , _ cache.Options ) (cache.Cache , error ) {
671
+ return additionalClusterCache , nil
672
+ }
673
+ })
674
+ Expect (err ).NotTo (HaveOccurred ())
675
+ Expect (m .Add (additionalCluster )).NotTo (HaveOccurred ())
676
+
677
+ runnableWasStarted := make (chan struct {})
678
+ Expect (m .Add (RunnableFunc (func (ctx context.Context ) error {
679
+ defer GinkgoRecover ()
680
+ if ! fakeCache .wasSynced {
681
+ return errors .New ("WaitForCacheSyncCalled wasn't called before Runnable got started" )
682
+ }
683
+ if ! additionalClusterCache .wasSynced {
684
+ return errors .New ("the additional clusters WaitForCacheSync wasn't called before Runnable got started" )
685
+ }
686
+ close (runnableWasStarted )
687
+ return nil
688
+ }))).To (Succeed ())
689
+
690
+ ctx , cancel := context .WithCancel (context .Background ())
691
+ defer cancel ()
692
+ go func () {
693
+ defer GinkgoRecover ()
694
+ Expect (m .Start (ctx )).ToNot (HaveOccurred ())
695
+ }()
696
+
697
+ <- runnableWasStarted
698
+ close (done )
699
+ })
700
+
626
701
It ("should return an error if any Components fail to Start" , func (done Done ) {
627
702
m , err := New (cfg , options )
628
703
Expect (err ).NotTo (HaveOccurred ())
@@ -1625,3 +1700,52 @@ func (f *fakeDeferredLoader) Complete() (v1alpha1.ControllerManagerConfiguration
1625
1700
func (f * fakeDeferredLoader ) InjectScheme (scheme * runtime.Scheme ) error {
1626
1701
return nil
1627
1702
}
1703
+
1704
+ var _ Runnable = & cacheProvider {}
1705
+
1706
+ type cacheProvider struct {
1707
+ cache cache.Cache
1708
+ }
1709
+
1710
+ func (c * cacheProvider ) GetCache () cache.Cache {
1711
+ return c .cache
1712
+ }
1713
+
1714
+ func (c * cacheProvider ) Start (ctx context.Context ) error {
1715
+ return c .cache .Start (ctx )
1716
+ }
1717
+
1718
+ type startSignalingInformer struct {
1719
+ // The manager calls Start and WaitForCacheSync in
1720
+ // parallel, so we have to protect wasStarted with a Mutex
1721
+ // and block in WaitForCacheSync until it is true.
1722
+ wasStartedLock sync.Mutex
1723
+ wasStarted bool
1724
+ // was synced will be true once Start was called and
1725
+ // WaitForCacheSync returned, just like a real cache.
1726
+ wasSynced bool
1727
+ cache.Cache
1728
+ }
1729
+
1730
+ func (c * startSignalingInformer ) started () bool {
1731
+ c .wasStartedLock .Lock ()
1732
+ defer c .wasStartedLock .Unlock ()
1733
+ return c .wasStarted
1734
+ }
1735
+
1736
+ func (c * startSignalingInformer ) Start (ctx context.Context ) error {
1737
+ c .wasStartedLock .Lock ()
1738
+ c .wasStarted = true
1739
+ c .wasStartedLock .Unlock ()
1740
+ return c .Cache .Start (ctx )
1741
+ }
1742
+
1743
+ func (c * startSignalingInformer ) WaitForCacheSync (ctx context.Context ) bool {
1744
+ defer func () {
1745
+ for ! c .started () {
1746
+ continue
1747
+ }
1748
+ c .wasSynced = true
1749
+ }()
1750
+ return c .Cache .WaitForCacheSync (ctx )
1751
+ }
0 commit comments