Skip to content

Commit 40070e2

Browse files
authored
Merge pull request #651 from DirectXMan12/bug/controller-new-spawns-goroutines
🐛 prevent controller.New and manager's errChan from leaking goroutines
2 parents ad57a97 + 4d81992 commit 40070e2

File tree

7 files changed

+248
-130
lines changed

7 files changed

+248
-130
lines changed

pkg/builder/controller_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,26 +66,22 @@ var _ = Describe("application", func() {
6666
Expect(instance).NotTo(BeNil())
6767
})
6868

69-
It("should return an error if there is no GVK for an object", func() {
69+
It("should return an error if there is no GVK for an object, and thus we can't default the controller name", func() {
7070
By("creating a controller manager")
7171
m, err := manager.New(cfg, manager.Options{})
7272
Expect(err).NotTo(HaveOccurred())
7373

74+
By("creating a controller with a bad For type")
7475
instance, err := ControllerManagedBy(m).
7576
For(&fakeType{}).
7677
Owns(&appsv1.ReplicaSet{}).
7778
Build(noop)
78-
Expect(err).To(HaveOccurred())
79-
Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType"))
79+
Expect(err).To(MatchError(ContainSubstring("no kind is registered for the type builder.fakeType")))
8080
Expect(instance).To(BeNil())
8181

82-
instance, err = ControllerManagedBy(m).
83-
For(&appsv1.ReplicaSet{}).
84-
Owns(&fakeType{}).
85-
Build(noop)
86-
Expect(err).To(HaveOccurred())
87-
Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType"))
88-
Expect(instance).To(BeNil())
82+
// NB(directxman12): we don't test non-for types, since errors for
83+
// them now manifest on controller.Start, not controller.Watch. Errors on the For type
84+
// manifest when we try to default the controller name, which is good to double check.
8985
})
9086

9187
It("should return an error if it cannot create the controller", func() {

pkg/controller/controller.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
8080

8181
// Create controller with dependencies set
8282
c := &controller.Controller{
83-
Do: options.Reconciler,
84-
Cache: mgr.GetCache(),
85-
Config: mgr.GetConfig(),
86-
Scheme: mgr.GetScheme(),
87-
Client: mgr.GetClient(),
88-
Recorder: mgr.GetEventRecorderFor(name),
89-
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
83+
Do: options.Reconciler,
84+
Cache: mgr.GetCache(),
85+
Config: mgr.GetConfig(),
86+
Scheme: mgr.GetScheme(),
87+
Client: mgr.GetClient(),
88+
Recorder: mgr.GetEventRecorderFor(name),
89+
MakeQueue: func() workqueue.RateLimitingInterface {
90+
return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
91+
},
9092
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
9193
Name: name,
9294
}

pkg/internal/controller/controller.go

Lines changed: 70 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ type Controller struct {
6868
// specified, or the ~/.kube/Config.
6969
Config *rest.Config
7070

71+
// MakeQueue constructs the queue for this controller once the controller is ready to start.
72+
// This exists because the standard Kubernetes workqueues start themselves immediately, which
73+
// leads to goroutine leaks if something calls controller.New repeatedly.
74+
MakeQueue func() workqueue.RateLimitingInterface
75+
7176
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
7277
// the Queue for processing
7378
Queue workqueue.RateLimitingInterface
@@ -93,6 +98,16 @@ type Controller struct {
9398
Recorder record.EventRecorder
9499

95100
// TODO(community): Consider initializing a logger with the Controller Name as the tag
101+
102+
// watches maintains a list of sources, handlers, and predicates to start when the controller is started.
103+
watches []watchDescription
104+
}
105+
106+
// watchDescription contains all the information necessary to start a watch.
107+
type watchDescription struct {
108+
src source.Source
109+
handler handler.EventHandler
110+
predicates []predicate.Predicate
96111
}
97112

98113
// Reconcile implements reconcile.Reconciler
@@ -118,47 +133,72 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
118133
}
119134
}
120135

121-
log.Info("Starting EventSource", "controller", c.Name, "source", src)
122-
return src.Start(evthdler, c.Queue, prct...)
136+
c.watches = append(c.watches, watchDescription{src: src, handler: evthdler, predicates: prct})
137+
if c.Started {
138+
log.Info("Starting EventSource", "controller", c.Name, "source", src)
139+
return src.Start(evthdler, c.Queue, prct...)
140+
}
141+
142+
return nil
123143
}
124144

125145
// Start implements controller.Controller
126146
func (c *Controller) Start(stop <-chan struct{}) error {
147+
// use an IIFE to get proper lock handling
148+
// but lock outside to get proper handling of the queue shutdown
127149
c.mu.Lock()
128150

129-
// TODO(pwittrock): Reconsider HandleCrash
130-
defer utilruntime.HandleCrash()
131-
defer c.Queue.ShutDown()
151+
c.Queue = c.MakeQueue()
152+
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
132153

133-
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
134-
log.Info("Starting Controller", "controller", c.Name)
154+
err := func() error {
155+
defer c.mu.Unlock()
135156

136-
// Wait for the caches to be synced before starting workers
137-
if c.WaitForCacheSync == nil {
138-
c.WaitForCacheSync = c.Cache.WaitForCacheSync
139-
}
140-
if ok := c.WaitForCacheSync(stop); !ok {
141-
// This code is unreachable right now since WaitForCacheSync will never return an error
142-
// Leaving it here because that could happen in the future
143-
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
144-
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
145-
c.mu.Unlock()
146-
return err
147-
}
157+
// TODO(pwittrock): Reconsider HandleCrash
158+
defer utilruntime.HandleCrash()
148159

149-
if c.JitterPeriod == 0 {
150-
c.JitterPeriod = 1 * time.Second
151-
}
160+
// NB(directxman12): launch the sources *before* trying to wait for the
161+
// caches to sync so that they have a chance to register their intendeded
162+
// caches.
163+
for _, watch := range c.watches {
164+
log.Info("Starting EventSource", "controller", c.Name, "source", watch.src)
165+
if err := watch.src.Start(watch.handler, c.Queue, watch.predicates...); err != nil {
166+
return err
167+
}
168+
}
152169

153-
// Launch workers to process resources
154-
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
155-
for i := 0; i < c.MaxConcurrentReconciles; i++ {
156-
// Process work items
157-
go wait.Until(c.worker, c.JitterPeriod, stop)
158-
}
170+
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
171+
log.Info("Starting Controller", "controller", c.Name)
172+
173+
// Wait for the caches to be synced before starting workers
174+
if c.WaitForCacheSync == nil {
175+
c.WaitForCacheSync = c.Cache.WaitForCacheSync
176+
}
177+
if ok := c.WaitForCacheSync(stop); !ok {
178+
// This code is unreachable right now since WaitForCacheSync will never return an error
179+
// Leaving it here because that could happen in the future
180+
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
181+
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
182+
return err
183+
}
184+
185+
if c.JitterPeriod == 0 {
186+
c.JitterPeriod = 1 * time.Second
187+
}
188+
189+
// Launch workers to process resources
190+
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
191+
for i := 0; i < c.MaxConcurrentReconciles; i++ {
192+
// Process work items
193+
go wait.Until(c.worker, c.JitterPeriod, stop)
194+
}
159195

160-
c.Started = true
161-
c.mu.Unlock()
196+
c.Started = true
197+
return nil
198+
}()
199+
if err != nil {
200+
return err
201+
}
162202

163203
<-stop
164204
log.Info("Stopping workers", "controller", c.Name)

0 commit comments

Comments
 (0)