@@ -30,23 +30,32 @@ import (
30
30
"k8s.io/apimachinery/pkg/watch"
31
31
"k8s.io/client-go/rest"
32
32
"k8s.io/client-go/tools/cache"
33
+
33
34
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
34
35
)
35
36
36
- // NewInformersMap returns a new InformersMap
37
- func NewInformersMap (config * rest.Config ,
37
+ // clientCreatorFunc knows how to create a client and the corresponding list object that it should
38
+ // deserialize into for any given group-version-kind.
39
+ type clientCreatorFunc func (gvk schema.GroupVersionKind ,
40
+ codecs serializer.CodecFactory ,
41
+ scheme * runtime.Scheme ,
42
+ baseConfig * rest.Config ) (client rest.Interface , listObj runtime.Object , err error )
43
+
44
+ // newSpecificInformersMap returns a new specificInformersMap (like
45
+ // the generical InformersMap, except that it doesn't implement WaitForCacheSync).
46
+ func newSpecificInformersMap (config * rest.Config ,
38
47
scheme * runtime.Scheme ,
39
48
mapper meta.RESTMapper ,
40
- resync time.Duration ) * InformersMap {
41
- ip := & InformersMap {
42
- config : config ,
43
- Scheme : scheme ,
44
- mapper : mapper ,
45
- informersByGVK : make (map [schema.GroupVersionKind ]* MapEntry ),
46
- unstructuredInformerByGVK : make ( map [schema. GroupVersionKind ] * MapEntry ),
47
- codecs : serializer . NewCodecFactory (scheme ),
48
- paramCodec : runtime . NewParameterCodec ( scheme ) ,
49
- resync : resync ,
49
+ resync time.Duration , createClient clientCreatorFunc ) * specificInformersMap {
50
+ ip := & specificInformersMap {
51
+ config : config ,
52
+ Scheme : scheme ,
53
+ mapper : mapper ,
54
+ informersByGVK : make (map [schema.GroupVersionKind ]* MapEntry ),
55
+ codecs : serializer . NewCodecFactory ( scheme ),
56
+ paramCodec : runtime . NewParameterCodec (scheme ),
57
+ resync : resync ,
58
+ createClient : createClient ,
50
59
}
51
60
return ip
52
61
}
@@ -60,9 +69,9 @@ type MapEntry struct {
60
69
Reader CacheReader
61
70
}
62
71
63
- // InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
64
- //It uses a standard parameter codec constructed based on the given generated Scheme.
65
- type InformersMap struct {
72
+ // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
73
+ // It uses a standard parameter codec constructed based on the given generated Scheme.
74
+ type specificInformersMap struct {
66
75
// Scheme maps runtime.Objects to GroupVersionKinds
67
76
Scheme * runtime.Scheme
68
77
@@ -75,10 +84,6 @@ type InformersMap struct {
75
84
// informersByGVK is the cache of informers keyed by groupVersionKind
76
85
informersByGVK map [schema.GroupVersionKind ]* MapEntry
77
86
78
- // unstructuredInformerByGVK is a cache of informers for unstructured types
79
- // keyed by groupVersionKind
80
- unstructuredInformerByGVK map [schema.GroupVersionKind ]* MapEntry
81
-
82
87
// codecs is used to create a new REST client
83
88
codecs serializer.CodecFactory
84
89
@@ -93,20 +98,22 @@ type InformersMap struct {
93
98
94
99
// mu guards access to the map
95
100
mu sync.RWMutex
96
- // mu guards access to the unstructured map
97
- unstructuredMu sync.RWMutex
98
101
99
102
// start is true if the informers have been started
100
103
started bool
104
+
105
+ // createClient knows how to create a client and a list object,
106
+ // and allows for abstracting over the particulars of structured vs
107
+ // unstructured objects.
108
+ createClient clientCreatorFunc
101
109
}
102
110
103
111
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
104
- func (ip * InformersMap ) Start (stop <- chan struct {}) error {
112
+ // It doesn't return start because it can't return an error, and it's not a runnable directly.
113
+ func (ip * specificInformersMap ) Start (stop <- chan struct {}) {
105
114
func () {
106
115
ip .mu .Lock ()
107
- ip .unstructuredMu .Lock ()
108
116
defer ip .mu .Unlock ()
109
- defer ip .unstructuredMu .Unlock ()
110
117
111
118
// Set the stop channel so it can be passed to informers that are added later
112
119
ip .stop = stop
@@ -116,52 +123,31 @@ func (ip *InformersMap) Start(stop <-chan struct{}) error {
116
123
go informer .Informer .Run (stop )
117
124
}
118
125
119
- // Start each unstructured informer
120
- for _ , informer := range ip .unstructuredInformerByGVK {
121
- go informer .Informer .Run (stop )
122
- }
123
-
124
126
// Set started to true so we immediately start any informers added later.
125
127
ip .started = true
126
128
}()
127
129
<- stop
128
- return nil
129
130
}
130
131
131
- // WaitForCacheSync waits until all the caches have been synced
132
- func (ip * InformersMap ) WaitForCacheSync ( stop <- chan struct {}) bool {
133
- syncedFuncs := make ([]cache.InformerSynced , 0 , len (ip .informersByGVK )+ len ( ip . unstructuredInformerByGVK ) )
132
+ // HasSyncedFuncs returns all the HasSynced functions for the informers in this map.
133
+ func (ip * specificInformersMap ) HasSyncedFuncs () []cache. InformerSynced {
134
+ syncedFuncs := make ([]cache.InformerSynced , 0 , len (ip .informersByGVK ))
134
135
for _ , informer := range ip .informersByGVK {
135
136
syncedFuncs = append (syncedFuncs , informer .Informer .HasSynced )
136
137
}
137
- for _ , informer := range ip .unstructuredInformerByGVK {
138
- syncedFuncs = append (syncedFuncs , informer .Informer .HasSynced )
139
- }
140
- return cache .WaitForCacheSync (stop , syncedFuncs ... )
141
- }
142
-
143
- func (ip * InformersMap ) getMapEntry (gvk schema.GroupVersionKind , isUnstructured bool ) (* MapEntry , bool ) {
144
- if isUnstructured {
145
- ip .unstructuredMu .RLock ()
146
- defer ip .unstructuredMu .RUnlock ()
147
- i , ok := ip .unstructuredInformerByGVK [gvk ]
148
- return i , ok
149
- }
150
- ip .mu .RLock ()
151
- defer ip .mu .RUnlock ()
152
- i , ok := ip .informersByGVK [gvk ]
153
- return i , ok
154
-
138
+ return syncedFuncs
155
139
}
156
140
157
- // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
141
+ // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
158
142
// the Informer from the map.
159
- func (ip * InformersMap ) Get (gvk schema.GroupVersionKind , obj runtime.Object ) (* MapEntry , error ) {
160
- _ , isUnstructured := obj .(* unstructured.Unstructured )
161
- _ , isUnstructuredList := obj .(* unstructured.UnstructuredList )
162
- isUnstructured = isUnstructured || isUnstructuredList
143
+ func (ip * specificInformersMap ) Get (gvk schema.GroupVersionKind , obj runtime.Object ) (* MapEntry , error ) {
163
144
// Return the informer if it is found
164
- i , ok := ip .getMapEntry (gvk , isUnstructured )
145
+ i , ok := func () (* MapEntry , bool ) {
146
+ ip .mu .RLock ()
147
+ defer ip .mu .RUnlock ()
148
+ i , ok := ip .informersByGVK [gvk ]
149
+ return i , ok
150
+ }()
165
151
if ok {
166
152
return i , nil
167
153
}
@@ -170,27 +156,21 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
170
156
// need to be locked
171
157
var sync bool
172
158
i , err := func () (* MapEntry , error ) {
173
- var ok bool
174
- var i * MapEntry
175
- // Check the caches to see if we already have an Informer. If we do, return the Informer.
159
+ ip .mu .Lock ()
160
+ defer ip .mu .Unlock ()
161
+
162
+ // Check the cache to see if we already have an Informer. If we do, return the Informer.
176
163
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
177
164
// so neither returned early, but the first one created it.
178
- if isUnstructured {
179
- ip .unstructuredMu .Lock ()
180
- defer ip .unstructuredMu .Unlock ()
181
- i , ok = ip .unstructuredInformerByGVK [gvk ]
182
- } else {
183
- ip .mu .Lock ()
184
- defer ip .mu .Unlock ()
185
- i , ok = ip .informersByGVK [gvk ]
186
- }
165
+ var ok bool
166
+ i , ok := ip .informersByGVK [gvk ]
187
167
if ok {
188
168
return i , nil
189
169
}
190
170
191
171
// Create a NewSharedIndexInformer and add it to the map.
192
172
var lw * cache.ListWatch
193
- lw , err := ip .newListWatch (gvk , isUnstructured )
173
+ lw , err := ip .newListWatch (gvk )
194
174
if err != nil {
195
175
return nil , err
196
176
}
@@ -201,7 +181,7 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
201
181
Informer : ni ,
202
182
Reader : CacheReader {indexer : ni .GetIndexer (), groupVersionKind : gvk },
203
183
}
204
- ip .setMap ( i , gvk , isUnstructured )
184
+ ip .informersByGVK [ gvk ] = i
205
185
206
186
// Start the Informer if need by
207
187
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
@@ -226,18 +206,8 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
226
206
return i , err
227
207
}
228
208
229
- // setMap - helper function to decide which map to add to.
230
- func (ip * InformersMap ) setMap (i * MapEntry , gvk schema.GroupVersionKind , isUnstructured bool ) {
231
- if isUnstructured {
232
- ip .unstructuredInformerByGVK [gvk ] = i
233
- } else {
234
-
235
- ip .informersByGVK [gvk ] = i
236
- }
237
- }
238
-
239
209
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
240
- func (ip * InformersMap ) newListWatch (gvk schema.GroupVersionKind , isUnstructured bool ) (* cache.ListWatch , error ) {
210
+ func (ip * specificInformersMap ) newListWatch (gvk schema.GroupVersionKind ) (* cache.ListWatch , error ) {
241
211
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
242
212
// groupVersionKind to the Resource API we will use.
243
213
mapping , err := ip .mapper .RESTMapping (gvk .GroupKind (), gvk .Version )
@@ -246,25 +216,10 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind, isUnstructured
246
216
}
247
217
248
218
// Construct a RESTClient for the groupVersionKind that we will use to
249
- // talk to the apiserver.
250
- var client rest.Interface
251
- var listObj runtime.Object
252
- if isUnstructured {
253
- listObj = & unstructured.UnstructuredList {}
254
- client , err = apiutil .RESTUnstructuredClientForGVK (gvk , ip .config )
255
- if err != nil {
256
- return nil , err
257
- }
258
- } else {
259
- client , err = apiutil .RESTClientForGVK (gvk , ip .config , ip .codecs )
260
- if err != nil {
261
- return nil , err
262
- }
263
- listGVK := gvk .GroupVersion ().WithKind (gvk .Kind + "List" )
264
- listObj , err = ip .Scheme .New (listGVK )
265
- if err != nil {
266
- return nil , err
267
- }
219
+ // talk to the apiserver, and the list object that we'll use to describe our results.
220
+ client , listObj , err := ip .createClient (gvk , ip .codecs , ip .Scheme , ip .config )
221
+ if err != nil {
222
+ return nil , err
268
223
}
269
224
270
225
// Create a new ListWatch for the obj
@@ -282,3 +237,38 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind, isUnstructured
282
237
},
283
238
}, nil
284
239
}
240
+
241
+ // createStructuredClient is a ClientCreatorFunc for use with structured
242
+ // objects (i.e. not Unstructured/UnstructuredList).
243
+ func createStructuredClient (gvk schema.GroupVersionKind ,
244
+ codecs serializer.CodecFactory ,
245
+ scheme * runtime.Scheme ,
246
+ baseConfig * rest.Config ) (rest.Interface , runtime.Object , error ) {
247
+
248
+ client , err := apiutil .RESTClientForGVK (gvk , baseConfig , codecs )
249
+ if err != nil {
250
+ return nil , nil , err
251
+ }
252
+ listGVK := gvk .GroupVersion ().WithKind (gvk .Kind + "List" )
253
+ listObj , err := scheme .New (listGVK )
254
+ if err != nil {
255
+ return nil , nil , err
256
+ }
257
+
258
+ return client , listObj , nil
259
+ }
260
+
261
+ // createUnstructuredClient is a ClientCreatorFunc for use with Unstructured and UnstructuredList.
262
+ func createUnstructuredClient (gvk schema.GroupVersionKind ,
263
+ _ serializer.CodecFactory ,
264
+ _ * runtime.Scheme ,
265
+ baseConfig * rest.Config ) (rest.Interface , runtime.Object , error ) {
266
+
267
+ listObj := & unstructured.UnstructuredList {}
268
+ client , err := apiutil .RESTUnstructuredClientForGVK (gvk , baseConfig )
269
+ if err != nil {
270
+ return nil , nil , err
271
+ }
272
+
273
+ return client , listObj , nil
274
+ }
0 commit comments