Skip to content

Commit 039baf6

Browse files
committed
implement the ChannelSource
1 parent 63acb0c commit 039baf6

File tree

5 files changed

+205
-9
lines changed

5 files changed

+205
-9
lines changed

pkg/controller/manager.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
2424
"github.com/kubernetes-sigs/controller-runtime/pkg/client/config"
25+
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2526
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/reconcile"
2627
"github.com/kubernetes-sigs/controller-runtime/pkg/internal/apiutil"
2728
"github.com/kubernetes-sigs/controller-runtime/pkg/internal/informer"
@@ -74,6 +75,9 @@ type controllerManager struct {
7475
// informers are injected into Controllers (,and transitively EventHandlers, Sources and Predicates).
7576
informers informer.Informers
7677

78+
// outerChan is a channel to receive events originating outside the cluster (e.g. GitHub Webhook callback)
79+
outerChan chan event.GenericEvent
80+
7781
// TODO(directxman12): Provide an escape hatch to get individual indexers
7882
// client is the client injected into Controllers (and EventHandlers, Sources and Predicates).
7983
client client.Interface
@@ -139,6 +143,12 @@ func (cm *controllerManager) injectInto(i interface{}) error {
139143
if _, err := inject.DoInformers(cm.informers, i); err != nil {
140144
return err
141145
}
146+
if _, err := inject.DoSourceChannel(cm.outerChan, i); err != nil {
147+
return err
148+
}
149+
if _, err := inject.DoStop(cm.stop, i); err != nil {
150+
return err
151+
}
142152
return nil
143153
}
144154

@@ -196,11 +206,14 @@ type ManagerArgs struct {
196206
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
197207
// Defaults to the kubernetes/client-go scheme.Scheme
198208
Scheme *runtime.Scheme
209+
210+
// OuterChan is a channel to receive events originating outside the cluster (e.g. GitHub Webhook callback)
211+
OuterChan chan event.GenericEvent
199212
}
200213

201214
// NewManager returns a new fully initialized Manager.
202215
func NewManager(args ManagerArgs) (Manager, error) {
203-
cm := &controllerManager{config: args.Config, scheme: args.Scheme, errChan: make(chan error)}
216+
cm := &controllerManager{config: args.Config, scheme: args.Scheme, outerChan: args.OuterChan, errChan: make(chan error)}
204217

205218
// Initialize a rest.config if none was specified
206219
if cm.config == nil {

pkg/controller/source/example_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package source_test
1818

1919
import (
2020
"github.com/kubernetes-sigs/controller-runtime/pkg/controller"
21-
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2221
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/eventhandler"
2322
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/source"
2423
"k8s.io/api/core/v1"
@@ -38,10 +37,8 @@ func ExampleKindSource() {
3837
// This example reads GenericEvents from a channel and enqueues a reconcile.Request containing the Name and Namespace
3938
// provided by the event.
4039
func ExampleChannelSource() {
41-
events := make(chan event.GenericEvent)
42-
4340
ctrl.Watch(
44-
source.ChannelSource(events),
41+
&source.ChannelSource{},
4542
&eventhandler.EnqueueHandler{},
4643
)
4744
}

pkg/controller/source/source.go

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package source
1818

1919
import (
2020
"fmt"
21+
"sync"
2122

2223
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2324
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/eventhandler"
@@ -31,6 +32,11 @@ import (
3132
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
3233
)
3334

35+
const (
36+
// initialBufferSize is the initial number of event notifications that can be buffered.
37+
initialBufferSize = 1024
38+
)
39+
3440
// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
3541
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
3642
//
@@ -46,18 +52,113 @@ type Source interface {
4652
Start(eventhandler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
4753
}
4854

55+
var _ Source = &ChannelSource{}
56+
4957
// ChannelSource is used to provide a source of events originating outside the cluster
5058
// (eh.g. GitHub Webhook callback). ChannelSource requires the user to wire the external
5159
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
52-
type ChannelSource chan event.GenericEvent
60+
type ChannelSource struct {
61+
// once ensures the event distribution goroutine will be performed only once
62+
once sync.Once
63+
// source is the source channel to fetch GenericEvents
64+
source chan event.GenericEvent
65+
// stop is to end ongoing goroutine, and close the channels
66+
stop <-chan struct{}
67+
// dest is the destination channels of the added event handlers
68+
dest []chan event.GenericEvent
69+
// destLock is to ensure the destination channels are safely added/removed
70+
destLock sync.Mutex
71+
}
72+
73+
var _ inject.SourceChannel = &ChannelSource{}
74+
75+
// InjectSourceChannel is internal should be called only by the Controller.
76+
// It is used to inject the source event channel initialized by the ControllerManager.
77+
func (cs *ChannelSource) InjectSourceChannel(c chan event.GenericEvent) error {
78+
if cs.source == nil {
79+
cs.source = c
80+
}
81+
82+
return nil
83+
}
5384

54-
var _ Source = ChannelSource(make(chan event.GenericEvent))
85+
var _ inject.Stop = &ChannelSource{}
86+
87+
// InjectStop is internal should be called only by the Controller.
88+
// It is used to inject the stop channel initialized by the ControllerManager.
89+
func (cs *ChannelSource) InjectStop(stop <-chan struct{}) error {
90+
if cs.stop == nil {
91+
cs.stop = stop
92+
}
93+
94+
return nil
95+
}
5596

5697
// Start implements Source and should only be called by the Controller.
57-
func (ks ChannelSource) Start(
98+
func (cs *ChannelSource) Start(
5899
handler eventhandler.EventHandler,
59100
queue workqueue.RateLimitingInterface,
60101
prct ...predicate.Predicate) error {
102+
// source should have been injected before Start was called.
103+
if cs.source == nil {
104+
return fmt.Errorf("must call DoSourceChannel on ChannelSource before calling Start")
105+
}
106+
107+
// stop should have been injected before Start was called
108+
if cs.stop == nil {
109+
return fmt.Errorf("must call DoStop on ChannelSource before calling Start")
110+
}
111+
112+
cs.once.Do(func() {
113+
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
114+
go func() {
115+
for {
116+
select {
117+
case <-cs.stop:
118+
// Close source channel.
119+
// TODO: should we close the source channel here?
120+
close(cs.source)
121+
122+
// Close destination channels
123+
cs.destLock.Lock()
124+
for _, dst := range cs.dest {
125+
close(dst)
126+
}
127+
cs.destLock.Unlock()
128+
return
129+
case evt := <-cs.source:
130+
cs.destLock.Lock()
131+
for _, dst := range cs.dest {
132+
dst <- evt
133+
}
134+
cs.destLock.Unlock()
135+
}
136+
}
137+
}()
138+
})
139+
140+
dst := make(chan event.GenericEvent, initialBufferSize)
141+
go func() {
142+
for evt := range dst {
143+
shouldHandle := true
144+
for _, p := range prct {
145+
if !p.Generic(evt) {
146+
shouldHandle = false
147+
break
148+
}
149+
}
150+
151+
if shouldHandle {
152+
handler.Generic(queue, evt)
153+
}
154+
}
155+
}()
156+
157+
cs.destLock.Lock()
158+
defer cs.destLock.Unlock()
159+
160+
cs.dest = append(cs.dest, dst)
161+
61162
return nil
62163
}
63164

pkg/controller/source/source_test.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ import (
2626
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/inject"
2727
. "github.com/onsi/ginkgo"
2828
. "github.com/onsi/gomega"
29-
"k8s.io/client-go/util/workqueue"
3029

3130
corev1 "k8s.io/api/core/v1"
31+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/client-go/util/workqueue"
3233
)
3334

3435
var _ = Describe("Source", func() {
@@ -203,4 +204,57 @@ var _ = Describe("Source", func() {
203204
})
204205
})
205206
})
207+
Describe("ChannelSource", func() {
208+
var c chan struct{}
209+
var ch chan event.GenericEvent
210+
211+
BeforeEach(func() {
212+
c = make(chan struct{})
213+
ch = make(chan event.GenericEvent)
214+
})
215+
216+
Context("for a GenericEvent", func() {
217+
It("should provide a GenericEvent", func(done Done) {
218+
p := &corev1.Pod{
219+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
220+
}
221+
evt := event.GenericEvent{
222+
Object: p,
223+
Meta: p.GetObjectMeta(),
224+
}
225+
226+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
227+
instance := &source.ChannelSource{}
228+
inject.DoStop(c, instance)
229+
inject.DoSourceChannel(ch, instance)
230+
err := instance.Start(eventhandler.Funcs{
231+
CreateFunc: func(workqueue.RateLimitingInterface, event.CreateEvent) {
232+
defer GinkgoRecover()
233+
Fail("Unexpected CreateEvent")
234+
},
235+
UpdateFunc: func(workqueue.RateLimitingInterface, event.UpdateEvent) {
236+
defer GinkgoRecover()
237+
Fail("Unexpected UpdateEvent")
238+
},
239+
DeleteFunc: func(workqueue.RateLimitingInterface, event.DeleteEvent) {
240+
defer GinkgoRecover()
241+
Fail("Unexpected DeleteEvent")
242+
},
243+
GenericFunc: func(q2 workqueue.RateLimitingInterface, evt event.GenericEvent) {
244+
defer GinkgoRecover()
245+
Expect(q2).To(Equal(q))
246+
Expect(evt.Meta).To(Equal(p.GetObjectMeta()))
247+
Expect(evt.Object).To(Equal(p))
248+
close(c)
249+
},
250+
}, q)
251+
Expect(err).NotTo(HaveOccurred())
252+
253+
ch <- evt
254+
<-c
255+
close(done)
256+
})
257+
258+
})
259+
})
206260
})

pkg/runtime/inject/inject.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package inject
1818

1919
import (
2020
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
21+
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2122
"github.com/kubernetes-sigs/controller-runtime/pkg/internal/informer"
2223
"k8s.io/apimachinery/pkg/runtime"
2324
"k8s.io/client-go/rest"
@@ -82,3 +83,33 @@ func DoScheme(scheme *runtime.Scheme, i interface{}) (bool, error) {
8283
}
8384
return false, nil
8485
}
86+
87+
// SourceChannel is used by the ControllerManager to inject channel into Sources, EventHandlers, Predicates, and
88+
// Reconciles
89+
type SourceChannel interface {
90+
InjectSourceChannel(chan event.GenericEvent) error
91+
}
92+
93+
// DoSourceChannel will set source channel on i and return the result if it implements SourceChannel.
94+
// Returns false if i does not implement SourceChannel.
95+
func DoSourceChannel(ch chan event.GenericEvent, i interface{}) (bool, error) {
96+
if c, ok := i.(SourceChannel); ok {
97+
return true, c.InjectSourceChannel(ch)
98+
}
99+
return false, nil
100+
}
101+
102+
// Stop is used by the ControllerManager to inject stop channel into Sources, EventHandlers, Predicates, and
103+
// Reconciles
104+
type Stop interface {
105+
InjectStop(<-chan struct{}) error
106+
}
107+
108+
// DoStop will set stop channel on i and return the result if it implements Stop.
109+
// Returns false if i does not implement Stop.
110+
func DoStop(stop <-chan struct{}, i interface{}) (bool, error) {
111+
if s, ok := i.(Stop); ok {
112+
return true, s.InjectStop(stop)
113+
}
114+
return false, nil
115+
}

0 commit comments

Comments
 (0)