@@ -84,8 +84,8 @@ type Controller struct {
84
84
85
85
// TODO(community): Consider initializing a logger with the Controller Name as the tag
86
86
87
- // watches maintains a list of sources, handlers, and predicates to start when the controller is started.
88
- watches []watchDescription
87
+ // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
88
+ startWatches []watchDescription
89
89
90
90
// Log is used to log messages to users during reconciliation, or for example when a watch is started.
91
91
Log logr.Logger
@@ -121,13 +121,16 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
121
121
}
122
122
}
123
123
124
- c .watches = append (c .watches , watchDescription {src : src , handler : evthdler , predicates : prct })
125
- if c .Started {
126
- c .Log .Info ("Starting EventSource" , "source" , src )
127
- return src .Start (evthdler , c .Queue , prct ... )
124
+ // Controller hasn't started yet, store the watches locally and return.
125
+ //
126
+ // These watches are going to be held on the controller struct until the manager or user calls Start(...).
127
+ if ! c .Started {
128
+ c .startWatches = append (c .startWatches , watchDescription {src : src , handler : evthdler , predicates : prct })
129
+ return nil
128
130
}
129
131
130
- return nil
132
+ c .Log .Info ("Starting EventSource" , "source" , src )
133
+ return src .Start (evthdler , c .Queue , prct ... )
131
134
}
132
135
133
136
// Start implements controller.Controller
@@ -148,7 +151,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
148
151
// NB(directxman12): launch the sources *before* trying to wait for the
149
152
// caches to sync so that they have a chance to register their intendeded
150
153
// caches.
151
- for _ , watch := range c .watches {
154
+ for _ , watch := range c .startWatches {
152
155
c .Log .Info ("Starting EventSource" , "source" , watch .src )
153
156
if err := watch .src .Start (watch .handler , c .Queue , watch .predicates ... ); err != nil {
154
157
return err
@@ -158,7 +161,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
158
161
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
159
162
c .Log .Info ("Starting Controller" )
160
163
161
- for _ , watch := range c .watches {
164
+ for _ , watch := range c .startWatches {
162
165
syncingSource , ok := watch .src .(source.SyncingSource )
163
166
if ! ok {
164
167
continue
@@ -172,6 +175,12 @@ func (c *Controller) Start(stop <-chan struct{}) error {
172
175
}
173
176
}
174
177
178
+ // All the watches have been started, we can reset the local slice.
179
+ //
180
+ // We should never hold watches more than necessary, each watch source can hold a backing cache,
181
+ // which won't be garbage collected if we hold a reference to it.
182
+ c .startWatches = nil
183
+
175
184
if c .JitterPeriod == 0 {
176
185
c .JitterPeriod = 1 * time .Second
177
186
}
0 commit comments