@@ -18,6 +18,7 @@ package source
18
18
19
19
import (
20
20
"fmt"
21
+ "sync"
21
22
22
23
"github.com/kubernetes-sigs/controller-runtime/pkg/event"
23
24
"github.com/kubernetes-sigs/controller-runtime/pkg/handler"
@@ -30,6 +31,11 @@ import (
30
31
"github.com/kubernetes-sigs/controller-runtime/pkg/predicate"
31
32
)
32
33
34
+ const (
35
+ // defaultBufferSize is the default number of event notifications that can be buffered.
36
+ defaultBufferSize = 1024
37
+ )
38
+
33
39
// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
34
40
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
35
41
//
@@ -45,21 +51,130 @@ type Source interface {
45
51
Start (handler.EventHandler , workqueue.RateLimitingInterface , ... predicate.Predicate ) error
46
52
}
47
53
54
+ var _ Source = & Channel {}
55
+
48
56
// Channel is used to provide a source of events originating outside the cluster
49
57
// (eh.g. GitHub Webhook callback). Channel requires the user to wire the external
50
58
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
51
- type Channel chan event.GenericEvent
59
+ type Channel struct {
60
+ // once ensures the event distribution goroutine will be performed only once
61
+ once sync.Once
62
+
63
+ // Source is the source channel to fetch GenericEvents
64
+ Source <- chan event.GenericEvent
65
+
66
+ // stop is to end ongoing goroutine, and close the channels
67
+ stop <- chan struct {}
68
+
69
+ // dest is the destination channels of the added event handlers
70
+ dest []chan event.GenericEvent
71
+
72
+ // DestBufferSize is the specified buffer size of dest channels.
73
+ // Default to 1024 if not specified.
74
+ DestBufferSize int
75
+
76
+ // destLock is to ensure the destination channels are safely added/removed
77
+ destLock sync.Mutex
78
+ }
79
+
80
+ var _ inject.Stoppable = & Channel {}
81
+
82
+ // InjectStopChannel is internal should be called only by the Controller.
83
+ // It is used to inject the stop channel initialized by the ControllerManager.
84
+ func (cs * Channel ) InjectStopChannel (stop <- chan struct {}) error {
85
+ if cs .stop == nil {
86
+ cs .stop = stop
87
+ }
52
88
53
- var _ Source = Channel (make (chan event.GenericEvent ))
89
+ return nil
90
+ }
54
91
55
92
// Start implements Source and should only be called by the Controller.
56
- func (ks Channel ) Start (
93
+ func (cs * Channel ) Start (
57
94
handler handler.EventHandler ,
58
95
queue workqueue.RateLimitingInterface ,
59
96
prct ... predicate.Predicate ) error {
97
+ // Source should have been specified by the user.
98
+ if cs .Source == nil {
99
+ return fmt .Errorf ("must specify Channel.Source" )
100
+ }
101
+
102
+ // stop should have been injected before Start was called
103
+ if cs .stop == nil {
104
+ return fmt .Errorf ("must call InjectStop on Channel before calling Start" )
105
+ }
106
+
107
+ // use default value if DestBufferSize not specified
108
+ if cs .DestBufferSize == 0 {
109
+ cs .DestBufferSize = defaultBufferSize
110
+ }
111
+
112
+ cs .once .Do (func () {
113
+ // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
114
+ go cs .syncLoop ()
115
+ })
116
+
117
+ dst := make (chan event.GenericEvent , cs .DestBufferSize )
118
+ go func () {
119
+ for evt := range dst {
120
+ shouldHandle := true
121
+ for _ , p := range prct {
122
+ if ! p .Generic (evt ) {
123
+ shouldHandle = false
124
+ break
125
+ }
126
+ }
127
+
128
+ if shouldHandle {
129
+ handler .Generic (queue , evt )
130
+ }
131
+ }
132
+ }()
133
+
134
+ cs .destLock .Lock ()
135
+ defer cs .destLock .Unlock ()
136
+
137
+ cs .dest = append (cs .dest , dst )
138
+
60
139
return nil
61
140
}
62
141
142
+ func (cs * Channel ) doStop () {
143
+ cs .destLock .Lock ()
144
+ defer cs .destLock .Unlock ()
145
+
146
+ for _ , dst := range cs .dest {
147
+ close (dst )
148
+ }
149
+ }
150
+
151
+ func (cs * Channel ) distribute (evt event.GenericEvent ) {
152
+ cs .destLock .Lock ()
153
+ defer cs .destLock .Unlock ()
154
+
155
+ for _ , dst := range cs .dest {
156
+ // We cannot make it under goroutine here, or we'll meet the
157
+ // race condition of writing message to closed channels.
158
+ // To avoid blocking, the dest channels are expected to be of
159
+ // proper buffer size. If we still see it blocked, then
160
+ // the controller is thought to be in an abnormal state.
161
+ dst <- evt
162
+ }
163
+ }
164
+
165
+ func (cs * Channel ) syncLoop () {
166
+ for {
167
+ select {
168
+ case <- cs .stop :
169
+ // Close destination channels
170
+ cs .doStop ()
171
+ return
172
+ case evt := <- cs .Source :
173
+ cs .distribute (evt )
174
+ }
175
+ }
176
+ }
177
+
63
178
// Kind is used to provide a source of events originating inside the cluster from Watches (eh.g. Pod Create)
64
179
type Kind struct {
65
180
// Type is the type of object to watch. e.g. &v1.Pod{}
0 commit comments