@@ -74,8 +74,8 @@ type Controller struct {
74
74
75
75
// TODO(community): Consider initializing a logger with the Controller Name as the tag
76
76
77
- // watches maintains a list of sources, handlers, and predicates to start when the controller is started.
78
- watches []watchDescription
77
+ // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
78
+ startWatches []watchDescription
79
79
80
80
// Log is used to log messages to users during reconciliation, or for example when a watch is started.
81
81
Log logr.Logger
@@ -113,13 +113,16 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
113
113
}
114
114
}
115
115
116
- c .watches = append (c .watches , watchDescription {src : src , handler : evthdler , predicates : prct })
117
- if c .Started {
118
- c .Log .Info ("Starting EventSource" , "source" , src )
119
- return src .Start (evthdler , c .Queue , prct ... )
116
+ // Controller hasn't started yet, store the watches locally and return.
117
+ //
118
+ // These watches are going to be held on the controller struct until the manager or user calls Start(...).
119
+ if ! c .Started {
120
+ c .startWatches = append (c .startWatches , watchDescription {src : src , handler : evthdler , predicates : prct })
121
+ return nil
120
122
}
121
123
122
- return nil
124
+ c .Log .Info ("Starting EventSource" , "source" , src )
125
+ return src .Start (evthdler , c .Queue , prct ... )
123
126
}
124
127
125
128
// Start implements controller.Controller
@@ -143,7 +146,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
143
146
// NB(directxman12): launch the sources *before* trying to wait for the
144
147
// caches to sync so that they have a chance to register their intendeded
145
148
// caches.
146
- for _ , watch := range c .watches {
149
+ for _ , watch := range c .startWatches {
147
150
c .Log .Info ("Starting EventSource" , "source" , watch .src )
148
151
if err := watch .src .Start (watch .handler , c .Queue , watch .predicates ... ); err != nil {
149
152
return err
@@ -153,7 +156,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
153
156
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
154
157
c .Log .Info ("Starting Controller" )
155
158
156
- for _ , watch := range c .watches {
159
+ for _ , watch := range c .startWatches {
157
160
syncingSource , ok := watch .src .(source.SyncingSource )
158
161
if ! ok {
159
162
continue
@@ -167,6 +170,12 @@ func (c *Controller) Start(stop <-chan struct{}) error {
167
170
}
168
171
}
169
172
173
+ // All the watches have been started, we can reset the local slice.
174
+ //
175
+ // We should never hold watches more than necessary, each watch source can hold a backing cache,
176
+ // which won't be garbage collected if we hold a reference to it.
177
+ c .startWatches = nil
178
+
170
179
if c .JitterPeriod == 0 {
171
180
c .JitterPeriod = 1 * time .Second
172
181
}
0 commit comments