File tree Expand file tree Collapse file tree 1 file changed +19
-2
lines changed Expand file tree Collapse file tree 1 file changed +19
-2
lines changed Original file line number Diff line number Diff line change @@ -115,10 +115,23 @@ func (cs *ChannelSource) Start(
115
115
select {
116
116
case <- cs .stop :
117
117
// Close destination channels
118
- cs .doStop ()
118
+ cs .destLock .Lock ()
119
+ for _ , dst := range cs .dest {
120
+ close (dst )
121
+ }
122
+ cs .destLock .Unlock ()
119
123
return
120
124
case evt := <- cs .Source :
121
- cs .distribute (evt )
125
+ cs .destLock .Lock ()
126
+ for _ , dst := range cs .dest {
127
+ // We cannot make it under goroutine here, or we'll meet the
128
+ // race condition of writing message to closed channels.
129
+ // To avoid blocking, the dest channels are expected to be of
130
+ // proper buffer size. If we still see it blocked, then
131
+ // the controller is thought to be in an abnormal state.
132
+ dst <- evt
133
+ }
134
+ cs .destLock .Unlock ()
122
135
}
123
136
}
124
137
}()
@@ -149,6 +162,7 @@ func (cs *ChannelSource) Start(
149
162
return nil
150
163
}
151
164
165
+ /*
152
166
func (cs *ChannelSource) doStop() {
153
167
cs.destLock.Lock()
154
168
defer cs.destLock.Unlock()
@@ -157,7 +171,9 @@ func (cs *ChannelSource) doStop() {
157
171
close(dst)
158
172
}
159
173
}
174
+ */
160
175
176
+ /*
161
177
func (cs *ChannelSource) distribute(evt event.GenericEvent) {
162
178
cs.destLock.Lock()
163
179
defer cs.destLock.Unlock()
@@ -171,6 +187,7 @@ func (cs *ChannelSource) distribute(evt event.GenericEvent) {
171
187
dst <- evt
172
188
}
173
189
}
190
+ */
174
191
175
192
// KindSource is used to provide a source of events originating inside the cluster from Watches (eh.g. Pod Create)
176
193
type KindSource struct {
You can’t perform that action at this time.
0 commit comments