
Commit d98ce09

Switch away from using errChan in the manager
Instead of using an error channel in the manager, we just use a mutex and store a single error. The "waiting" is done by blocking on a channel that gets closed to signal an error. This achieves the same effect (only the first error is returned) without any chance of blocking goroutines: with the old code, goroutines could block trying to send their results (especially since a few tried to send nil results), and since runnables can be added after start, there's no way to size the channel to avoid this (nor any point, since we only report the first error anyway). We also now signal only when an error actually occurred, never with a nil error value.
1 parent 0fdf465 commit d98ce09
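To make the pattern concrete, here is a minimal, self-contained sketch (the errSignaler type and its methods mirror the code added in pkg/manager/internal.go below; the main function and the worker errors are invented for illustration):

```go
package main

import (
	"fmt"
	"sync"
)

// errSignaler mirrors the type added in pkg/manager/internal.go: one stored
// error guarded by a mutex, plus a channel that is closed (never sent on)
// to signal that an error occurred.
type errSignaler struct {
	errSignal chan struct{}
	err       error
	mu        sync.Mutex
}

func (r *errSignaler) SignalError(err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.err != nil {
		// first error wins; later reports are dropped, never blocked
		return
	}
	r.err = err
	close(r.errSignal)
}

func (r *errSignaler) Error() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.err
}

func (r *errSignaler) GotError() chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.errSignal
}

func main() {
	s := &errSignaler{errSignal: make(chan struct{})}

	// Any number of goroutines can report errors; none of them can ever
	// block, which is the property a fixed-size error channel can't give
	// once runnables may be added after Start.
	for i := 0; i < 5; i++ {
		go s.SignalError(fmt.Errorf("worker %d failed", i))
	}

	// "Waiting" is just blocking until the signal channel is closed.
	<-s.GotError()
	fmt.Println(s.Error()) // whichever error arrived first
}
```

Because the signal is a channel close rather than a send, Start can select on GotError() alongside its stop channel and then read the stored error exactly once.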

File tree

3 files changed: +68 −15 lines

pkg/manager/internal.go

Lines changed: 64 additions & 12 deletions
```diff
@@ -114,7 +114,11 @@ type controllerManager struct {
 	started        bool
 	startedLeader  bool
 	healthzStarted bool
-	errChan        chan error
+
+	// NB(directxman12): we don't just use an error channel here to avoid the situation where the
+	// error channel is too small and we end up blocking some goroutines waiting to report their errors.
+	// errSignal lets us track when we should stop because an error occurred
+	errSignal *errSignaler
 
 	// internalStop is the stop channel *actually* used by everything involved
 	// with the manager as a stop channel, so that we can pass a stop channel
@@ -150,6 +154,45 @@ type controllerManager struct {
 	retryPeriod time.Duration
 }
 
+type errSignaler struct {
+	// errSignal indicates that an error occurred, when closed. It shouldn't
+	// be written to.
+	errSignal chan struct{}
+
+	// err is the received error
+	err error
+
+	mu sync.Mutex
+}
+
+func (r *errSignaler) SignalError(err error) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if r.err != nil {
+		// we already have an error, don't try again
+		return
+	}
+
+	// save the error and report it
+	r.err = err
+	close(r.errSignal)
+}
+
+func (r *errSignaler) Error() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	return r.err
+}
+
+func (r *errSignaler) GotError() chan struct{} {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	return r.errSignal
+}
+
 // Add sets dependencies on i, and adds it to the list of Runnables to start.
 func (cm *controllerManager) Add(r Runnable) error {
 	cm.mu.Lock()
@@ -174,7 +217,9 @@ func (cm *controllerManager) Add(r Runnable) error {
 	if shouldStart {
 		// If already started, start the controller
 		go func() {
-			cm.errChan <- r.Start(cm.internalStop)
+			if err := r.Start(cm.internalStop); err != nil {
+				cm.errSignal.SignalError(err)
+			}
 		}()
 	}
 
@@ -304,15 +349,15 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
 	go func() {
 		log.Info("starting metrics server", "path", metricsPath)
 		if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
-			cm.errChan <- err
+			cm.errSignal.SignalError(err)
 		}
 	}()
 
 	// Shutdown the server when stop is closed
 	select {
 	case <-stop:
 		if err := server.Shutdown(context.Background()); err != nil {
-			cm.errChan <- err
+			cm.errSignal.SignalError(err)
 		}
 	}
 }
@@ -334,7 +379,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
 	// Run server
 	go func() {
 		if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
-			cm.errChan <- err
+			cm.errSignal.SignalError(err)
 		}
 	}()
 	cm.healthzStarted = true
@@ -344,7 +389,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
 	select {
 	case <-stop:
 		if err := server.Shutdown(context.Background()); err != nil {
-			cm.errChan <- err
+			cm.errSignal.SignalError(err)
 		}
 	}
 }
@@ -353,6 +398,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
 	// join the passed-in stop channel as an upstream feeding into cm.internalStopper
 	defer close(cm.internalStopper)
 
+	// initialize this here so that we reset the signal channel state on every start
+	cm.errSignal = &errSignaler{errSignal: make(chan struct{})}
+
 	// Metrics should be served whether the controller is leader or not.
 	// (If we don't serve metrics for non-leaders, prometheus will still scrape
 	// the pod but will get a connection refused)
@@ -380,9 +428,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
 	case <-stop:
 		// We are done
 		return nil
-	case err := <-cm.errChan:
+	case <-cm.errSignal.GotError():
 		// Error starting a controller
-		return err
+		return cm.errSignal.Error()
 	}
 }
 
@@ -398,7 +446,9 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
 		// Write any Start errors to a channel so we can return them
 		ctrl := c
 		go func() {
-			cm.errChan <- ctrl.Start(cm.internalStop)
+			if err := ctrl.Start(cm.internalStop); err != nil {
+				cm.errSignal.SignalError(err)
+			}
 		}()
 	}
 }
@@ -415,7 +465,9 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
 		// Write any Start errors to a channel so we can return them
 		ctrl := c
 		go func() {
-			cm.errChan <- ctrl.Start(cm.internalStop)
+			if err := ctrl.Start(cm.internalStop); err != nil {
+				cm.errSignal.SignalError(err)
+			}
 		}()
 	}
 
@@ -433,7 +485,7 @@ func (cm *controllerManager) waitForCache() {
 	}
 	go func() {
 		if err := cm.startCache(cm.internalStop); err != nil {
-			cm.errChan <- err
+			cm.errSignal.SignalError(err)
 		}
 	}()
 
@@ -457,7 +509,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
 				// Most implementations of leader election log.Fatal() here.
 				// Since Start is wrapped in log.Fatal when called, we can just return
 				// an error here which will cause the program to exit.
-				cm.errChan <- fmt.Errorf("leader election lost")
+				cm.errSignal.SignalError(fmt.Errorf("leader election lost"))
 			},
 		},
 	})
```
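Two details in the hunks above are worth calling out: the errSignal field is (re)initialized at the top of Start, so a stale "error occurred" signal from a previous run can't leak into a restart; and because SignalError returns early once an error is stored, any later failures are silently dropped, which matches the old behavior of only ever reporting the first error.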

pkg/manager/manager.go

Lines changed: 0 additions & 1 deletion
```diff
@@ -291,7 +291,6 @@ func New(config *rest.Config, options Options) (Manager, error) {
 	return &controllerManager{
 		config:           config,
 		scheme:           options.Scheme,
-		errChan:          make(chan error),
 		cache:            cache,
 		fieldIndexes:     cache,
 		client:           writeObj,
```

pkg/manager/manager_test.go

Lines changed: 4 additions & 2 deletions
```diff
@@ -283,7 +283,7 @@ var _ = Describe("manger.Manager", func() {
 			mgr.startCache = func(stop <-chan struct{}) error {
 				return fmt.Errorf("expected error")
 			}
-			Expect(m.Start(stop).Error()).To(ContainSubstring("expected error"))
+			Expect(m.Start(stop)).To(MatchError(ContainSubstring("expected error")))
 
 			close(done)
 		})
@@ -314,7 +314,9 @@ var _ = Describe("manger.Manager", func() {
 
 			go func() {
 				defer GinkgoRecover()
-				Expect(m.Start(stop)).NotTo(HaveOccurred())
+				// NB(directxman12): this should definitely return an error. If it doesn't happen,
+				// it means someone was signaling "stop: error" with a nil "error".
+				Expect(m.Start(stop)).NotTo(Succeed())
 				close(done)
 			}()
 			<-c1
```
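Aside on the assertion changes: the old form m.Start(stop).Error() would panic with a nil pointer dereference if Start unexpectedly returned nil, while MatchError(ContainSubstring(...)) fails with a readable message instead; and the second test's expectation flips from NotTo(HaveOccurred()) to NotTo(Succeed()) because, with nil results no longer signaled as errors, Start here must return a real error.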
