Skip to content

Commit bde0a5e

Browse files
committed
only allow starting a source once
Signed-off-by: Tim Ramlot <[email protected]>
1 parent a4e8aca commit bde0a5e

File tree

10 files changed

+372
-172
lines changed

10 files changed

+372
-172
lines changed

pkg/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() {
7777

7878
ctx, cancel := context.WithCancel(context.Background())
7979
watchChan := make(chan event.GenericEvent, 1)
80-
watch := &source.Channel{Source: watchChan}
80+
watch := source.Channel(source.NewChannelBroadcaster(watchChan))
8181
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
8282

8383
reconcileStarted := make(chan struct{})

pkg/internal/controller/controller_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,7 @@ var _ = Describe("controller", func() {
226226
Object: p,
227227
}
228228

229-
ins := &source.Channel{Source: ch}
230-
ins.DestBufferSize = 1
229+
ins := source.Channel(source.NewChannelBroadcaster(ch), source.WithDestBufferSize(1))
231230

232231
// send the event to the channel
233232
ch <- evt
@@ -249,18 +248,18 @@ var _ = Describe("controller", func() {
249248
<-processed
250249
})
251250

252-
It("should error when channel source is not specified", func() {
251+
It("should error when ChannelBroadcaster is not specified", func() {
253252
ctx, cancel := context.WithCancel(context.Background())
254253
defer cancel()
255254

256-
ins := &source.Channel{}
255+
ins := source.Channel(nil)
257256
ctrl.startWatches = []watchDescription{{
258257
src: ins,
259258
}}
260259

261260
e := ctrl.Start(ctx)
262261
Expect(e).To(HaveOccurred())
263-
Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
262+
Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil Broadcaster"))
264263
})
265264

266265
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {

pkg/source/example_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ func ExampleChannel() {
4343
events := make(chan event.GenericEvent)
4444

4545
err := ctrl.Watch(
46-
&source.Channel{Source: events},
46+
source.Channel(
47+
source.NewChannelBroadcaster(events),
48+
),
4749
&handler.EnqueueRequestForObject{},
4850
)
4951
if err != nil {

pkg/source/internal/channel.go

Lines changed: 55 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,25 @@ import (
2727
"sigs.k8s.io/controller-runtime/pkg/predicate"
2828
)
2929

30-
const (
31-
// defaultBufferSize is the default number of event notifications that can be buffered.
32-
defaultBufferSize = 1024
33-
)
30+
// ChannelOptions contains the options for the Channel source.
31+
type ChannelOptions struct {
32+
// DestBufferSize is the specified buffer size of dest channels.
33+
// Default to 1024 if not specified.
34+
DestBufferSize int
35+
}
3436

3537
// Channel is used to provide a source of events originating outside the cluster
3638
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
3739
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
3840
type Channel struct {
39-
// once ensures the event distribution goroutine will be performed only once
40-
once sync.Once
41-
42-
// Source is the source channel to fetch GenericEvents
43-
Source <-chan event.GenericEvent
41+
Options ChannelOptions
4442

45-
// dest is the destination channels of the added event handlers
46-
dest []chan event.GenericEvent
43+
// Broadcaster contains the source channel for events.
44+
Broadcaster *ChannelBroadcaster
4745

48-
// DestBufferSize is the specified buffer size of dest channels.
49-
// Default to 1024 if not specified.
50-
DestBufferSize int
51-
52-
// destLock is to ensure the destination channels are safely added/removed
53-
destLock sync.Mutex
46+
mu sync.Mutex
47+
// isStarted is true if the source has been started. A source can only be started once.
48+
isStarted bool
5449
}
5550

5651
func (cs *Channel) String() string {
@@ -63,88 +58,67 @@ func (cs *Channel) Start(
6358
handler handler.EventHandler,
6459
queue workqueue.RateLimitingInterface,
6560
prct ...predicate.Predicate) error {
66-
// Source should have been specified by the user.
67-
if cs.Source == nil {
68-
return fmt.Errorf("must specify Channel.Source")
61+
// Broadcaster should have been specified by the user.
62+
if cs.Broadcaster == nil {
63+
return fmt.Errorf("must create Channel with a non-nil Broadcaster")
6964
}
7065

71-
// use default value if DestBufferSize not specified
72-
if cs.DestBufferSize == 0 {
73-
cs.DestBufferSize = defaultBufferSize
66+
cs.mu.Lock()
67+
defer cs.mu.Unlock()
68+
if cs.isStarted {
69+
return fmt.Errorf("cannot start an already started Channel source")
7470
}
71+
cs.isStarted = true
7572

76-
dst := make(chan event.GenericEvent, cs.DestBufferSize)
77-
78-
cs.destLock.Lock()
79-
cs.dest = append(cs.dest, dst)
80-
cs.destLock.Unlock()
81-
82-
cs.once.Do(func() {
83-
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
84-
go cs.syncLoop(ctx)
85-
})
73+
// Create a destination channel for the event handler
74+
// and add it to the list of destinations
75+
destination := make(chan event.GenericEvent, cs.Options.DestBufferSize)
76+
cs.Broadcaster.AddListener(destination)
8677

8778
go func() {
88-
for evt := range dst {
89-
shouldHandle := true
90-
for _, p := range prct {
91-
if !p.Generic(evt) {
92-
shouldHandle = false
93-
break
94-
}
95-
}
96-
97-
if shouldHandle {
98-
func() {
99-
ctx, cancel := context.WithCancel(ctx)
100-
defer cancel()
101-
handler.Generic(ctx, evt, queue)
102-
}()
103-
}
104-
}
79+
// Remove the listener and wait for the broadcaster
80+
// to stop sending events to the destination channel.
81+
defer cs.Broadcaster.RemoveListener(destination)
82+
83+
cs.processReceivedEvents(
84+
ctx,
85+
destination,
86+
queue,
87+
handler,
88+
prct,
89+
)
10590
}()
10691

10792
return nil
10893
}
10994

110-
func (cs *Channel) doStop() {
111-
cs.destLock.Lock()
112-
defer cs.destLock.Unlock()
113-
114-
for _, dst := range cs.dest {
115-
close(dst)
116-
}
117-
}
118-
119-
func (cs *Channel) distribute(evt event.GenericEvent) {
120-
cs.destLock.Lock()
121-
defer cs.destLock.Unlock()
122-
123-
for _, dst := range cs.dest {
124-
// We cannot make it under goroutine here, or we'll meet the
125-
// race condition of writing message to closed channels.
126-
// To avoid blocking, the dest channels are expected to be of
127-
// proper buffer size. If we still see it blocked, then
128-
// the controller is thought to be in an abnormal state.
129-
dst <- evt
130-
}
131-
}
132-
133-
func (cs *Channel) syncLoop(ctx context.Context) {
95+
func (cs *Channel) processReceivedEvents(
96+
ctx context.Context,
97+
destination <-chan event.GenericEvent,
98+
queue workqueue.RateLimitingInterface,
99+
eventHandler handler.EventHandler,
100+
predicates []predicate.Predicate,
101+
) {
102+
eventloop:
134103
for {
135104
select {
136105
case <-ctx.Done():
137-
// Close destination channels
138-
cs.doStop()
139106
return
140-
case evt, stillOpen := <-cs.Source:
107+
case event, stillOpen := <-destination:
141108
if !stillOpen {
142-
// if the source channel is closed, we're never gonna get
143-
// anything more on it, so stop & bail
144-
cs.doStop()
145109
return
146110
}
147-
cs.distribute(evt)
111+
112+
// Check predicates against the event first
113+
// and continue the outer loop if any of them fail.
114+
for _, p := range predicates {
115+
if !p.Generic(event) {
116+
continue eventloop
117+
}
118+
}
119+
120+
// Call the event handler with the event.
121+
eventHandler.Generic(ctx, event, queue)
148122
}
149123
}
150124
}

0 commit comments

Comments
 (0)