Skip to content

Commit 7c59ac6

Browse files
committed
Refactor manager internal around RunnableGroup(s)
Signed-off-by: Vince Prignano <[email protected]>
1 parent 3e870eb commit 7c59ac6

File tree

6 files changed

+473
-112
lines changed

6 files changed

+473
-112
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module sigs.k8s.io/controller-runtime
33
go 1.16
44

55
require (
6+
github.com/davecgh/go-spew v1.1.1
67
github.com/evanphx/json-patch v4.11.0+incompatible
78
github.com/fsnotify/fsnotify v1.4.9
89
github.com/go-logr/logr v0.4.0

pkg/manager/internal.go

Lines changed: 46 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync"
2626
"time"
2727

28+
"github.com/davecgh/go-spew/spew"
2829
"github.com/go-logr/logr"
2930
"github.com/prometheus/client_golang/prometheus/promhttp"
3031
"k8s.io/apimachinery/pkg/api/meta"
@@ -105,10 +106,9 @@ type controllerManager struct {
105106
healthzHandler *healthz.Handler
106107

107108
mu sync.Mutex
108-
started bool
109-
startedLeader bool
110109
healthzStarted bool
111110
errChan chan error
111+
runnables *runnables
112112

113113
// controllerOptions are the global controller options.
114114
controllerOptions v1alpha1.ControllerConfigurationSpec
@@ -134,8 +134,6 @@ type controllerManager struct {
134134
// election was configured.
135135
elected chan struct{}
136136

137-
caches []hasCache
138-
139137
// port is the port that the webhook server serves at.
140138
port int
141139
// host is the hostname that the webhook server binds to.
@@ -160,10 +158,6 @@ type controllerManager struct {
160158
// between tries of actions.
161159
retryPeriod time.Duration
162160

163-
// waitForRunnable is holding the number of runnables currently running so that
164-
// we can wait for them to exit before quitting the manager
165-
waitForRunnable sync.WaitGroup
166-
167161
// gracefulShutdownTimeout is the duration given to runnable to stop
168162
// before the manager actually returns on stop.
169163
gracefulShutdownTimeout time.Duration
@@ -194,6 +188,7 @@ type hasCache interface {
194188
func (cm *controllerManager) Add(r Runnable) error {
195189
cm.mu.Lock()
196190
defer cm.mu.Unlock()
191+
197192
if cm.stopProcedureEngaged {
198193
return errors.New("can't accept new runnable as stop procedure is already engaged")
199194
}
@@ -203,31 +198,14 @@ func (cm *controllerManager) Add(r Runnable) error {
203198
return err
204199
}
205200

206-
var shouldStart bool
207-
208-
// Add the runnable to the leader election or the non-leaderelection list
209-
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
210-
shouldStart = cm.started
211-
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
212-
} else if hasCache, ok := r.(hasCache); ok {
213-
cm.caches = append(cm.caches, hasCache)
214-
if cm.started {
215-
cm.startRunnable(hasCache)
216-
if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) {
217-
return fmt.Errorf("could not sync cache")
201+
return cm.runnables.Add(r, func(ctx context.Context) bool {
202+
if cache, ok := r.(hasCache); ok {
203+
if !cache.GetCache().WaitForCacheSync(cm.internalCtx) {
204+
return false
218205
}
219206
}
220-
} else {
221-
shouldStart = cm.startedLeader
222-
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
223-
}
224-
225-
if shouldStart {
226-
// If already started, start the controller
227-
cm.startRunnable(r)
228-
}
229-
230-
return nil
207+
return true
208+
})
231209
}
232210

233211
// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
@@ -385,13 +363,13 @@ func (cm *controllerManager) serveMetrics() {
385363
Handler: mux,
386364
}
387365
// Run the server
388-
cm.startRunnable(RunnableFunc(func(_ context.Context) error {
366+
cm.runnables.Add(RunnableFunc(func(_ context.Context) error {
389367
cm.logger.Info("starting metrics server", "path", defaultMetricsEndpoint)
390368
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
391369
return err
392370
}
393371
return nil
394-
}))
372+
}), nil)
395373

396374
// Shutdown the server when stop is closed
397375
<-cm.internalProceduresStop
@@ -422,12 +400,12 @@ func (cm *controllerManager) serveHealthProbes() {
422400
}
423401

424402
// Run server
425-
cm.startRunnable(RunnableFunc(func(_ context.Context) error {
403+
cm.runnables.Add(RunnableFunc(func(_ context.Context) error {
426404
if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
427405
return err
428406
}
429407
return nil
430-
}))
408+
}), nil)
431409
cm.healthzStarted = true
432410
}()
433411

@@ -438,11 +416,30 @@ func (cm *controllerManager) serveHealthProbes() {
438416
}
439417
}
440418

419+
// Start starts the manager and locks indefinitely.
420+
// There are only two ways to have Start return:
421+
// An error has occurred in one of the internal operations,
422+
// such as leader election, cache start, webhooks, and so on.
423+
// Or, the context is cancelled.
441424
func (cm *controllerManager) Start(ctx context.Context) (err error) {
425+
cm.mu.Lock()
426+
{
427+
// Initialize the internal context.
428+
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
429+
430+
// initialize this here so that we reset the signal channel state on every start
431+
// Everything that might write into this channel must be started in a new goroutine,
432+
// because otherwise we might block this routine trying to write into the full channel
433+
// and will not be able to enter the deferred cm.engageStopProcedure() which drains
434+
// it.
435+
cm.errChan = make(chan error)
436+
}
437+
cm.mu.Unlock()
438+
439+
// Add the cluster runnable.
442440
if err := cm.Add(cm.cluster); err != nil {
443441
return fmt.Errorf("failed to add cluster to runnables: %w", err)
444442
}
445-
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
446443

447444
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
448445
stopComplete := make(chan struct{})
@@ -463,13 +460,6 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
463460
}
464461
}()
465462

466-
// initialize this here so that we reset the signal channel state on every start
467-
// Everything that might write into this channel must be started in a new goroutine,
468-
// because otherwise we might block this routine trying to write into the full channel
469-
// and will not be able to enter the deferred cm.engageStopProcedure() which drains
470-
// it.
471-
cm.errChan = make(chan error)
472-
473463
// Metrics should be served whether the controller is leader or not.
474464
// (If we don't serve metrics for non-leaders, prometheus will still scrape
475465
// the pod but will get a connection refused)
@@ -568,7 +558,10 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
568558
}()
569559

570560
go func() {
571-
cm.waitForRunnable.Wait()
561+
cm.runnables.others.StopAndWait()
562+
cm.runnables.caches.StopAndWait()
563+
cm.runnables.leaderElection.StopAndWait()
564+
cm.runnables.webhooks.StopAndWait()
572565
shutdownCancel()
573566
}()
574567

@@ -580,71 +573,29 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
580573
}
581574

582575
func (cm *controllerManager) startNonLeaderElectionRunnables() {
583-
cm.mu.Lock()
584-
defer cm.mu.Unlock()
585-
586576
// First start any webhook servers, which includes conversion, validation, and defaulting
587577
// webhooks that are registered.
588578
//
589579
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
590580
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
591581
// to never start because no cache can be populated.
592-
for _, c := range cm.nonLeaderElectionRunnables {
593-
if _, ok := c.(*webhook.Server); ok {
594-
cm.startRunnable(c)
595-
}
596-
}
582+
cm.runnables.webhooks.Start(cm.internalCtx, cm.errChan)
583+
cm.runnables.webhooks.WaitReady(cm.internalCtx)
597584

598585
// Start and wait for caches.
599-
cm.waitForCache(cm.internalCtx)
586+
cm.runnables.caches.WaitReady(cm.internalCtx)
600587

601588
// Start the non-leaderelection Runnables after the cache has synced
602-
for _, c := range cm.nonLeaderElectionRunnables {
603-
if _, ok := c.(*webhook.Server); ok {
604-
continue
605-
}
606-
607-
// Controllers block, but we want to return an error if any have an error starting.
608-
// Write any Start errors to a channel so we can return them
609-
cm.startRunnable(c)
610-
}
589+
cm.runnables.others.Start(cm.internalCtx, cm.errChan)
611590
}
612591

613592
func (cm *controllerManager) startLeaderElectionRunnables() {
614-
cm.mu.Lock()
615-
defer cm.mu.Unlock()
616-
617-
cm.waitForCache(cm.internalCtx)
618-
619-
// Start the leader election Runnables after the cache has synced
620-
for _, c := range cm.leaderElectionRunnables {
621-
// Controllers block, but we want to return an error if any have an error starting.
622-
// Write any Start errors to a channel so we can return them
623-
cm.startRunnable(c)
624-
}
625-
626-
cm.startedLeader = true
627-
}
593+
spew.Dump("STARTING THE CACHES!!!")
594+
cm.runnables.caches.Start(cm.internalCtx, cm.errChan)
595+
cm.runnables.caches.WaitReady(cm.internalCtx)
628596

629-
func (cm *controllerManager) waitForCache(ctx context.Context) {
630-
if cm.started {
631-
return
632-
}
633-
634-
for _, cache := range cm.caches {
635-
cm.startRunnable(cache)
636-
}
637-
638-
// Wait for the caches to sync.
639-
// TODO(community): Check the return value and write a test
640-
for _, cache := range cm.caches {
641-
cache.GetCache().WaitForCacheSync(ctx)
642-
}
643-
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
644-
// cm.started as check if we already started the cache so it must always become true.
645-
// Making sure that the cache doesn't get started twice is needed to not get a "close
646-
// of closed channel" panic
647-
cm.started = true
597+
cm.runnables.leaderElection.Start(cm.internalCtx, cm.errChan)
598+
cm.runnables.leaderElection.WaitReady(cm.internalCtx)
648599
}
649600

650601
func (cm *controllerManager) startLeaderElection() (err error) {
@@ -694,13 +645,3 @@ func (cm *controllerManager) startLeaderElection() (err error) {
694645
func (cm *controllerManager) Elected() <-chan struct{} {
695646
return cm.elected
696647
}
697-
698-
func (cm *controllerManager) startRunnable(r Runnable) {
699-
cm.waitForRunnable.Add(1)
700-
go func() {
701-
defer cm.waitForRunnable.Done()
702-
if err := r.Start(cm.internalCtx); err != nil {
703-
cm.errChan <- err
704-
}
705-
}()
706-
}

pkg/manager/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
367367

368368
return &controllerManager{
369369
cluster: cluster,
370+
runnables: newRunnables(),
370371
recorderProvider: recorderProvider,
371372
resourceLock: resourceLock,
372373
metricsListener: metricsListener,

0 commit comments

Comments
 (0)