Skip to content

Commit b5597f5

Browse files
authored
Merge pull request #6 from lichuqiang/master
Implement the ChannelSource
2 parents b85f2b9 + b0b0957 commit b5597f5

File tree

5 files changed

+348
-9
lines changed

5 files changed

+348
-9
lines changed

pkg/manager/internal.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ func (cm *controllerManager) SetFields(i interface{}) error {
9696
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
9797
return err
9898
}
99+
if _, err := inject.StopChannelInto(cm.stop, i); err != nil {
100+
return err
101+
}
99102
return nil
100103
}
101104

pkg/runtime/inject/inject.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,21 @@ func SchemeInto(scheme *runtime.Scheme, i interface{}) (bool, error) {
8383
return false, nil
8484
}
8585

86+
// Stoppable is used by the ControllerManager to inject stop channel into Sources,
87+
// EventHandlers, Predicates, and Reconciles.
88+
type Stoppable interface {
89+
InjectStopChannel(<-chan struct{}) error
90+
}
91+
92+
// StopChannelInto will set stop channel on i and return the result if it implements Stoppable.
93+
// Returns false if i does not implement Stoppable.
94+
func StopChannelInto(stop <-chan struct{}, i interface{}) (bool, error) {
95+
if s, ok := i.(Stoppable); ok {
96+
return true, s.InjectStopChannel(stop)
97+
}
98+
return false, nil
99+
}
100+
86101
// Func injects dependencies into i.
87102
type Func func(i interface{}) error
88103

pkg/source/example_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,9 @@ func ExampleKind() {
3636
// provided by the event.
3737
func ExampleChannel() {
3838
events := make(chan event.GenericEvent)
39-
ctrl.Watch(source.Channel(events), &handler.Enqueue{})
39+
40+
ctrl.Watch(
41+
&source.Channel{Source: events},
42+
&handler.Enqueue{},
43+
)
4044
}

pkg/source/source.go

Lines changed: 118 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package source
1818

1919
import (
2020
"fmt"
21+
"sync"
2122

2223
"github.com/kubernetes-sigs/controller-runtime/pkg/event"
2324
"github.com/kubernetes-sigs/controller-runtime/pkg/handler"
@@ -30,6 +31,11 @@ import (
3031
"github.com/kubernetes-sigs/controller-runtime/pkg/predicate"
3132
)
3233

34+
const (
35+
// defaultBufferSize is the default number of event notifications that can be buffered.
36+
defaultBufferSize = 1024
37+
)
38+
3339
// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
3440
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
3541
//
@@ -45,21 +51,130 @@ type Source interface {
4551
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
4652
}
4753

54+
var _ Source = &Channel{}
55+
4856
// Channel is used to provide a source of events originating outside the cluster
4957
// (eh.g. GitHub Webhook callback). Channel requires the user to wire the external
5058
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
51-
type Channel chan event.GenericEvent
59+
type Channel struct {
60+
// once ensures the event distribution goroutine will be performed only once
61+
once sync.Once
62+
63+
// Source is the source channel to fetch GenericEvents
64+
Source <-chan event.GenericEvent
65+
66+
// stop is to end ongoing goroutine, and close the channels
67+
stop <-chan struct{}
68+
69+
// dest is the destination channels of the added event handlers
70+
dest []chan event.GenericEvent
71+
72+
// DestBufferSize is the specified buffer size of dest channels.
73+
// Default to 1024 if not specified.
74+
DestBufferSize int
75+
76+
// destLock is to ensure the destination channels are safely added/removed
77+
destLock sync.Mutex
78+
}
79+
80+
var _ inject.Stoppable = &Channel{}
81+
82+
// InjectStopChannel is internal should be called only by the Controller.
83+
// It is used to inject the stop channel initialized by the ControllerManager.
84+
func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error {
85+
if cs.stop == nil {
86+
cs.stop = stop
87+
}
5288

53-
var _ Source = Channel(make(chan event.GenericEvent))
89+
return nil
90+
}
5491

5592
// Start implements Source and should only be called by the Controller.
56-
func (ks Channel) Start(
93+
func (cs *Channel) Start(
5794
handler handler.EventHandler,
5895
queue workqueue.RateLimitingInterface,
5996
prct ...predicate.Predicate) error {
97+
// Source should have been specified by the user.
98+
if cs.Source == nil {
99+
return fmt.Errorf("must specify Channel.Source")
100+
}
101+
102+
// stop should have been injected before Start was called
103+
if cs.stop == nil {
104+
return fmt.Errorf("must call InjectStop on Channel before calling Start")
105+
}
106+
107+
// use default value if DestBufferSize not specified
108+
if cs.DestBufferSize == 0 {
109+
cs.DestBufferSize = defaultBufferSize
110+
}
111+
112+
cs.once.Do(func() {
113+
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
114+
go cs.syncLoop()
115+
})
116+
117+
dst := make(chan event.GenericEvent, cs.DestBufferSize)
118+
go func() {
119+
for evt := range dst {
120+
shouldHandle := true
121+
for _, p := range prct {
122+
if !p.Generic(evt) {
123+
shouldHandle = false
124+
break
125+
}
126+
}
127+
128+
if shouldHandle {
129+
handler.Generic(queue, evt)
130+
}
131+
}
132+
}()
133+
134+
cs.destLock.Lock()
135+
defer cs.destLock.Unlock()
136+
137+
cs.dest = append(cs.dest, dst)
138+
60139
return nil
61140
}
62141

142+
func (cs *Channel) doStop() {
143+
cs.destLock.Lock()
144+
defer cs.destLock.Unlock()
145+
146+
for _, dst := range cs.dest {
147+
close(dst)
148+
}
149+
}
150+
151+
func (cs *Channel) distribute(evt event.GenericEvent) {
152+
cs.destLock.Lock()
153+
defer cs.destLock.Unlock()
154+
155+
for _, dst := range cs.dest {
156+
// We cannot make it under goroutine here, or we'll meet the
157+
// race condition of writing message to closed channels.
158+
// To avoid blocking, the dest channels are expected to be of
159+
// proper buffer size. If we still see it blocked, then
160+
// the controller is thought to be in an abnormal state.
161+
dst <- evt
162+
}
163+
}
164+
165+
func (cs *Channel) syncLoop() {
166+
for {
167+
select {
168+
case <-cs.stop:
169+
// Close destination channels
170+
cs.doStop()
171+
return
172+
case evt := <-cs.Source:
173+
cs.distribute(evt)
174+
}
175+
}
176+
}
177+
63178
// Kind is used to provide a source of events originating inside the cluster from Watches (eh.g. Pod Create)
64179
type Kind struct {
65180
// Type is the type of object to watch. e.g. &v1.Pod{}

0 commit comments

Comments
 (0)