Skip to content

Commit 5ad43b9

Browse files
committed
⚠️ Propagate context.Context throughout the codebase
Signed-off-by: Vince Prignano <[email protected]>
1 parent 420cd15 commit 5ad43b9

30 files changed

+414
-459
lines changed

pkg/builder/controller_test.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,10 @@ func (typedNoop) Reconcile(context.Context, reconcile.Request) (reconcile.Result
4949
}
5050

5151
var _ = Describe("application", func() {
52-
var stop chan struct{}
53-
5452
BeforeEach(func() {
55-
stop = make(chan struct{})
5653
newController = controller.New
5754
})
5855

59-
AfterEach(func() {
60-
close(stop)
61-
})
62-
6356
noop := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
6457
return reconcile.Result{}, nil
6558
})
@@ -212,17 +205,23 @@ var _ = Describe("application", func() {
212205

213206
Describe("Start with ControllerManagedBy", func() {
214207
It("should Reconcile Owns objects", func(done Done) {
208+
ctx, cancel := context.WithCancel(context.Background())
209+
defer cancel()
210+
215211
m, err := manager.New(cfg, manager.Options{})
216212
Expect(err).NotTo(HaveOccurred())
217213

218214
bldr := ControllerManagedBy(m).
219215
For(&appsv1.Deployment{}).
220216
Owns(&appsv1.ReplicaSet{})
221-
doReconcileTest("3", stop, bldr, m, false)
217+
doReconcileTest(ctx, "3", bldr, m, false)
222218
close(done)
223219
}, 10)
224220

225221
It("should Reconcile Watches objects", func(done Done) {
222+
ctx, cancel := context.WithCancel(context.Background())
223+
defer cancel()
224+
226225
m, err := manager.New(cfg, manager.Options{})
227226
Expect(err).NotTo(HaveOccurred())
228227

@@ -231,13 +230,16 @@ var _ = Describe("application", func() {
231230
Watches( // Equivalent of Owns
232231
&source.Kind{Type: &appsv1.ReplicaSet{}},
233232
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
234-
doReconcileTest("4", stop, bldr, m, true)
233+
doReconcileTest(ctx, "4", bldr, m, true)
235234
close(done)
236235
}, 10)
237236
})
238237

239238
Describe("Set custom predicates", func() {
240239
It("should execute registered predicates only for assigned kind", func(done Done) {
240+
ctx, cancel := context.WithCancel(context.Background())
241+
defer cancel()
242+
241243
m, err := manager.New(cfg, manager.Options{})
242244
Expect(err).NotTo(HaveOccurred())
243245

@@ -286,7 +288,7 @@ var _ = Describe("application", func() {
286288
Owns(&appsv1.ReplicaSet{}, WithPredicates(replicaSetPrct)).
287289
WithEventFilter(allPrct)
288290

289-
doReconcileTest("5", stop, bldr, m, true)
291+
doReconcileTest(ctx, "5", bldr, m, true)
290292

291293
Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
292294
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
@@ -298,7 +300,7 @@ var _ = Describe("application", func() {
298300

299301
})
300302

301-
func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
303+
func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) {
302304
deployName := "deploy-name-" + nameSuffix
303305
rsName := "rs-name-" + nameSuffix
304306

@@ -328,7 +330,7 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
328330
By("Starting the application")
329331
go func() {
330332
defer GinkgoRecover()
331-
Expect(mgr.Start(stop)).NotTo(HaveOccurred())
333+
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
332334
By("Stopping the application")
333335
}()
334336

pkg/builder/webhook_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package builder
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"net/http"
@@ -89,11 +90,11 @@ var _ = Describe("application", func() {
8990
}
9091
}`)
9192

92-
stopCh := make(chan struct{})
93-
close(stopCh)
93+
ctx, cancel := context.WithCancel(context.Background())
94+
defer cancel()
9495
// TODO: we may want to improve it to make it be able to inject dependencies,
9596
// but not always try to load certs and return not found error.
96-
err = svr.Start(stopCh)
97+
err = svr.Start(ctx)
9798
if err != nil && !os.IsNotExist(err) {
9899
Expect(err).NotTo(HaveOccurred())
99100
}
@@ -163,11 +164,11 @@ var _ = Describe("application", func() {
163164
}
164165
}`)
165166

166-
stopCh := make(chan struct{})
167-
close(stopCh)
167+
ctx, cancel := context.WithCancel(context.Background())
168+
defer cancel()
168169
// TODO: we may want to improve it to make it be able to inject dependencies,
169170
// but not always try to load certs and return not found error.
170-
err = svr.Start(stopCh)
171+
err = svr.Start(ctx)
171172
if err != nil && !os.IsNotExist(err) {
172173
Expect(err).NotTo(HaveOccurred())
173174
}
@@ -234,11 +235,11 @@ var _ = Describe("application", func() {
234235
}
235236
}`)
236237

237-
stopCh := make(chan struct{})
238-
close(stopCh)
238+
ctx, cancel := context.WithCancel(context.Background())
239+
defer cancel()
239240
// TODO: we may want to improve it to make it be able to inject dependencies,
240241
// but not always try to load certs and return not found error.
241-
err = svr.Start(stopCh)
242+
err = svr.Start(ctx)
242243
if err != nil && !os.IsNotExist(err) {
243244
Expect(err).NotTo(HaveOccurred())
244245
}
@@ -308,11 +309,11 @@ var _ = Describe("application", func() {
308309
}
309310
}
310311
}`)
311-
stopCh := make(chan struct{})
312-
close(stopCh)
312+
ctx, cancel := context.WithCancel(context.Background())
313+
defer cancel()
313314
// TODO: we may want to improve it to make it be able to inject dependencies,
314315
// but not always try to load certs and return not found error.
315-
err = svr.Start(stopCh)
316+
err = svr.Start(ctx)
316317
if err != nil && !os.IsNotExist(err) {
317318
Expect(err).NotTo(HaveOccurred())
318319
}

pkg/cache/cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ type Informers interface {
6060

6161
// Start runs all the informers known to this cache until the given channel is closed.
6262
// It blocks.
63-
Start(stopCh <-chan struct{}) error
63+
Start(ctx context.Context) error
6464

6565
// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
66-
WaitForCacheSync(stop <-chan struct{}) bool
66+
WaitForCacheSync(ctx context.Context) bool
6767

6868
// Informers knows how to add indices to the caches (informers) that it manages.
6969
client.FieldIndexer

pkg/cache/cache_test.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,17 @@ var _ = Describe("Multi-Namespace Informer Cache", func() {
8383
func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error)) {
8484
Describe("Cache test", func() {
8585
var (
86-
informerCache cache.Cache
87-
stop chan struct{}
88-
knownPod1 runtime.Object
89-
knownPod2 runtime.Object
90-
knownPod3 runtime.Object
91-
knownPod4 runtime.Object
86+
informerCache cache.Cache
87+
informerCacheCtx context.Context
88+
informerCacheCancel context.CancelFunc
89+
knownPod1 runtime.Object
90+
knownPod2 runtime.Object
91+
knownPod3 runtime.Object
92+
knownPod4 runtime.Object
9293
)
9394

9495
BeforeEach(func() {
95-
stop = make(chan struct{})
96+
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
9697
Expect(cfg).NotTo(BeNil())
9798

9899
By("creating three pods")
@@ -123,11 +124,11 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
123124
Expect(err).NotTo(HaveOccurred())
124125
By("running the cache and waiting for it to sync")
125126
// pass as an arg so that we don't race between close and re-assign
126-
go func(stopCh chan struct{}) {
127+
go func(ctx context.Context) {
127128
defer GinkgoRecover()
128-
Expect(informerCache.Start(stopCh)).To(Succeed())
129-
}(stop)
130-
Expect(informerCache.WaitForCacheSync(stop)).To(BeTrue())
129+
Expect(informerCache.Start(ctx)).To(Succeed())
130+
}(informerCacheCtx)
131+
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
131132
})
132133

133134
AfterEach(func() {
@@ -137,7 +138,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
137138
deletePod(knownPod3)
138139
deletePod(knownPod4)
139140

140-
close(stop)
141+
informerCacheCancel()
141142
})
142143

143144
Describe("as a Reader", func() {
@@ -394,11 +395,13 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
394395
Expect(err).NotTo(HaveOccurred())
395396

396397
By("running the cache and waiting for it to sync")
398+
ctx, cancel := context.WithCancel(context.Background())
399+
defer cancel()
397400
go func() {
398401
defer GinkgoRecover()
399-
Expect(namespacedCache.Start(stop)).To(Succeed())
402+
Expect(namespacedCache.Start(ctx)).To(Succeed())
400403
}()
401-
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())
404+
Expect(namespacedCache.WaitForCacheSync(ctx)).NotTo(BeFalse())
402405

403406
By("listing pods in all namespaces")
404407
out := &unstructured.UnstructuredList{}
@@ -572,11 +575,13 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
572575
Expect(informer.IndexField(context.TODO(), pod, "spec.restartPolicy", indexFunc)).To(Succeed())
573576

574577
By("running the cache and waiting for it to sync")
578+
ctx, cancel := context.WithCancel(context.Background())
579+
defer cancel()
575580
go func() {
576581
defer GinkgoRecover()
577-
Expect(informer.Start(stop)).To(Succeed())
582+
Expect(informer.Start(ctx)).To(Succeed())
578583
}()
579-
Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse())
584+
Expect(informer.WaitForCacheSync(ctx)).NotTo(BeFalse())
580585

581586
By("listing Pods with restartPolicyOnFailure")
582587
listObj := &kcorev1.PodList{}
@@ -636,7 +641,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
636641
Object: map[string]interface{}{
637642
"spec": map[string]interface{}{
638643
"containers": []map[string]interface{}{
639-
map[string]interface{}{
644+
{
640645
"name": "nginx",
641646
"image": "nginx",
642647
},
@@ -700,11 +705,13 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
700705
Expect(informer.IndexField(context.TODO(), pod, "spec.restartPolicy", indexFunc)).To(Succeed())
701706

702707
By("running the cache and waiting for it to sync")
708+
ctx, cancel := context.WithCancel(context.Background())
709+
defer cancel()
703710
go func() {
704711
defer GinkgoRecover()
705-
Expect(informer.Start(stop)).To(Succeed())
712+
Expect(informer.Start(ctx)).To(Succeed())
706713
}()
707-
Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse())
714+
Expect(informer.WaitForCacheSync(ctx)).NotTo(BeFalse())
708715

709716
By("listing Pods with restartPolicyOnFailure")
710717
listObj := &unstructured.UnstructuredList{}

pkg/cache/informertest/fake_cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj runtime.Object) (ca
8080
}
8181

8282
// WaitForCacheSync implements Informers
83-
func (c *FakeInformers) WaitForCacheSync(stop <-chan struct{}) bool {
83+
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
8484
if c.Synced == nil {
8585
return true
8686
}
@@ -121,7 +121,7 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
121121
}
122122

123123
// Start implements Informers
124-
func (c *FakeInformers) Start(stopCh <-chan struct{}) error {
124+
func (c *FakeInformers) Start(ctx context.Context) error {
125125
return c.Error
126126
}
127127

pkg/cache/internal/deleg_map.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,25 +57,25 @@ func NewInformersMap(config *rest.Config,
5757
}
5858

5959
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
60-
func (m *InformersMap) Start(stop <-chan struct{}) error {
61-
go m.structured.Start(stop)
62-
go m.unstructured.Start(stop)
63-
<-stop
60+
func (m *InformersMap) Start(ctx context.Context) error {
61+
go m.structured.Start(ctx)
62+
go m.unstructured.Start(ctx)
63+
<-ctx.Done()
6464
return nil
6565
}
6666

6767
// WaitForCacheSync waits until all the caches have been started and synced.
68-
func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
68+
func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool {
6969
syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...)
7070
syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...)
7171

72-
if !m.structured.waitForStarted(stop) {
72+
if !m.structured.waitForStarted(ctx) {
7373
return false
7474
}
75-
if !m.unstructured.waitForStarted(stop) {
75+
if !m.unstructured.waitForStarted(ctx) {
7676
return false
7777
}
78-
return cache.WaitForCacheSync(stop, syncedFuncs...)
78+
return cache.WaitForCacheSync(ctx.Done(), syncedFuncs...)
7979
}
8080

8181
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns

pkg/cache/internal/informers_map.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,31 +123,31 @@ type specificInformersMap struct {
123123

124124
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
125125
// It doesn't return start because it can't return an error, and it's not a runnable directly.
126-
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
126+
func (ip *specificInformersMap) Start(ctx context.Context) {
127127
func() {
128128
ip.mu.Lock()
129129
defer ip.mu.Unlock()
130130

131131
// Set the stop channel so it can be passed to informers that are added later
132-
ip.stop = stop
132+
ip.stop = ctx.Done()
133133

134134
// Start each informer
135135
for _, informer := range ip.informersByGVK {
136-
go informer.Informer.Run(stop)
136+
go informer.Informer.Run(ctx.Done())
137137
}
138138

139139
// Set started to true so we immediately start any informers added later.
140140
ip.started = true
141141
close(ip.startWait)
142142
}()
143-
<-stop
143+
<-ctx.Done()
144144
}
145145

146-
func (ip *specificInformersMap) waitForStarted(stop <-chan struct{}) bool {
146+
func (ip *specificInformersMap) waitForStarted(ctx context.Context) bool {
147147
select {
148148
case <-ip.startWait:
149149
return true
150-
case <-stop:
150+
case <-ctx.Done():
151151
return false
152152
}
153153
}

pkg/cache/multi_namespace_cache.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,23 +94,23 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema
9494
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
9595
}
9696

97-
func (c *multiNamespaceCache) Start(stopCh <-chan struct{}) error {
97+
func (c *multiNamespaceCache) Start(ctx context.Context) error {
9898
for ns, cache := range c.namespaceToCache {
9999
go func(ns string, cache Cache) {
100-
err := cache.Start(stopCh)
100+
err := cache.Start(ctx)
101101
if err != nil {
102102
log.Error(err, "multinamespace cache failed to start namespaced informer", "namespace", ns)
103103
}
104104
}(ns, cache)
105105
}
106-
<-stopCh
106+
<-ctx.Done()
107107
return nil
108108
}
109109

110-
func (c *multiNamespaceCache) WaitForCacheSync(stop <-chan struct{}) bool {
110+
func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
111111
synced := true
112112
for _, cache := range c.namespaceToCache {
113-
if s := cache.WaitForCacheSync(stop); !s {
113+
if s := cache.WaitForCacheSync(ctx); !s {
114114
synced = s
115115
}
116116
}

0 commit comments

Comments
 (0)