Skip to content

Commit f9c3e7c

Browse files
inteon and FillZpp committed
Support shutdown controllers and watches dynamically
Co-authored-by: FillZpp <[email protected]> Signed-off-by: Tim Ramlot <[email protected]>
1 parent 5c0aa32 commit f9c3e7c

File tree

9 files changed

+581
-176
lines changed

9 files changed

+581
-176
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/prometheus/client_model v0.3.0
1515
go.uber.org/goleak v1.2.0
1616
go.uber.org/zap v1.24.0
17+
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
1718
golang.org/x/sys v0.4.0
1819
golang.org/x/time v0.3.0
1920
gomodules.xyz/jsonpatch/v2 v2.2.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
371371
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
372372
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
373373
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
374+
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
375+
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
374376
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
375377
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
376378
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

pkg/controller/controllertest/util.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package controllertest
1818

1919
import (
20+
"fmt"
21+
"sync"
2022
"time"
2123

2224
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,7 +35,12 @@ type FakeInformer struct {
3335
// RunCount is incremented each time RunInformersAndControllers is called
3436
RunCount int
3537

36-
handlers []cache.ResourceEventHandler
38+
handlers []*listenHandler
39+
mu sync.RWMutex
40+
}
41+
42+
type listenHandler struct {
43+
cache.ResourceEventHandler
3744
}
3845

3946
// AddIndexers does nothing. TODO(community): Implement this.
@@ -58,8 +65,11 @@ func (f *FakeInformer) HasSynced() bool {
5865

5966
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informer and returns a registration handle that can later be passed to RemoveEventHandler.
6067
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
61-
f.handlers = append(f.handlers, handler)
62-
return nil, nil
68+
f.mu.Lock()
69+
defer f.mu.Unlock()
70+
lh := &listenHandler{ResourceEventHandler: handler}
71+
f.handlers = append(f.handlers, lh)
72+
return lh, nil
6373
}
6474

6575
// Run implements the Informer interface. Increments f.RunCount.
@@ -69,20 +79,26 @@ func (f *FakeInformer) Run(<-chan struct{}) {
6979

7080
// Add fakes an Add event for obj.
7181
func (f *FakeInformer) Add(obj metav1.Object) {
82+
f.mu.RLock()
83+
defer f.mu.RUnlock()
7284
for _, h := range f.handlers {
7385
h.OnAdd(obj)
7486
}
7587
}
7688

7789
// Update fakes an Update event for obj.
7890
func (f *FakeInformer) Update(oldObj, newObj metav1.Object) {
91+
f.mu.RLock()
92+
defer f.mu.RUnlock()
7993
for _, h := range f.handlers {
8094
h.OnUpdate(oldObj, newObj)
8195
}
8296
}
8397

8498
// Delete fakes a Delete event for obj.
8599
func (f *FakeInformer) Delete(obj metav1.Object) {
100+
f.mu.RLock()
101+
defer f.mu.RUnlock()
86102
for _, h := range f.handlers {
87103
h.OnDelete(obj)
88104
}
@@ -95,6 +111,21 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve
95111

96112
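// RemoveEventHandler removes the event handler identified by handle from the fake Informer. It returns an error if handle is not of the registration type returned by AddEventHandler.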
97113
func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error {
114+
lh, ok := handle.(*listenHandler)
115+
if !ok {
116+
return fmt.Errorf("invalid key type %T", handle)
117+
}
118+
119+
f.mu.Lock()
120+
defer f.mu.Unlock()
121+
handlers := make([]*listenHandler, 0, len(f.handlers))
122+
for _, h := range f.handlers {
123+
if h == lh {
124+
continue
125+
}
126+
handlers = append(handlers, h)
127+
}
128+
f.handlers = handlers
98129
return nil
99130
}
100131

pkg/internal/controller/controller.go

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package controller
1818

1919
import (
2020
"context"
21-
"errors"
2221
"fmt"
2322
"sync"
2423
"time"
@@ -58,12 +57,18 @@ type Controller struct {
5857
// the Queue for processing
5958
Queue workqueue.RateLimitingInterface
6059

60+
// startedSources maintains a list of sources that have already started.
61+
startedSources []source.Source
62+
6163
// mu is used to synchronize Controller setup
6264
mu sync.Mutex
6365

6466
// Started is true if the Controller has been Started
6567
Started bool
6668

69+
// Stopped is true if the Controller has been Stopped
70+
Stopped bool
71+
6772
// ctx is the context that was passed to Start() and used when starting watches.
6873
//
6974
// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
@@ -123,6 +128,10 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
123128
c.mu.Lock()
124129
defer c.mu.Unlock()
125130

131+
if c.Stopped {
132+
return fmt.Errorf("can not start watch in a stopped controller")
133+
}
134+
126135
// Controller hasn't started yet, store the watches locally and return.
127136
//
128137
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
@@ -132,7 +141,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
132141
}
133142

134143
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
135-
return src.Start(c.ctx, evthdler, c.Queue, prct...)
144+
err := src.Start(c.ctx, evthdler, c.Queue, prct...)
145+
if err == nil {
146+
c.startedSources = append(c.startedSources, src)
147+
}
148+
return err
136149
}
137150

138151
// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
@@ -148,23 +161,23 @@ func (c *Controller) Start(ctx context.Context) error {
148161
// use an IIFE to get proper lock handling
149162
// but lock outside to get proper handling of the queue shutdown
150163
c.mu.Lock()
164+
if c.Stopped {
165+
c.mu.Unlock()
166+
return fmt.Errorf("can not restart a stopped controller, you should create a new one")
167+
}
151168
if c.Started {
152-
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
169+
c.mu.Unlock()
170+
return fmt.Errorf("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
153171
}
154172

155173
c.initMetrics()
156174

157-
// Set the internal context.
175+
// Set the internal context, which is used to start watches after the controller is started.
158176
c.ctx = ctx
159177

160178
c.Queue = c.MakeQueue()
161-
go func() {
162-
<-ctx.Done()
163-
c.Queue.ShutDown()
164-
}()
165-
166179
wg := &sync.WaitGroup{}
167-
err := func() error {
180+
startErr := func() error {
168181
defer c.mu.Unlock()
169182

170183
// TODO(pwittrock): Reconsider HandleCrash
@@ -179,6 +192,7 @@ func (c *Controller) Start(ctx context.Context) error {
179192
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
180193
return err
181194
}
195+
c.startedSources = append(c.startedSources, watch.src)
182196
}
183197

184198
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
@@ -231,14 +245,31 @@ func (c *Controller) Start(ctx context.Context) error {
231245
c.Started = true
232246
return nil
233247
}()
234-
if err != nil {
235-
return err
248+
249+
defer func() {
250+
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
251+
c.Queue.ShutDown()
252+
wg.Wait()
253+
c.LogConstructor(nil).Info("All workers finished")
254+
255+
c.mu.Lock()
256+
defer c.mu.Unlock()
257+
c.Stopped = true
258+
259+
for _, src := range c.startedSources {
260+
if err := src.Stop(); err != nil {
261+
c.LogConstructor(nil).Error(err, "Failed to stop watch source when controller stopping", "source", src)
262+
}
263+
}
264+
c.LogConstructor(nil).Info("All watch sources finished")
265+
}()
266+
267+
if startErr != nil {
268+
return startErr
236269
}
237270

238271
<-ctx.Done()
239-
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
240-
wg.Wait()
241-
c.LogConstructor(nil).Info("All workers finished")
272+
242273
return nil
243274
}
244275

pkg/internal/controller/controller_test.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,13 +304,47 @@ var _ = Describe("controller", func() {
304304
It("should return an error if it gets started more than once", func() {
305305
// Run the first Start in a goroutine because it blocks until ctx is cancelled; the second Start must then fail.
306306
ctx, cancel := context.WithCancel(context.Background())
307-
cancel()
308-
Expect(ctrl.Start(ctx)).To(BeNil())
307+
started, done := make(chan struct{}), make(chan struct{})
308+
go func() {
309+
defer close(done)
310+
close(started)
311+
Expect(ctrl.Start(ctx)).To(BeNil())
312+
}()
313+
defer func() { <-done }()
314+
<-started
315+
defer cancel()
309316
err := ctrl.Start(ctx)
310317
Expect(err).NotTo(BeNil())
311318
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
312319
})
313320

321+
It("should return an error if it gets start after stopped", func() {
322+
stoppedChan := make(chan struct{})
323+
controllerCtx, controllerCancel := context.WithCancel(context.Background())
324+
go func() {
325+
Expect(ctrl.Start(controllerCtx)).To(BeNil())
326+
close(stoppedChan)
327+
}()
328+
329+
// wait for started
330+
var started bool
331+
for !started {
332+
func() {
333+
ctrl.mu.Lock()
334+
defer ctrl.mu.Unlock()
335+
started = ctrl.Started
336+
}()
337+
}
338+
339+
controllerCancel()
340+
<-stoppedChan
341+
Expect(ctrl.Stopped).To(Equal(true))
342+
343+
err := ctrl.Start(context.TODO())
344+
Expect(err).NotTo(BeNil())
345+
Expect(err.Error()).To(Equal("can not restart a stopped controller, you should create a new one"))
346+
})
347+
314348
})
315349

316350
Describe("Processing queue items from a Controller", func() {

0 commit comments

Comments
 (0)