Skip to content

Commit 01b8e5b

Browse files
committed
only allow starting a source once
Signed-off-by: Tim Ramlot <[email protected]>
1 parent a9db208 commit 01b8e5b

File tree

6 files changed

+288
-92
lines changed

6 files changed

+288
-92
lines changed

pkg/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {
7979

8080
ctx, cancel := context.WithCancel(context.Background())
8181
watchChan := make(chan event.GenericEvent, 1)
82-
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
82+
watch := source.Channel(source.NewChannelBroadcaster(watchChan), &handler.EnqueueRequestForObject{})
8383
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
8484

8585
reconcileStarted := make(chan struct{})

pkg/internal/controller/controller_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ var _ = Describe("controller", func() {
227227
}
228228

229229
ins := source.Channel(
230-
ch,
230+
source.NewChannelBroadcaster(ch),
231231
handler.Funcs{
232232
GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
233233
defer GinkgoRecover()
@@ -248,7 +248,7 @@ var _ = Describe("controller", func() {
248248
<-processed
249249
})
250250

251-
It("should error when channel source is not specified", func() {
251+
It("should error when ChannelBroadcaster is not specified", func() {
252252
ctx, cancel := context.WithCancel(context.Background())
253253
defer cancel()
254254

@@ -257,7 +257,7 @@ var _ = Describe("controller", func() {
257257

258258
e := ctrl.Start(ctx)
259259
Expect(e).To(HaveOccurred())
260-
Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
260+
Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil broadcaster"))
261261
})
262262

263263
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {

pkg/internal/source/kind.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,27 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package internal
218

319
import (
420
"context"
521
"errors"
622
"fmt"
723
"reflect"
24+
"sync"
825
"time"
926

1027
"k8s.io/apimachinery/pkg/api/meta"
@@ -30,6 +47,9 @@ type Kind[T client.Object] struct {
3047

3148
Predicates []predicate.TypedPredicate[T]
3249

50+
mu sync.RWMutex
51+
isStarted bool
52+
3353
// startedErr may contain an error if one was encountered during startup. If its closed and does not
3454
// contain an error, startup and syncing finished.
3555
startedErr chan error
@@ -40,14 +60,21 @@ type Kind[T client.Object] struct {
4060
// to enqueue reconcile.Requests.
4161
func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
4262
if isNil(ks.Type) {
43-
return fmt.Errorf("must create Kind with a non-nil object")
63+
return fmt.Errorf("must create Kind with a non-nil type")
4464
}
4565
if isNil(ks.Cache) {
4666
return fmt.Errorf("must create Kind with a non-nil cache")
4767
}
4868
if isNil(ks.Handler) {
49-
return errors.New("must create Kind with non-nil handler")
69+
return errors.New("must create Kind with a non-nil handler")
70+
}
71+
72+
ks.mu.Lock()
73+
defer ks.mu.Unlock()
74+
if ks.isStarted {
75+
return fmt.Errorf("cannot start an already started Kind source")
5076
}
77+
ks.isStarted = true
5178

5279
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
5380
// sync that informer (most commonly due to RBAC issues).

pkg/source/example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func ExampleChannel() {
4444

4545
err := ctrl.Watch(
4646
source.Channel(
47-
events,
47+
source.NewChannelBroadcaster(events),
4848
&handler.EnqueueRequestForObject{},
4949
),
5050
)

0 commit comments

Comments
 (0)