Skip to content

Commit 6cab6c3

Browse files
committed
implement the ChannelSource
1 parent 63acb0c commit 6cab6c3

File tree

5 files changed

+202
-9
lines changed

5 files changed

+202
-9
lines changed

pkg/controller/manager.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
2424
"github.com/kubernetes-sigs/controller-runtime/pkg/client/config"
25+
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2526
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/reconcile"
2627
"github.com/kubernetes-sigs/controller-runtime/pkg/internal/apiutil"
2728
"github.com/kubernetes-sigs/controller-runtime/pkg/internal/informer"
@@ -74,6 +75,9 @@ type controllerManager struct {
7475
// informers are injected into Controllers (,and transitively EventHandlers, Sources and Predicates).
7576
informers informer.Informers
7677

78+
// outerChan is a channel to receive events originating outside the cluster (e.g. GitHub Webhook callback)
79+
outerChan chan event.GenericEvent
80+
7781
// TODO(directxman12): Provide an escape hatch to get individual indexers
7882
// client is the client injected into Controllers (and EventHandlers, Sources and Predicates).
7983
client client.Interface
@@ -139,6 +143,12 @@ func (cm *controllerManager) injectInto(i interface{}) error {
139143
if _, err := inject.DoInformers(cm.informers, i); err != nil {
140144
return err
141145
}
146+
if _, err := inject.DoSourceChannel(cm.outerChan, i); err != nil {
147+
return err
148+
}
149+
if _, err := inject.DoStop(cm.stop, i); err != nil {
150+
return err
151+
}
142152
return nil
143153
}
144154

@@ -196,11 +206,14 @@ type ManagerArgs struct {
196206
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
197207
// Defaults to the kubernetes/client-go scheme.Scheme
198208
Scheme *runtime.Scheme
209+
210+
// OuterChan is a channel to receive events originating outside the cluster (e.g. GitHub Webhook callback)
211+
OuterChan chan event.GenericEvent
199212
}
200213

201214
// NewManager returns a new fully initialized Manager.
202215
func NewManager(args ManagerArgs) (Manager, error) {
203-
cm := &controllerManager{config: args.Config, scheme: args.Scheme, errChan: make(chan error)}
216+
cm := &controllerManager{config: args.Config, scheme: args.Scheme, outerChan: args.OuterChan, errChan: make(chan error)}
204217

205218
// Initialize a rest.config if none was specified
206219
if cm.config == nil {

pkg/controller/source/example_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package source_test
1818

1919
import (
2020
"github.com/kubernetes-sigs/controller-runtime/pkg/controller"
21-
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2221
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/eventhandler"
2322
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/source"
2423
"k8s.io/api/core/v1"
@@ -38,10 +37,8 @@ func ExampleKindSource() {
3837
// This example reads GenericEvents from a channel and enqueues a reconcile.Request containing the Name and Namespace
3938
// provided by the event.
4039
func ExampleChannelSource() {
41-
events := make(chan event.GenericEvent)
42-
4340
ctrl.Watch(
44-
source.ChannelSource(events),
41+
&source.ChannelSource{},
4542
&eventhandler.EnqueueHandler{},
4643
)
4744
}

pkg/controller/source/source.go

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package source
1818

1919
import (
2020
"fmt"
21+
"sync"
2122

2223
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2324
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/eventhandler"
@@ -46,18 +47,115 @@ type Source interface {
4647
Start(eventhandler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
4748
}
4849

50+
var _ Source = &ChannelSource{}
51+
4952
// ChannelSource is used to provide a source of events originating outside the cluster
5053
// (eh.g. GitHub Webhook callback). ChannelSource requires the user to wire the external
5154
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
52-
type ChannelSource chan event.GenericEvent
55+
type ChannelSource struct {
56+
// once ensures the event distribution goroutine will be performed only once
57+
once sync.Once
58+
// source is the source channel to fetch GenericEvents
59+
source chan event.GenericEvent
60+
// stop is to end ongoing goroutine, and close the channels
61+
stop <-chan struct{}
62+
// dest is the destination channels of the added event handlers
63+
dest []chan event.GenericEvent
64+
// destLock is to ensure the destination channels are safely added/removed
65+
destLock sync.Mutex
66+
}
67+
68+
var _ inject.SourceChannel = &ChannelSource{}
69+
70+
// InjectSourceChannel is internal should be called only by the Controller.
71+
// It is used to inject the source event channel initialized by the ControllerManager.
72+
func (cs *ChannelSource) InjectSourceChannel(c chan event.GenericEvent) error {
73+
if cs.source == nil {
74+
cs.source = c
75+
}
76+
77+
return nil
78+
}
79+
80+
var _ inject.Stop = &ChannelSource{}
5381

54-
var _ Source = ChannelSource(make(chan event.GenericEvent))
82+
// InjectStop is internal should be called only by the Controller.
83+
// It is used to inject the stop channel initialized by the ControllerManager.
84+
func (cs *ChannelSource) InjectStop(stop <-chan struct{}) error {
85+
if cs.stop == nil {
86+
cs.stop = stop
87+
}
88+
89+
return nil
90+
}
5591

5692
// Start implements Source and should only be called by the Controller.
57-
func (ks ChannelSource) Start(
93+
func (cs *ChannelSource) Start(
5894
handler eventhandler.EventHandler,
5995
queue workqueue.RateLimitingInterface,
6096
prct ...predicate.Predicate) error {
97+
// source should have been injected before Start was called.
98+
if cs.source == nil {
99+
return fmt.Errorf("must call DoSourceChannel on ChannelSource before calling Start")
100+
}
101+
102+
// stop should have been injected before Start was called
103+
if cs.stop == nil {
104+
return fmt.Errorf("must call DoStop on ChannelSource before calling Start")
105+
}
106+
107+
cs.once.Do(func() {
108+
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
109+
go func() {
110+
for {
111+
select {
112+
case <-cs.stop:
113+
// Close source channel.
114+
// TODO: should we close the source channel here?
115+
close(cs.source)
116+
117+
// Close destination channels and remove them,
118+
// to avoid writing events to closed channels.
119+
cs.destLock.Lock()
120+
for _, dst := range cs.dest {
121+
close(dst)
122+
}
123+
cs.dest = []chan event.GenericEvent{}
124+
cs.destLock.Unlock()
125+
return
126+
case evt := <-cs.source:
127+
cs.destLock.Lock()
128+
for _, dst := range cs.dest {
129+
go func() { dst <- evt }()
130+
}
131+
cs.destLock.Unlock()
132+
}
133+
}
134+
}()
135+
})
136+
137+
dst := make(chan event.GenericEvent)
138+
go func() {
139+
for evt := range dst {
140+
shouldHandle := true
141+
for _, p := range prct {
142+
if !p.Generic(evt) {
143+
shouldHandle = false
144+
break
145+
}
146+
}
147+
148+
if shouldHandle {
149+
handler.Generic(queue, evt)
150+
}
151+
}
152+
}()
153+
154+
cs.destLock.Lock()
155+
defer cs.destLock.Unlock()
156+
157+
cs.dest = append(cs.dest, dst)
158+
61159
return nil
62160
}
63161

pkg/controller/source/source_test.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ import (
2626
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/inject"
2727
. "github.com/onsi/ginkgo"
2828
. "github.com/onsi/gomega"
29-
"k8s.io/client-go/util/workqueue"
3029

3130
corev1 "k8s.io/api/core/v1"
31+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/client-go/util/workqueue"
3233
)
3334

3435
var _ = Describe("Source", func() {
@@ -203,4 +204,57 @@ var _ = Describe("Source", func() {
203204
})
204205
})
205206
})
207+
Describe("ChannelSource", func() {
208+
var c chan struct{}
209+
var ch chan event.GenericEvent
210+
211+
BeforeEach(func() {
212+
c = make(chan struct{})
213+
ch = make(chan event.GenericEvent)
214+
})
215+
216+
Context("for a GenericEvent", func() {
217+
It("should provide a GenericEvent", func(done Done) {
218+
p := &corev1.Pod{
219+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
220+
}
221+
evt := event.GenericEvent{
222+
Object: p,
223+
Meta: p.GetObjectMeta(),
224+
}
225+
226+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
227+
instance := &source.ChannelSource{}
228+
inject.DoStop(c, instance)
229+
inject.DoSourceChannel(ch, instance)
230+
err := instance.Start(eventhandler.Funcs{
231+
CreateFunc: func(workqueue.RateLimitingInterface, event.CreateEvent) {
232+
defer GinkgoRecover()
233+
Fail("Unexpected CreateEvent")
234+
},
235+
UpdateFunc: func(workqueue.RateLimitingInterface, event.UpdateEvent) {
236+
defer GinkgoRecover()
237+
Fail("Unexpected UpdateEvent")
238+
},
239+
DeleteFunc: func(workqueue.RateLimitingInterface, event.DeleteEvent) {
240+
defer GinkgoRecover()
241+
Fail("Unexpected DeleteEvent")
242+
},
243+
GenericFunc: func(q2 workqueue.RateLimitingInterface, evt event.GenericEvent) {
244+
defer GinkgoRecover()
245+
Expect(q2).To(Equal(q))
246+
Expect(evt.Meta).To(Equal(p.GetObjectMeta()))
247+
Expect(evt.Object).To(Equal(p))
248+
close(c)
249+
},
250+
}, q)
251+
Expect(err).NotTo(HaveOccurred())
252+
253+
ch <- evt
254+
<-c
255+
close(done)
256+
})
257+
258+
})
259+
})
206260
})

pkg/runtime/inject/inject.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package inject
1818

1919
import (
2020
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
21+
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/event"
2122
"github.com/kubernetes-sigs/controller-runtime/pkg/internal/informer"
2223
"k8s.io/apimachinery/pkg/runtime"
2324
"k8s.io/client-go/rest"
@@ -82,3 +83,33 @@ func DoScheme(scheme *runtime.Scheme, i interface{}) (bool, error) {
8283
}
8384
return false, nil
8485
}
86+
87+
// SourceChannel is used by the ControllerManager to inject channel into Sources, EventHandlers, Predicates, and
88+
// Reconciles
89+
type SourceChannel interface {
90+
InjectSourceChannel(chan event.GenericEvent) error
91+
}
92+
93+
// DoSourceChannel will set source channel on i and return the result if it implements SourceChannel.
94+
// Returns false if i does not implement SourceChannel.
95+
func DoSourceChannel(ch chan event.GenericEvent, i interface{}) (bool, error) {
96+
if c, ok := i.(SourceChannel); ok {
97+
return true, c.InjectSourceChannel(ch)
98+
}
99+
return false, nil
100+
}
101+
102+
// Stop is used by the ControllerManager to inject stop channel into Sources, EventHandlers, Predicates, and
103+
// Reconciles
104+
type Stop interface {
105+
InjectStop(<-chan struct{}) error
106+
}
107+
108+
// DoStop will set stop channel on i and return the result if it implements Stop.
109+
// Returns false if i does not implement Stop.
110+
func DoStop(stop <-chan struct{}, i interface{}) (bool, error) {
111+
if s, ok := i.(Stop); ok {
112+
return true, s.InjectStop(stop)
113+
}
114+
return false, nil
115+
}

0 commit comments

Comments
 (0)