@@ -104,11 +104,23 @@ type controllerManager struct {
104
104
// Healthz probe handler
105
105
healthzHandler * healthz.Handler
106
106
107
- mu sync.Mutex
108
- started bool
109
- startedLeader bool
107
+ // addMu guards the data collected by Add, AddHealthzCheck, AddMetricsExtraHandler and AddReadyzCheck,
108
+ // so that none of them can be called while that data is being read.
109
+ addMu sync.RWMutex
110
+
111
+ // started tracks whether waitForCache has already run, i.e. whether the cache has been started.
112
+ started bool
113
+
114
+ // startedLeader tracks whether the leader election runnables have been started.
115
+ startedLeader bool
116
+
117
+ // healthzStarted tracks whether the health probes have been started; once it is set, no further healthz or readyz checks should be added to the manager.
110
118
healthzStarted bool
111
- errChan chan error
119
+
120
+ // cacheMu protects waitForCache from being executed twice concurrently.
121
+ cacheMu sync.Mutex
122
+
123
+ errChan chan error
112
124
113
125
// controllerOptions are the global controller options.
114
126
controllerOptions v1alpha1.ControllerConfigurationSpec
@@ -192,8 +204,8 @@ type hasCache interface {
192
204
193
205
// Add sets dependencies on r, and adds it to the list of Runnables to start.
194
206
func (cm * controllerManager ) Add (r Runnable ) error {
195
- cm .mu .Lock ()
196
- defer cm .mu .Unlock ()
207
+ cm .addMu .Lock ()
208
+ defer cm .addMu .Unlock ()
197
209
if cm .stopProcedureEngaged {
198
210
return errors .New ("can't accept new runnable as stop procedure is already engaged" )
199
211
}
@@ -248,8 +260,8 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha
248
260
return fmt .Errorf ("overriding builtin %s endpoint is not allowed" , defaultMetricsEndpoint )
249
261
}
250
262
251
- cm .mu .Lock ()
252
- defer cm .mu .Unlock ()
263
+ cm .addMu .Lock ()
264
+ defer cm .addMu .Unlock ()
253
265
254
266
if _ , found := cm .metricsExtraHandlers [path ]; found {
255
267
return fmt .Errorf ("can't register extra handler by duplicate path %q on metrics http server" , path )
@@ -262,8 +274,8 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha
262
274
263
275
// AddHealthzCheck allows you to add Healthz checker.
264
276
func (cm * controllerManager ) AddHealthzCheck (name string , check healthz.Checker ) error {
265
- cm .mu .Lock ()
266
- defer cm .mu .Unlock ()
277
+ cm .addMu .Lock ()
278
+ defer cm .addMu .Unlock ()
267
279
268
280
if cm .stopProcedureEngaged {
269
281
return errors .New ("can't accept new healthCheck as stop procedure is already engaged" )
@@ -283,8 +295,8 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
283
295
284
296
// AddReadyzCheck allows you to add Readyz checker.
285
297
func (cm * controllerManager ) AddReadyzCheck (name string , check healthz.Checker ) error {
286
- cm .mu .Lock ()
287
- defer cm .mu .Unlock ()
298
+ cm .addMu .Lock ()
299
+ defer cm .addMu .Unlock ()
288
300
289
301
if cm .stopProcedureEngaged {
290
302
return errors .New ("can't accept new ready check as stop procedure is already engaged" )
@@ -367,8 +379,8 @@ func (cm *controllerManager) serveMetrics() {
367
379
mux .Handle (defaultMetricsEndpoint , handler )
368
380
369
381
func () {
370
- cm .mu . Lock ()
371
- defer cm .mu . Unlock ()
382
+ cm .addMu . RLock ()
383
+ defer cm .addMu . RUnlock ()
372
384
373
385
for path , extraHandler := range cm .metricsExtraHandlers {
374
386
mux .Handle (path , extraHandler )
@@ -401,8 +413,8 @@ func (cm *controllerManager) serveHealthProbes() {
401
413
}
402
414
403
415
func () {
404
- cm .mu . Lock ()
405
- defer cm .mu . Unlock ()
416
+ cm .addMu . RLock ()
417
+ defer cm .addMu . RUnlock ()
406
418
407
419
if cm .readyzHandler != nil {
408
420
mux .Handle (cm .readinessEndpointName , http .StripPrefix (cm .readinessEndpointName , cm .readyzHandler ))
@@ -422,6 +434,9 @@ func (cm *controllerManager) serveHealthProbes() {
422
434
}
423
435
return nil
424
436
}))
437
+
438
+ // Note: healthzStarted is used by AddMetricsExtraHandler and AddReadyzCheck, but it is safe to change here because
439
+ // addMu.RLock() prevents the above functions from being executed concurrently with this operation.
425
440
cm .healthzStarted = true
426
441
}()
427
442
@@ -535,8 +550,11 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
535
550
if cm .gracefulShutdownTimeout == 0 {
536
551
return nil
537
552
}
538
- cm .mu .Lock ()
539
- defer cm .mu .Unlock ()
553
+ cm .addMu .RLock ()
554
+ defer cm .addMu .RUnlock ()
555
+
556
+ // Note: stopProcedureEngaged is used by Add, AddMetricsExtraHandler and AddReadyzCheck, but it is safe to change here because
557
+ // addMu.RLock() prevents the above functions from being executed concurrently with this operation.
540
558
cm .stopProcedureEngaged = true
541
559
542
560
// we want to close this after the other runnables stop, because we don't
@@ -574,8 +592,8 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
574
592
}
575
593
576
594
func (cm * controllerManager ) startNonLeaderElectionRunnables () {
577
- cm .mu . Lock ()
578
- defer cm .mu . Unlock ()
595
+ cm .addMu . RLock ()
596
+ defer cm .addMu . RUnlock ()
579
597
580
598
// First start any webhook servers, which includes conversion, validation, and defaulting
581
599
// webhooks that are registered.
@@ -605,8 +623,8 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
605
623
}
606
624
607
625
func (cm * controllerManager ) startLeaderElectionRunnables () {
608
- cm .mu . Lock ()
609
- defer cm .mu . Unlock ()
626
+ cm .addMu . RLock ()
627
+ defer cm .addMu . RUnlock ()
610
628
611
629
cm .waitForCache (cm .internalCtx )
612
630
@@ -617,10 +635,15 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
617
635
cm .startRunnable (c )
618
636
}
619
637
638
+ // Note: startedLeader is used by Add, but it is safe to change here because
639
+ // addMu.RLock() prevents Add from being executed concurrently with this operation.
620
640
cm .startedLeader = true
621
641
}
622
642
623
643
func (cm * controllerManager ) waitForCache (ctx context.Context ) {
644
+ cm .cacheMu .Lock ()
645
+ defer cm .cacheMu .Unlock ()
646
+
624
647
if cm .started {
625
648
return
626
649
}
@@ -638,14 +661,22 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
638
661
// cm.started as check if we already started the cache so it must always become true.
639
662
// Making sure that the cache doesn't get started twice is needed to not get a "close
640
663
// of closed channel" panic
664
+
665
+ // Note: started is used by Add, so callers must hold addMu (Lock or RLock) before
666
+ // calling this func, in order to prevent Add from being executed concurrently
667
+ // with this operation.
641
668
cm .started = true
642
669
}
643
670
644
671
func (cm * controllerManager ) startLeaderElection () (err error ) {
645
672
ctx , cancel := context .WithCancel (context .Background ())
646
- cm .mu .Lock ()
673
+
674
+ // Note: leaderElectionCancel is used by engageStopProcedure, which already takes an addMu.RLock;
675
+ // thus, in order to prevent the above function from being executed concurrently with this operation, we
676
+ // require an addMu.Lock here as well.
677
+ cm .addMu .Lock ()
647
678
cm .leaderElectionCancel = cancel
648
- cm .mu .Unlock ()
679
+ cm .addMu .Unlock ()
649
680
650
681
if cm .onStoppedLeading == nil {
651
682
cm .onStoppedLeading = func () {
0 commit comments