Skip to content

Commit e844cac

Browse files
committed
implement the ChannelSource
1 parent 210edd2 commit e844cac

File tree

5 files changed

+324
-9
lines changed

5 files changed

+324
-9
lines changed

pkg/controller/manager.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ func (cm *controllerManager) injectInto(i interface{}) error {
146146
if _, err := inject.CacheInto(cm.cache, i); err != nil {
147147
return err
148148
}
149+
if _, err := inject.DoStop(cm.stop, i); err != nil {
150+
return err
151+
}
149152
return nil
150153
}
151154

pkg/controller/source/example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func ExampleChannelSource() {
4141
events := make(chan event.GenericEvent)
4242

4343
ctrl.Watch(
44-
source.ChannelSource(events),
44+
&source.ChannelSource{Source: events},
4545
&eventhandler.EnqueueHandler{},
4646
)
4747
}

pkg/controller/source/source.go

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package source
1818

1919
import (
2020
"fmt"
21+
"sync"
2122

2223
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2324
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/eventhandler"
@@ -30,6 +31,11 @@ import (
3031
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/predicate"
3132
)
3233

34+
const (
35+
// defaultBufferSize is the default number of event notifications that can be buffered.
36+
defaultBufferSize = 1024
37+
)
38+
3339
// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
3440
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
3541
//
@@ -45,21 +51,127 @@ type Source interface {
4551
Start(eventhandler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
4652
}
4753

54+
var _ Source = &ChannelSource{}
55+
4856
// ChannelSource is used to provide a source of events originating outside the cluster
4957
// (eh.g. GitHub Webhook callback). ChannelSource requires the user to wire the external
5058
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
51-
type ChannelSource chan event.GenericEvent
59+
type ChannelSource struct {
60+
// once ensures the event distribution goroutine will be performed only once
61+
once sync.Once
62+
63+
// Source is the source channel to fetch GenericEvents
64+
Source <-chan event.GenericEvent
65+
66+
// stop is to end ongoing goroutine, and close the channels
67+
stop <-chan struct{}
68+
69+
// dest is the destination channels of the added event handlers
70+
dest []chan event.GenericEvent
71+
72+
// DestBufferSize is the specified buffer size of dest channels
73+
DestBufferSize int
5274

53-
var _ Source = ChannelSource(make(chan event.GenericEvent))
75+
// destLock is to ensure the destination channels are safely added/removed
76+
destLock sync.Mutex
77+
}
78+
79+
var _ inject.Stop = &ChannelSource{}
80+
81+
// InjectStop is internal should be called only by the Controller.
82+
// It is used to inject the stop channel initialized by the ControllerManager.
83+
func (cs *ChannelSource) InjectStop(stop <-chan struct{}) error {
84+
if cs.stop == nil {
85+
cs.stop = stop
86+
}
87+
88+
return nil
89+
}
5490

5591
// Start implements Source and should only be called by the Controller.
56-
func (ks ChannelSource) Start(
92+
func (cs *ChannelSource) Start(
5793
handler eventhandler.EventHandler,
5894
queue workqueue.RateLimitingInterface,
5995
prct ...predicate.Predicate) error {
96+
// Source should have been specified by the user.
97+
if cs.Source == nil {
98+
return fmt.Errorf("must specify ChannelSource.Source")
99+
}
100+
101+
// stop should have been injected before Start was called
102+
if cs.stop == nil {
103+
return fmt.Errorf("must call InjectStop on ChannelSource before calling Start")
104+
}
105+
106+
// use default value if DestBufferSize not specified
107+
if cs.DestBufferSize == 0 {
108+
cs.DestBufferSize = defaultBufferSize
109+
}
110+
111+
cs.once.Do(func() {
112+
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
113+
go func() {
114+
for {
115+
select {
116+
case <-cs.stop:
117+
// Close destination channels
118+
cs.doStop()
119+
return
120+
case evt := <-cs.Source:
121+
cs.distribute(evt)
122+
}
123+
}
124+
}()
125+
})
126+
127+
dst := make(chan event.GenericEvent, cs.DestBufferSize)
128+
go func() {
129+
for evt := range dst {
130+
shouldHandle := true
131+
for _, p := range prct {
132+
if !p.Generic(evt) {
133+
shouldHandle = false
134+
break
135+
}
136+
}
137+
138+
if shouldHandle {
139+
handler.Generic(queue, evt)
140+
}
141+
}
142+
}()
143+
144+
cs.destLock.Lock()
145+
defer cs.destLock.Unlock()
146+
147+
cs.dest = append(cs.dest, dst)
148+
60149
return nil
61150
}
62151

152+
func (cs *ChannelSource) doStop() {
153+
cs.destLock.Lock()
154+
defer cs.destLock.Unlock()
155+
156+
for _, dst := range cs.dest {
157+
close(dst)
158+
}
159+
}
160+
161+
func (cs *ChannelSource) distribute(evt event.GenericEvent) {
162+
cs.destLock.Lock()
163+
defer cs.destLock.Unlock()
164+
165+
for _, dst := range cs.dest {
166+
// We cannot make it under goroutine here, or we'll meet the
167+
// race condition of writing message to closed channels.
168+
// To avoid blocking, the dest channels are expected to be of
169+
// proper buffer size. If we still see it blocked, then
170+
// the controller is thought to be in an abnormal state.
171+
dst <- evt
172+
}
173+
}
174+
63175
// KindSource is used to provide a source of events originating inside the cluster from Watches (eh.g. Pod Create)
64176
type KindSource struct {
65177
// Type is the type of object to watch. e.g. &v1.Pod{}

pkg/controller/source/source_test.go

Lines changed: 190 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package source_test
1818

1919
import (
2020
"fmt"
21+
"time"
2122

2223
"github.com/kubernetes-sigs/controller-runtime/pkg/cache/informertest"
2324
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
@@ -26,10 +27,11 @@ import (
2627
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/inject"
2728
. "github.com/onsi/ginkgo"
2829
. "github.com/onsi/gomega"
29-
"k8s.io/client-go/util/workqueue"
3030

3131
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/predicate"
3232
corev1 "k8s.io/api/core/v1"
33+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/client-go/util/workqueue"
3335
)
3436

3537
var _ = Describe("Source", func() {
@@ -251,11 +253,194 @@ var _ = Describe("Source", func() {
251253
})
252254

253255
Describe("ChannelSource", func() {
254-
It("TODO(community): implement this", func(done Done) {
255-
instance := source.ChannelSource(make(chan event.GenericEvent))
256-
instance.Start(nil, nil)
256+
var stop chan struct{}
257+
var ch chan event.GenericEvent
257258

258-
close(done)
259+
BeforeEach(func() {
260+
stop = make(chan struct{})
261+
ch = make(chan event.GenericEvent)
262+
})
263+
264+
AfterEach(func() {
265+
close(stop)
266+
close(ch)
267+
})
268+
269+
Context("for a source", func() {
270+
It("should provide a GenericEvent", func(done Done) {
271+
ch := make(chan event.GenericEvent)
272+
c := make(chan struct{})
273+
p := &corev1.Pod{
274+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
275+
}
276+
evt := event.GenericEvent{
277+
Object: p,
278+
Meta: p,
279+
}
280+
281+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
282+
instance := &source.ChannelSource{Source: ch}
283+
inject.DoStop(stop, instance)
284+
err := instance.Start(eventhandler.Funcs{
285+
CreateFunc: func(workqueue.RateLimitingInterface, event.CreateEvent) {
286+
defer GinkgoRecover()
287+
Fail("Unexpected CreateEvent")
288+
},
289+
UpdateFunc: func(workqueue.RateLimitingInterface, event.UpdateEvent) {
290+
defer GinkgoRecover()
291+
Fail("Unexpected UpdateEvent")
292+
},
293+
DeleteFunc: func(workqueue.RateLimitingInterface, event.DeleteEvent) {
294+
defer GinkgoRecover()
295+
Fail("Unexpected DeleteEvent")
296+
},
297+
GenericFunc: func(q2 workqueue.RateLimitingInterface, evt event.GenericEvent) {
298+
defer GinkgoRecover()
299+
Expect(q2).To(Equal(q))
300+
Expect(evt.Meta).To(Equal(p))
301+
Expect(evt.Object).To(Equal(p))
302+
close(c)
303+
},
304+
}, q)
305+
Expect(err).NotTo(HaveOccurred())
306+
307+
ch <- evt
308+
<-c
309+
close(done)
310+
})
311+
It("should block if exceed buffer size", func(done Done) {
312+
ch := make(chan event.GenericEvent)
313+
evt := event.GenericEvent{}
314+
interval := 5 * time.Second
315+
316+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
317+
// Add a handler to get distribution blocked
318+
instance := &source.ChannelSource{Source: ch}
319+
instance.DestBufferSize = 1
320+
inject.DoStop(stop, instance)
321+
err := instance.Start(eventhandler.Funcs{
322+
CreateFunc: func(workqueue.RateLimitingInterface, event.CreateEvent) {
323+
defer GinkgoRecover()
324+
Fail("Unexpected CreateEvent")
325+
},
326+
UpdateFunc: func(workqueue.RateLimitingInterface, event.UpdateEvent) {
327+
defer GinkgoRecover()
328+
Fail("Unexpected UpdateEvent")
329+
},
330+
DeleteFunc: func(workqueue.RateLimitingInterface, event.DeleteEvent) {
331+
defer GinkgoRecover()
332+
Fail("Unexpected DeleteEvent")
333+
},
334+
GenericFunc: func(q2 workqueue.RateLimitingInterface, evt event.GenericEvent) {
335+
defer GinkgoRecover()
336+
time.Sleep(interval)
337+
},
338+
}, q)
339+
Expect(err).NotTo(HaveOccurred())
340+
341+
// get channel blocked
342+
ch <- evt
343+
ch <- evt
344+
ch <- evt
345+
346+
beforeEvent := time.Now()
347+
ch <- evt
348+
// validate event distribution get blocked.
349+
distributeInterval := time.Now().Sub(beforeEvent)
350+
Expect(distributeInterval >= interval).To(BeTrue())
351+
close(done)
352+
}, 15)
353+
It("should get error if no source specified", func(done Done) {
354+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
355+
instance := &source.ChannelSource{ /*no source specified*/ }
356+
inject.DoStop(stop, instance)
357+
err := instance.Start(eventhandler.Funcs{}, q)
358+
Expect(err).To(Equal(fmt.Errorf("must specify ChannelSource.Source")))
359+
close(done)
360+
})
361+
It("should get error if no stop channel injected", func(done Done) {
362+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
363+
instance := &source.ChannelSource{Source: ch}
364+
err := instance.Start(eventhandler.Funcs{}, q)
365+
Expect(err).To(Equal(fmt.Errorf("must call InjectStop on ChannelSource before calling Start")))
366+
close(done)
367+
})
368+
369+
})
370+
Context("for multi sources (handlers)", func() {
371+
It("should provide GenericEvents for all handlers", func(done Done) {
372+
ch := make(chan event.GenericEvent)
373+
p := &corev1.Pod{
374+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
375+
}
376+
evt := event.GenericEvent{
377+
Object: p,
378+
Meta: p,
379+
}
380+
381+
var resEvent1, resEvent2 event.GenericEvent
382+
c1 := make(chan struct{})
383+
c2 := make(chan struct{})
384+
385+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
386+
instance := &source.ChannelSource{Source: ch}
387+
inject.DoStop(stop, instance)
388+
err := instance.Start(eventhandler.Funcs{
389+
CreateFunc: func(workqueue.RateLimitingInterface, event.CreateEvent) {
390+
defer GinkgoRecover()
391+
Fail("Unexpected CreateEvent")
392+
},
393+
UpdateFunc: func(workqueue.RateLimitingInterface, event.UpdateEvent) {
394+
defer GinkgoRecover()
395+
Fail("Unexpected UpdateEvent")
396+
},
397+
DeleteFunc: func(workqueue.RateLimitingInterface, event.DeleteEvent) {
398+
defer GinkgoRecover()
399+
Fail("Unexpected DeleteEvent")
400+
},
401+
GenericFunc: func(q2 workqueue.RateLimitingInterface, evt event.GenericEvent) {
402+
defer GinkgoRecover()
403+
Expect(q2).To(Equal(q))
404+
Expect(evt.Meta).To(Equal(p))
405+
Expect(evt.Object).To(Equal(p))
406+
resEvent1 = evt
407+
close(c1)
408+
},
409+
}, q)
410+
Expect(err).NotTo(HaveOccurred())
411+
412+
err = instance.Start(eventhandler.Funcs{
413+
CreateFunc: func(workqueue.RateLimitingInterface, event.CreateEvent) {
414+
defer GinkgoRecover()
415+
Fail("Unexpected CreateEvent")
416+
},
417+
UpdateFunc: func(workqueue.RateLimitingInterface, event.UpdateEvent) {
418+
defer GinkgoRecover()
419+
Fail("Unexpected UpdateEvent")
420+
},
421+
DeleteFunc: func(workqueue.RateLimitingInterface, event.DeleteEvent) {
422+
defer GinkgoRecover()
423+
Fail("Unexpected DeleteEvent")
424+
},
425+
GenericFunc: func(q2 workqueue.RateLimitingInterface, evt event.GenericEvent) {
426+
defer GinkgoRecover()
427+
Expect(q2).To(Equal(q))
428+
Expect(evt.Meta).To(Equal(p))
429+
Expect(evt.Object).To(Equal(p))
430+
resEvent2 = evt
431+
close(c2)
432+
},
433+
}, q)
434+
Expect(err).NotTo(HaveOccurred())
435+
436+
ch <- evt
437+
<-c1
438+
<-c2
439+
440+
// Validate the two handlers received same event
441+
Expect(resEvent1).To(Equal(resEvent2))
442+
close(done)
443+
})
259444
})
260445
})
261446
})

0 commit comments

Comments
 (0)