@@ -71,8 +71,8 @@ type Controller struct {
71
71
72
72
// TODO(community): Consider initializing a logger with the Controller Name as the tag
73
73
74
- // watches maintains a list of sources, handlers, and predicates to start when the controller is started.
75
- watches []watchDescription
74
+ // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
75
+ startWatches []watchDescription
76
76
77
77
// Log is used to log messages to users during reconciliation, or for example when a watch is started.
78
78
Log logr.Logger
@@ -108,13 +108,16 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
108
108
}
109
109
}
110
110
111
- c .watches = append (c .watches , watchDescription {src : src , handler : evthdler , predicates : prct })
112
- if c .Started {
113
- c .Log .Info ("Starting EventSource" , "source" , src )
114
- return src .Start (evthdler , c .Queue , prct ... )
111
+ // Controller hasn't started yet, store the watches locally and return.
112
+ //
113
+ // These watches are going to be held on the controller struct until the manager or user calls Start(...).
114
+ if ! c .Started {
115
+ c .startWatches = append (c .startWatches , watchDescription {src : src , handler : evthdler , predicates : prct })
116
+ return nil
115
117
}
116
118
117
- return nil
119
+ c .Log .Info ("Starting EventSource" , "source" , src )
120
+ return src .Start (evthdler , c .Queue , prct ... )
118
121
}
119
122
120
123
// Start implements controller.Controller
@@ -135,7 +138,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
135
138
// NB(directxman12): launch the sources *before* trying to wait for the
136
139
// caches to sync so that they have a chance to register their intendeded
137
140
// caches.
138
- for _ , watch := range c .watches {
141
+ for _ , watch := range c .startWatches {
139
142
c .Log .Info ("Starting EventSource" , "source" , watch .src )
140
143
if err := watch .src .Start (watch .handler , c .Queue , watch .predicates ... ); err != nil {
141
144
return err
@@ -145,7 +148,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
145
148
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
146
149
c .Log .Info ("Starting Controller" )
147
150
148
- for _ , watch := range c .watches {
151
+ for _ , watch := range c .startWatches {
149
152
syncingSource , ok := watch .src .(source.SyncingSource )
150
153
if ! ok {
151
154
continue
@@ -159,6 +162,12 @@ func (c *Controller) Start(stop <-chan struct{}) error {
159
162
}
160
163
}
161
164
165
+ // All the watches have been started, we can reset the local slice.
166
+ //
167
+ // We should never hold watches more than necessary, each watch source can hold a backing cache,
168
+ // which won't be garbage collected if we hold a reference to it.
169
+ c .startWatches = nil
170
+
162
171
if c .JitterPeriod == 0 {
163
172
c .JitterPeriod = 1 * time .Second
164
173
}
0 commit comments