Skip to content

Commit 5a56e55

Browse files
committed
make sure that Start and Stop can be used concurrently
Signed-off-by: Tim Ramlot <[email protected]>
1 parent 2ceb950 commit 5a56e55

File tree

4 files changed

+116
-46
lines changed

4 files changed

+116
-46
lines changed

pkg/internal/source/informer.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que
5454

5555
is.mu.Lock()
5656
defer is.mu.Unlock()
57-
if is.canceled {
58-
return nil
57+
if is.started != nil {
58+
return fmt.Errorf("Informer source already started")
5959
}
6060

61-
if is.started != nil {
62-
return errors.New("already started")
61+
if is.canceled {
62+
return nil
6363
}
6464

6565
is.started = make(chan error, 1)
@@ -84,11 +84,11 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que
8484
is.informer = informer
8585

8686
if is.WaitForInformerSync == nil {
87-
is.WaitForInformerSync = func(ctx context.Context) error {
87+
is.WaitForInformerSync = func(informerSyncCtx context.Context) error {
8888
return wait.PollImmediateUntil(
8989
syncedPollPeriod,
9090
func() (bool, error) { return is.informer.HasSynced(), nil },
91-
ctx.Done(),
91+
informerSyncCtx.Done(),
9292
)
9393
}
9494
}
@@ -161,8 +161,11 @@ func (is *Informer) Stop() error {
161161
case <-is.started:
162162
default:
163163
is.startCancel()
164+
165+
is.mu.Unlock()
164166
// Wait for starting abort
165167
<-is.started
168+
is.mu.Lock()
166169
}
167170

168171
if is.eventHandlerRegistration != nil {

pkg/internal/source/kind.go

Lines changed: 47 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,49 +40,58 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
4040
return fmt.Errorf("must create Kind with a non-nil cache")
4141
}
4242

43-
ks.informer = Informer{
44-
GetInformer: func(ctx context.Context) (cache.Informer, error) {
45-
var (
46-
informer cache.Informer
47-
lastErr error
48-
)
49-
50-
// Tries to get an informer until it returns true,
51-
// an error or the specified context is cancelled or expired.
52-
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
53-
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
54-
informer, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
55-
if lastErr != nil {
56-
kindMatchErr := &meta.NoKindMatchError{}
57-
switch {
58-
case errors.As(lastErr, &kindMatchErr):
59-
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
60-
"kind", kindMatchErr.GroupKind)
61-
case runtime.IsNotRegisteredError(lastErr):
62-
log.Error(lastErr, "kind must be registered to the Scheme")
63-
default:
64-
log.Error(lastErr, "failed to get informer from cache")
65-
}
66-
return false, nil // Retry.
67-
}
68-
return true, nil
69-
}); err != nil {
70-
if lastErr != nil {
71-
return nil, fmt.Errorf("failed to get informer from cache: %w", lastErr)
43+
getInformer := func(ctx context.Context) (cache.Informer, error) {
44+
var (
45+
informer cache.Informer
46+
lastErr error
47+
)
48+
49+
// Tries to get an informer until it returns true,
50+
// an error or the specified context is cancelled or expired.
51+
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
52+
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
53+
informer, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
54+
if lastErr != nil {
55+
kindMatchErr := &meta.NoKindMatchError{}
56+
switch {
57+
case errors.As(lastErr, &kindMatchErr):
58+
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
59+
"kind", kindMatchErr.GroupKind)
60+
case runtime.IsNotRegisteredError(lastErr):
61+
log.Error(lastErr, "kind must be registered to the Scheme")
62+
default:
63+
log.Error(lastErr, "failed to get informer from cache")
7264
}
73-
return nil, err
65+
return false, nil // Retry.
7466
}
75-
76-
return informer, nil
77-
},
78-
WaitForInformerSync: func(ctx context.Context) error {
79-
if ks.Cache.WaitForCacheSync(ctx) {
80-
return nil
67+
return true, nil
68+
}); err != nil {
69+
if lastErr != nil {
70+
return nil, fmt.Errorf("failed to get informer from cache: %w", lastErr)
8171
}
72+
return nil, err
73+
}
74+
75+
return informer, nil
76+
}
77+
78+
waitForInformerSync := func(ctx context.Context) error {
79+
if ks.Cache.WaitForCacheSync(ctx) {
80+
return nil
81+
}
82+
83+
return fmt.Errorf("cache did not sync")
84+
}
8285

83-
return fmt.Errorf("cache did not sync")
84-
},
86+
ks.informer.mu.Lock()
87+
// If the GetInformer function is set, the informer has already been started.
88+
if ks.informer.GetInformer != nil || ks.informer.WaitForInformerSync != nil {
89+
ks.informer.mu.Unlock()
90+
return fmt.Errorf("Kind source already started")
8591
}
92+
ks.informer.GetInformer = getInformer
93+
ks.informer.WaitForInformerSync = waitForInformerSync
94+
ks.informer.mu.Unlock()
8695

8796
return ks.informer.Start(ctx, handler, queue, prct...)
8897
}

pkg/source/source.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ func (cs *Channel) Start(ctx context.Context, handler handler.EventHandler, queu
128128
dst := make(chan event.GenericEvent, cs.DestBufferSize)
129129

130130
cs.mu.Lock()
131+
if cs.canceled {
132+
cs.mu.Unlock()
133+
return nil
134+
}
135+
131136
cs.destinations = append(cs.destinations, dst)
132137

133138
if cs.groupWait == nil {
@@ -208,6 +213,9 @@ func (cs *Channel) loop(ctx context.Context) {
208213
// Stop is internal and should be called only by the Controller to stop the Source. It should block until the
209214
// Source has stopped.
210215
func (cs *Channel) Stop() error {
216+
groupCancel := func() {}
217+
groupWait := func() error { return nil }
218+
211219
if func() bool {
212220
cs.mu.Lock()
213221
defer cs.mu.Unlock()
@@ -217,13 +225,18 @@ func (cs *Channel) Stop() error {
217225
}
218226
cs.canceled = true
219227

228+
if cs.groupCancel != nil {
229+
groupCancel = cs.groupCancel
230+
groupWait = cs.groupWait
231+
}
232+
220233
return false
221234
}() {
222235
return nil
223236
}
224237

225-
cs.groupCancel()
226-
return cs.groupWait()
238+
groupCancel()
239+
return groupWait()
227240
}
228241

229242
var _ Source = Func(nil)

pkg/source/source_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,27 @@ var _ = Describe("Source", func() {
226226
Expect(err.Error()).To(Equal("cache did not sync"))
227227

228228
})
229+
230+
Context("should not panic", func() {
231+
It("when Stop is called before Start", func() {
232+
instance := source.Kind(ic, &corev1.Pod{})
233+
err := instance.Stop()
234+
Expect(err).NotTo(HaveOccurred())
235+
})
236+
237+
It("when Start is called twice", func() {
238+
instance := source.Kind(ic, &corev1.Pod{})
239+
Expect(instance.Start(ctx, nil, nil)).To(Succeed())
240+
Expect(instance.Start(ctx, nil, nil)).To(MatchError("Kind source already started"))
241+
})
242+
243+
It("when Stop is called twice", func() {
244+
instance := source.Kind(ic, &corev1.Pod{})
245+
Expect(instance.Start(ctx, nil, nil)).To(Succeed())
246+
Expect(instance.Stop()).To(Succeed())
247+
Expect(instance.Stop()).To(Succeed())
248+
})
249+
})
229250
})
230251

231252
Describe("Func", func() {
@@ -528,5 +549,29 @@ var _ = Describe("Source", func() {
528549
Expect(resEvent1).To(Equal(resEvent2))
529550
})
530551
})
552+
553+
Context("should not panic", func() {
554+
It("when Stop is called before Start", func() {
555+
ch := make(chan event.GenericEvent)
556+
instance := &source.Channel{Source: ch}
557+
err := instance.Stop()
558+
Expect(err).NotTo(HaveOccurred())
559+
})
560+
561+
It("when Start is called twice", func() {
562+
ch := make(chan event.GenericEvent)
563+
instance := &source.Channel{Source: ch}
564+
Expect(instance.Start(ctx, nil, nil)).To(Succeed())
565+
Expect(instance.Start(ctx, nil, nil)).To(Succeed())
566+
})
567+
568+
It("when Stop is called twice", func() {
569+
ch := make(chan event.GenericEvent)
570+
instance := &source.Channel{Source: ch}
571+
Expect(instance.Start(ctx, nil, nil)).To(Succeed())
572+
Expect(instance.Stop()).To(Succeed())
573+
Expect(instance.Stop()).To(Succeed())
574+
})
575+
})
531576
})
532577
})

0 commit comments

Comments
 (0)