Skip to content

Commit 720d5f9

Browse files
committed
fix nil stop value for source.Channel
fixes #103 Creates a stop channel for the manager in New(), which will get passed to any source.Channel instances that are added. When the manager's start method is called and a new stop channel is passed in, that channel will be joined in a goroutine with the manager's existing channel so that if the newer channel gets closed, so will the manager's.
1 parent 0f4719b commit 720d5f9

File tree

3 files changed

+54
-40
lines changed

3 files changed

+54
-40
lines changed

pkg/manager/internal.go

Lines changed: 49 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ type controllerManager struct {
7575
errChan chan error
7676
stop <-chan struct{}
7777

78+
// stopper is the write side of the stop channel. They should have the same value.
79+
stopper chan<- struct{}
80+
7881
startCache func(stop <-chan struct{}) error
7982
}
8083

@@ -159,9 +162,15 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
159162

160163
func (cm *controllerManager) Start(stop <-chan struct{}) error {
161164
if cm.resourceLock == nil {
162-
go cm.start(stop)
165+
// join the passed-in stop channel as an upstream feeding into cm.stopper
166+
go func() {
167+
<-stop
168+
close(cm.stopper)
169+
}()
170+
171+
go cm.start()
163172
select {
164-
case <-stop:
173+
case <-cm.stop:
165174
// we are done
166175
return nil
167176
case err := <-cm.errChan:
@@ -178,7 +187,19 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
178187
RenewDeadline: 10 * time.Second,
179188
RetryPeriod: 2 * time.Second,
180189
Callbacks: leaderelection.LeaderCallbacks{
181-
OnStartedLeading: cm.start,
190+
// This type changes in k8s 1.12 to func(context.Context)
191+
OnStartedLeading: func(stopleading <-chan struct{}) {
192+
// join both stop and stopleading so they feed into cm.stopper
193+
go func() {
194+
select {
195+
case <-stop:
196+
close(cm.stopper)
197+
case <-stopleading:
198+
close(cm.stopper)
199+
}
200+
}()
201+
cm.start()
202+
},
182203
OnStoppedLeading: func() {
183204
// Most implementations of leader election log.Fatal() here.
184205
// Since Start is wrapped in log.Fatal when called, we can just return
@@ -194,7 +215,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
194215
go l.Run()
195216

196217
select {
197-
case <-stop:
218+
case <-cm.stop:
198219
// We are done
199220
return nil
200221
case err := <-cm.errChan:
@@ -203,43 +224,33 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
203224
}
204225
}
205226

206-
func (cm *controllerManager) start(stop <-chan struct{}) {
207-
func() {
208-
cm.mu.Lock()
209-
defer cm.mu.Unlock()
210-
211-
cm.stop = stop
212-
213-
// Start the Cache. Allow the function to start the cache to be mocked out for testing
214-
if cm.startCache == nil {
215-
cm.startCache = cm.cache.Start
216-
}
217-
go func() {
218-
if err := cm.startCache(stop); err != nil {
219-
cm.errChan <- err
220-
}
221-
}()
227+
func (cm *controllerManager) start() {
228+
cm.mu.Lock()
229+
defer cm.mu.Unlock()
222230

223-
// Wait for the caches to sync.
224-
// TODO(community): Check the return value and write a test
225-
cm.cache.WaitForCacheSync(stop)
226-
227-
// Start the runnables after the cache has synced
228-
for _, c := range cm.runnables {
229-
// Controllers block, but we want to return an error if any have an error starting.
230-
// Write any Start errors to a channel so we can return them
231-
ctrl := c
232-
go func() {
233-
cm.errChan <- ctrl.Start(stop)
234-
}()
231+
// Start the Cache. Allow the function to start the cache to be mocked out for testing
232+
if cm.startCache == nil {
233+
cm.startCache = cm.cache.Start
234+
}
235+
go func() {
236+
if err := cm.startCache(cm.stop); err != nil {
237+
cm.errChan <- err
235238
}
236-
237-
cm.started = true
238239
}()
239240

240-
select {
241-
case <-stop:
242-
// We are done
243-
return
241+
// Wait for the caches to sync.
242+
// TODO(community): Check the return value and write a test
243+
cm.cache.WaitForCacheSync(cm.stop)
244+
245+
// Start the runnables after the cache has synced
246+
for _, c := range cm.runnables {
247+
// Controllers block, but we want to return an error if any have an error starting.
248+
// Write any Start errors to a channel so we can return them
249+
ctrl := c
250+
go func() {
251+
cm.errChan <- ctrl.Start(cm.stop)
252+
}()
244253
}
254+
255+
cm.started = true
245256
}

pkg/manager/manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
180180
return nil, err
181181
}
182182

183+
stop := make(chan struct{})
184+
183185
return &controllerManager{
184186
config: config,
185187
scheme: options.Scheme,
@@ -191,6 +193,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
191193
recorderProvider: recorderProvider,
192194
resourceLock: resourceLock,
193195
mapper: mapper,
196+
stop: stop,
197+
stopper: stop,
194198
}, nil
195199
}
196200

pkg/manager/manager_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,7 @@ var _ = Describe("manger.Manager", func() {
327327
},
328328
stop: func(stop <-chan struct{}) error {
329329
defer GinkgoRecover()
330-
// Manager stop chan has not been initialized.
331-
Expect(stop).To(BeNil())
330+
Expect(stop).NotTo(BeNil())
332331
return nil
333332
},
334333
f: func(f inject.Func) error {

0 commit comments

Comments
 (0)