Skip to content

Commit 0fdf465

Browse files
committed
Don't leak goroutines in controller.New
This quashes a goroutine leak caused by calling controller.New repeatedly without calling Start. controller.New was creating a new workqueue, which was starting goroutines and then expecting to be shut down (by the shutdown method, which is only called at the end of Start). To solve that, we move workqueue initialization to controller.Start. This means that we also move watch starting to controller.Start, but this seems pretty sensible anyway.
1 parent ecdbe54 commit 0fdf465

File tree

4 files changed

+168
-115
lines changed

4 files changed

+168
-115
lines changed

pkg/builder/controller_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,26 +66,22 @@ var _ = Describe("application", func() {
6666
Expect(instance).NotTo(BeNil())
6767
})
6868

69-
It("should return an error if there is no GVK for an object", func() {
69+
It("should return an error if there is no GVK for an object, and thus we can't default the controller name", func() {
7070
By("creating a controller manager")
7171
m, err := manager.New(cfg, manager.Options{})
7272
Expect(err).NotTo(HaveOccurred())
7373

74+
By("creating a controller with a bad For type")
7475
instance, err := ControllerManagedBy(m).
7576
For(&fakeType{}).
7677
Owns(&appsv1.ReplicaSet{}).
7778
Build(noop)
78-
Expect(err).To(HaveOccurred())
79-
Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType"))
79+
Expect(err).To(MatchError(ContainSubstring("no kind is registered for the type builder.fakeType")))
8080
Expect(instance).To(BeNil())
8181

82-
instance, err = ControllerManagedBy(m).
83-
For(&appsv1.ReplicaSet{}).
84-
Owns(&fakeType{}).
85-
Build(noop)
86-
Expect(err).To(HaveOccurred())
87-
Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType"))
88-
Expect(instance).To(BeNil())
82+
// NB(directxman12): we don't test non-for types, since errors for
83+
// them now manifest on controller.Start, not controller.Watch. Errors on the For type
84+
// manifest when we try to default the controller name, which is good to double check.
8985
})
9086

9187
It("should return an error if it cannot create the controller", func() {

pkg/controller/controller.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
8080

8181
// Create controller with dependencies set
8282
c := &controller.Controller{
83-
Do: options.Reconciler,
84-
Cache: mgr.GetCache(),
85-
Config: mgr.GetConfig(),
86-
Scheme: mgr.GetScheme(),
87-
Client: mgr.GetClient(),
88-
Recorder: mgr.GetEventRecorderFor(name),
89-
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
83+
Do: options.Reconciler,
84+
Cache: mgr.GetCache(),
85+
Config: mgr.GetConfig(),
86+
Scheme: mgr.GetScheme(),
87+
Client: mgr.GetClient(),
88+
Recorder: mgr.GetEventRecorderFor(name),
89+
MakeQueue: func() workqueue.RateLimitingInterface {
90+
return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
91+
},
9092
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
9193
Name: name,
9294
}

pkg/internal/controller/controller.go

Lines changed: 70 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ type Controller struct {
6868
// specified, or the ~/.kube/Config.
6969
Config *rest.Config
7070

71+
// MakeQueue constructs the queue for this controller once the controller is ready to start.
72+
// This exists because the standard Kubernetes workqueues start themselves immediately, which
73+
// leads to goroutine leaks if something calls controller.New repeatedly.
74+
MakeQueue func() workqueue.RateLimitingInterface
75+
7176
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
7277
// the Queue for processing
7378
Queue workqueue.RateLimitingInterface
@@ -93,6 +98,16 @@ type Controller struct {
9398
Recorder record.EventRecorder
9499

95100
// TODO(community): Consider initializing a logger with the Controller Name as the tag
101+
102+
// watches maintains a list of sources, handlers, and predicates to start when the controller is started.
103+
watches []watchDescription
104+
}
105+
106+
// watchDescription contains all the information necessary to start a watch.
107+
type watchDescription struct {
108+
src source.Source
109+
handler handler.EventHandler
110+
predicates []predicate.Predicate
96111
}
97112

98113
// Reconcile implements reconcile.Reconciler
@@ -118,47 +133,72 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
118133
}
119134
}
120135

121-
log.Info("Starting EventSource", "controller", c.Name, "source", src)
122-
return src.Start(evthdler, c.Queue, prct...)
136+
c.watches = append(c.watches, watchDescription{src: src, handler: evthdler, predicates: prct})
137+
if c.Started {
138+
log.Info("Starting EventSource", "controller", c.Name, "source", src)
139+
return src.Start(evthdler, c.Queue, prct...)
140+
}
141+
142+
return nil
123143
}
124144

125145
// Start implements controller.Controller
126146
func (c *Controller) Start(stop <-chan struct{}) error {
147+
// use an IIFE to get proper lock handling
148+
// but lock outside to get proper handling of the queue shutdown
127149
c.mu.Lock()
128150

129-
// TODO(pwittrock): Reconsider HandleCrash
130-
defer utilruntime.HandleCrash()
131-
defer c.Queue.ShutDown()
151+
c.Queue = c.MakeQueue()
152+
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
132153

133-
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
134-
log.Info("Starting Controller", "controller", c.Name)
154+
err := func() error {
155+
defer c.mu.Unlock()
135156

136-
// Wait for the caches to be synced before starting workers
137-
if c.WaitForCacheSync == nil {
138-
c.WaitForCacheSync = c.Cache.WaitForCacheSync
139-
}
140-
if ok := c.WaitForCacheSync(stop); !ok {
141-
// This code is unreachable right now since WaitForCacheSync will never return an error
142-
// Leaving it here because that could happen in the future
143-
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
144-
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
145-
c.mu.Unlock()
146-
return err
147-
}
157+
// TODO(pwittrock): Reconsider HandleCrash
158+
defer utilruntime.HandleCrash()
148159

149-
if c.JitterPeriod == 0 {
150-
c.JitterPeriod = 1 * time.Second
151-
}
160+
// NB(directxman12): launch the sources *before* trying to wait for the
161+
// caches to sync so that they have a chance to register their intendeded
162+
// caches.
163+
for _, watch := range c.watches {
164+
log.Info("Starting EventSource", "controller", c.Name, "source", watch.src)
165+
if err := watch.src.Start(watch.handler, c.Queue, watch.predicates...); err != nil {
166+
return err
167+
}
168+
}
152169

153-
// Launch workers to process resources
154-
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
155-
for i := 0; i < c.MaxConcurrentReconciles; i++ {
156-
// Process work items
157-
go wait.Until(c.worker, c.JitterPeriod, stop)
158-
}
170+
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
171+
log.Info("Starting Controller", "controller", c.Name)
172+
173+
// Wait for the caches to be synced before starting workers
174+
if c.WaitForCacheSync == nil {
175+
c.WaitForCacheSync = c.Cache.WaitForCacheSync
176+
}
177+
if ok := c.WaitForCacheSync(stop); !ok {
178+
// This code is unreachable right now since WaitForCacheSync will never return an error
179+
// Leaving it here because that could happen in the future
180+
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
181+
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
182+
return err
183+
}
184+
185+
if c.JitterPeriod == 0 {
186+
c.JitterPeriod = 1 * time.Second
187+
}
188+
189+
// Launch workers to process resources
190+
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
191+
for i := 0; i < c.MaxConcurrentReconciles; i++ {
192+
// Process work items
193+
go wait.Until(c.worker, c.JitterPeriod, stop)
194+
}
159195

160-
c.Started = true
161-
c.mu.Unlock()
196+
c.Started = true
197+
return nil
198+
}()
199+
if err != nil {
200+
return err
201+
}
162202

163203
<-stop
164204
log.Info("Stopping workers", "controller", c.Name)

0 commit comments

Comments
 (0)