Skip to content

Commit 6fcaa42

Browse files
committed
only allow starting a source once
Signed-off-by: Tim Ramlot <[email protected]>
1 parent 22f4a15 commit 6fcaa42

File tree

9 files changed

+302
-102
lines changed

9 files changed

+302
-102
lines changed

pkg/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {
7979

8080
ctx, cancel := context.WithCancel(context.Background())
8181
watchChan := make(chan event.GenericEvent, 1)
82-
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
82+
watch := source.Channel(source.NewChannelBroadcaster(watchChan), &handler.EnqueueRequestForObject{})
8383
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
8484

8585
reconcileStarted := make(chan struct{})

pkg/internal/controller/controller_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ var _ = Describe("controller", func() {
227227
}
228228

229229
ins := source.Channel(
230-
ch,
230+
source.NewChannelBroadcaster(ch),
231231
handler.Funcs{
232232
GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
233233
defer GinkgoRecover()
@@ -248,7 +248,7 @@ var _ = Describe("controller", func() {
248248
<-processed
249249
})
250250

251-
It("should error when channel source is not specified", func() {
251+
It("should error when ChannelBroadcaster is not specified", func() {
252252
ctx, cancel := context.WithCancel(context.Background())
253253
defer cancel()
254254

@@ -257,7 +257,7 @@ var _ = Describe("controller", func() {
257257

258258
e := ctrl.Start(ctx)
259259
Expect(e).To(HaveOccurred())
260-
Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
260+
Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil Broadcaster"))
261261
})
262262

263263
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {

pkg/source/example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func ExampleChannel() {
4444

4545
err := ctrl.Watch(
4646
source.Channel(
47-
events,
47+
source.NewChannelBroadcaster(events),
4848
&handler.EnqueueRequestForObject{},
4949
),
5050
)

pkg/source/internal/channel.go

Lines changed: 52 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"sync"
2424

2525
"k8s.io/client-go/util/workqueue"
26-
"k8s.io/utils/ptr"
2726
"sigs.k8s.io/controller-runtime/pkg/event"
2827
"sigs.k8s.io/controller-runtime/pkg/handler"
2928
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -33,23 +32,18 @@ import (
3332
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
3433
// source (e.g. http handler) to write GenericEvents to the underlying channel.
3534
type Channel[T any] struct {
36-
// once ensures the event distribution goroutine will be performed only once
37-
once sync.Once
38-
39-
// source is the source channel to fetch GenericEvents
40-
Source <-chan event.TypedGenericEvent[T]
35+
// Broadcaster contains the source channel for events.
36+
Broadcaster *ChannelBroadcaster[T]
4137

4238
Handler handler.TypedEventHandler[T]
4339

4440
Predicates []predicate.TypedPredicate[T]
4541

46-
BufferSize *int
47-
48-
// dest is the destination channels of the added event handlers
49-
dest []chan event.TypedGenericEvent[T]
42+
DestBufferSize int
5043

51-
// destLock is to ensure the destination channels are safely added/removed
52-
destLock sync.Mutex
44+
mu sync.Mutex
45+
// isStarted is true if the source has been started. A source can only be started once.
46+
isStarted bool
5347
}
5448

5549
func (cs *Channel[T]) String() string {
@@ -62,89 +56,72 @@ func (cs *Channel[T]) Start(
6256
queue workqueue.RateLimitingInterface,
6357
) error {
6458
// Source should have been specified by the user.
65-
if cs.Source == nil {
66-
return fmt.Errorf("must specify Channel.Source")
59+
if cs.Broadcaster == nil {
60+
return fmt.Errorf("must create Channel with a non-nil Broadcaster")
6761
}
6862
if cs.Handler == nil {
69-
return errors.New("must specify Channel.Handler")
63+
return errors.New("must create Channel with a non-nil Handler")
7064
}
71-
72-
if cs.BufferSize == nil {
73-
cs.BufferSize = ptr.To(1024)
65+
if cs.DestBufferSize == 0 {
66+
return errors.New("must create Channel with a >0 DestBufferSize")
7467
}
7568

76-
dst := make(chan event.TypedGenericEvent[T], *cs.BufferSize)
77-
78-
cs.destLock.Lock()
79-
cs.dest = append(cs.dest, dst)
80-
cs.destLock.Unlock()
69+
cs.mu.Lock()
70+
defer cs.mu.Unlock()
71+
if cs.isStarted {
72+
return fmt.Errorf("cannot start an already started Channel source")
73+
}
74+
cs.isStarted = true
8175

82-
cs.once.Do(func() {
83-
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
84-
go cs.syncLoop(ctx)
85-
})
76+
// Create a destination channel for the event handler
77+
// and add it to the list of destinations
78+
destination := make(chan event.TypedGenericEvent[T], cs.DestBufferSize)
79+
cs.Broadcaster.AddListener(destination)
8680

8781
go func() {
88-
for evt := range dst {
89-
shouldHandle := true
90-
for _, p := range cs.Predicates {
91-
if !p.Generic(evt) {
92-
shouldHandle = false
93-
break
94-
}
95-
}
96-
97-
if shouldHandle {
98-
func() {
99-
ctx, cancel := context.WithCancel(ctx)
100-
defer cancel()
101-
cs.Handler.Generic(ctx, evt, queue)
102-
}()
103-
}
104-
}
82+
// Remove the listener and wait for the broadcaster
83+
// to stop sending events to the destination channel.
84+
defer cs.Broadcaster.RemoveListener(destination)
85+
86+
cs.processReceivedEvents(
87+
ctx,
88+
destination,
89+
queue,
90+
cs.Handler,
91+
cs.Predicates,
92+
)
10593
}()
10694

10795
return nil
10896
}
10997

110-
func (cs *Channel[T]) doStop() {
111-
cs.destLock.Lock()
112-
defer cs.destLock.Unlock()
113-
114-
for _, dst := range cs.dest {
115-
close(dst)
116-
}
117-
}
118-
119-
func (cs *Channel[T]) distribute(evt event.TypedGenericEvent[T]) {
120-
cs.destLock.Lock()
121-
defer cs.destLock.Unlock()
122-
123-
for _, dst := range cs.dest {
124-
// We cannot make it under goroutine here, or we'll meet the
125-
// race condition of writing message to closed channels.
126-
// To avoid blocking, the dest channels are expected to be of
127-
// proper buffer size. If we still see it blocked, then
128-
// the controller is thought to be in an abnormal state.
129-
dst <- evt
130-
}
131-
}
132-
133-
func (cs *Channel[T]) syncLoop(ctx context.Context) {
98+
func (cs *Channel[T]) processReceivedEvents(
99+
ctx context.Context,
100+
destination <-chan event.TypedGenericEvent[T],
101+
queue workqueue.RateLimitingInterface,
102+
eventHandler handler.TypedEventHandler[T],
103+
predicates []predicate.TypedPredicate[T],
104+
) {
105+
eventloop:
134106
for {
135107
select {
136108
case <-ctx.Done():
137-
// Close destination channels
138-
cs.doStop()
139109
return
140-
case evt, stillOpen := <-cs.Source:
110+
case event, stillOpen := <-destination:
141111
if !stillOpen {
142-
// if the source channel is closed, we're never gonna get
143-
// anything more on it, so stop & bail
144-
cs.doStop()
145112
return
146113
}
147-
cs.distribute(evt)
114+
115+
// Check predicates against the event first
116+
// and continue the outer loop if any of them fail.
117+
for _, p := range predicates {
118+
if !p.Generic(event) {
119+
continue eventloop
120+
}
121+
}
122+
123+
// Call the event handler with the event.
124+
eventHandler.Generic(ctx, event, queue)
148125
}
149126
}
150127
}

0 commit comments

Comments
 (0)