@@ -30,23 +30,29 @@ import (
30
30
"k8s.io/apimachinery/pkg/watch"
31
31
"k8s.io/client-go/rest"
32
32
"k8s.io/client-go/tools/cache"
33
+
33
34
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
34
35
)
35
36
36
- // NewInformersMap returns a new InformersMap
37
- func NewInformersMap (config * rest.Config ,
37
+ // clientCreatorFunc knows how to create a client and the corresponding list object that it should
38
+ // deserialize into for any given group-version-kind.
39
+ type clientCreatorFunc func (gvk schema.GroupVersionKind , codecs serializer.CodecFactory , scheme * runtime.Scheme , baseConfig * rest.Config ) (client rest.Interface , listObj runtime.Object , err error )
40
+
41
+ // newSpecificInformersMap returns a new specificInformersMap (like
42
+ // the generical InformersMap, except that it doesn't implement WaitForCacheSync).
43
+ func newSpecificInformersMap (config * rest.Config ,
38
44
scheme * runtime.Scheme ,
39
45
mapper meta.RESTMapper ,
40
- resync time.Duration ) * InformersMap {
41
- ip := & InformersMap {
42
- config : config ,
43
- Scheme : scheme ,
44
- mapper : mapper ,
45
- informersByGVK : make (map [schema.GroupVersionKind ]* MapEntry ),
46
- unstructuredInformerByGVK : make ( map [schema. GroupVersionKind ] * MapEntry ),
47
- codecs : serializer . NewCodecFactory (scheme ),
48
- paramCodec : runtime . NewParameterCodec ( scheme ) ,
49
- resync : resync ,
46
+ resync time.Duration , createClient clientCreatorFunc ) * specificInformersMap {
47
+ ip := & specificInformersMap {
48
+ config : config ,
49
+ Scheme : scheme ,
50
+ mapper : mapper ,
51
+ informersByGVK : make (map [schema.GroupVersionKind ]* MapEntry ),
52
+ codecs : serializer . NewCodecFactory ( scheme ),
53
+ paramCodec : runtime . NewParameterCodec (scheme ),
54
+ resync : resync ,
55
+ createClient : createClient ,
50
56
}
51
57
return ip
52
58
}
@@ -60,9 +66,9 @@ type MapEntry struct {
60
66
Reader CacheReader
61
67
}
62
68
63
- // InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
64
- //It uses a standard parameter codec constructed based on the given generated Scheme.
65
- type InformersMap struct {
69
+ // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
70
+ // It uses a standard parameter codec constructed based on the given generated Scheme.
71
+ type specificInformersMap struct {
66
72
// Scheme maps runtime.Objects to GroupVersionKinds
67
73
Scheme * runtime.Scheme
68
74
@@ -75,10 +81,6 @@ type InformersMap struct {
75
81
// informersByGVK is the cache of informers keyed by groupVersionKind
76
82
informersByGVK map [schema.GroupVersionKind ]* MapEntry
77
83
78
- // unstructuredInformerByGVK is a cache of informers for unstructured types
79
- // keyed by groupVersionKind
80
- unstructuredInformerByGVK map [schema.GroupVersionKind ]* MapEntry
81
-
82
84
// codecs is used to create a new REST client
83
85
codecs serializer.CodecFactory
84
86
@@ -93,20 +95,22 @@ type InformersMap struct {
93
95
94
96
// mu guards access to the map
95
97
mu sync.RWMutex
96
- // mu guards access to the unstructured map
97
- unstructuredMu sync.RWMutex
98
98
99
99
// start is true if the informers have been started
100
100
started bool
101
+
102
+ // createClient knows how to create a client and a list object,
103
+ // and allows for abstracting over the particulars of structured vs
104
+ // unstructured objects.
105
+ createClient clientCreatorFunc
101
106
}
102
107
103
108
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
104
- func (ip * InformersMap ) Start (stop <- chan struct {}) error {
109
+ // It doesn't return start because it can't return an error, and it's not a runnable directly.
110
+ func (ip * specificInformersMap ) Start (stop <- chan struct {}) {
105
111
func () {
106
112
ip .mu .Lock ()
107
- ip .unstructuredMu .Lock ()
108
113
defer ip .mu .Unlock ()
109
- defer ip .unstructuredMu .Unlock ()
110
114
111
115
// Set the stop channel so it can be passed to informers that are added later
112
116
ip .stop = stop
@@ -116,52 +120,31 @@ func (ip *InformersMap) Start(stop <-chan struct{}) error {
116
120
go informer .Informer .Run (stop )
117
121
}
118
122
119
- // Start each unstructured informer
120
- for _ , informer := range ip .unstructuredInformerByGVK {
121
- go informer .Informer .Run (stop )
122
- }
123
-
124
123
// Set started to true so we immediately start any informers added later.
125
124
ip .started = true
126
125
}()
127
126
<- stop
128
- return nil
129
127
}
130
128
131
- // WaitForCacheSync waits until all the caches have been synced
132
- func (ip * InformersMap ) WaitForCacheSync ( stop <- chan struct {}) bool {
133
- syncedFuncs := make ([]cache.InformerSynced , 0 , len (ip .informersByGVK )+ len ( ip . unstructuredInformerByGVK ) )
129
+ // HasSyncedFuncs returns all the HasSynced functions for the informers in this map.
130
+ func (ip * specificInformersMap ) HasSyncedFuncs () []cache. InformerSynced {
131
+ syncedFuncs := make ([]cache.InformerSynced , 0 , len (ip .informersByGVK ))
134
132
for _ , informer := range ip .informersByGVK {
135
133
syncedFuncs = append (syncedFuncs , informer .Informer .HasSynced )
136
134
}
137
- for _ , informer := range ip .unstructuredInformerByGVK {
138
- syncedFuncs = append (syncedFuncs , informer .Informer .HasSynced )
139
- }
140
- return cache .WaitForCacheSync (stop , syncedFuncs ... )
141
- }
142
-
143
- func (ip * InformersMap ) getMapEntry (gvk schema.GroupVersionKind , isUnstructured bool ) (* MapEntry , bool ) {
144
- if isUnstructured {
145
- ip .unstructuredMu .RLock ()
146
- defer ip .unstructuredMu .RUnlock ()
147
- i , ok := ip .unstructuredInformerByGVK [gvk ]
148
- return i , ok
149
- }
150
- ip .mu .RLock ()
151
- defer ip .mu .RUnlock ()
152
- i , ok := ip .informersByGVK [gvk ]
153
- return i , ok
154
-
135
+ return syncedFuncs
155
136
}
156
137
157
- // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
138
+ // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
158
139
// the Informer from the map.
159
- func (ip * InformersMap ) Get (gvk schema.GroupVersionKind , obj runtime.Object ) (* MapEntry , error ) {
160
- _ , isUnstructured := obj .(* unstructured.Unstructured )
161
- _ , isUnstructuredList := obj .(* unstructured.UnstructuredList )
162
- isUnstructured = isUnstructured || isUnstructuredList
140
+ func (ip * specificInformersMap ) Get (gvk schema.GroupVersionKind , obj runtime.Object ) (* MapEntry , error ) {
163
141
// Return the informer if it is found
164
- i , ok := ip .getMapEntry (gvk , isUnstructured )
142
+ i , ok := func () (* MapEntry , bool ) {
143
+ ip .mu .RLock ()
144
+ defer ip .mu .RUnlock ()
145
+ i , ok := ip .informersByGVK [gvk ]
146
+ return i , ok
147
+ }()
165
148
if ok {
166
149
return i , nil
167
150
}
@@ -170,27 +153,21 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
170
153
// need to be locked
171
154
var sync bool
172
155
i , err := func () (* MapEntry , error ) {
173
- var ok bool
174
- var i * MapEntry
175
- // Check the caches to see if we already have an Informer. If we do, return the Informer.
156
+ ip .mu .Lock ()
157
+ defer ip .mu .Unlock ()
158
+
159
+ // Check the cache to see if we already have an Informer. If we do, return the Informer.
176
160
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
177
161
// so neither returned early, but the first one created it.
178
- if isUnstructured {
179
- ip .unstructuredMu .Lock ()
180
- defer ip .unstructuredMu .Unlock ()
181
- i , ok = ip .unstructuredInformerByGVK [gvk ]
182
- } else {
183
- ip .mu .Lock ()
184
- defer ip .mu .Unlock ()
185
- i , ok = ip .informersByGVK [gvk ]
186
- }
162
+ var ok bool
163
+ i , ok := ip .informersByGVK [gvk ]
187
164
if ok {
188
165
return i , nil
189
166
}
190
167
191
168
// Create a NewSharedIndexInformer and add it to the map.
192
169
var lw * cache.ListWatch
193
- lw , err := ip .newListWatch (gvk , isUnstructured )
170
+ lw , err := ip .newListWatch (gvk )
194
171
if err != nil {
195
172
return nil , err
196
173
}
@@ -201,7 +178,7 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
201
178
Informer : ni ,
202
179
Reader : CacheReader {indexer : ni .GetIndexer (), groupVersionKind : gvk },
203
180
}
204
- ip .setMap ( i , gvk , isUnstructured )
181
+ ip .informersByGVK [ gvk ] = i
205
182
206
183
// Start the Informer if need by
207
184
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
@@ -226,18 +203,8 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
226
203
return i , err
227
204
}
228
205
229
- // setMap - helper function to decide which map to add to.
230
- func (ip * InformersMap ) setMap (i * MapEntry , gvk schema.GroupVersionKind , isUnstructured bool ) {
231
- if isUnstructured {
232
- ip .unstructuredInformerByGVK [gvk ] = i
233
- } else {
234
-
235
- ip .informersByGVK [gvk ] = i
236
- }
237
- }
238
-
239
206
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
240
- func (ip * InformersMap ) newListWatch (gvk schema.GroupVersionKind , isUnstructured bool ) (* cache.ListWatch , error ) {
207
+ func (ip * specificInformersMap ) newListWatch (gvk schema.GroupVersionKind ) (* cache.ListWatch , error ) {
241
208
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
242
209
// groupVersionKind to the Resource API we will use.
243
210
mapping , err := ip .mapper .RESTMapping (gvk .GroupKind (), gvk .Version )
@@ -246,25 +213,10 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind, isUnstructured
246
213
}
247
214
248
215
// Construct a RESTClient for the groupVersionKind that we will use to
249
- // talk to the apiserver.
250
- var client rest.Interface
251
- var listObj runtime.Object
252
- if isUnstructured {
253
- listObj = & unstructured.UnstructuredList {}
254
- client , err = apiutil .RESTUnstructuredClientForGVK (gvk , ip .config )
255
- if err != nil {
256
- return nil , err
257
- }
258
- } else {
259
- client , err = apiutil .RESTClientForGVK (gvk , ip .config , ip .codecs )
260
- if err != nil {
261
- return nil , err
262
- }
263
- listGVK := gvk .GroupVersion ().WithKind (gvk .Kind + "List" )
264
- listObj , err = ip .Scheme .New (listGVK )
265
- if err != nil {
266
- return nil , err
267
- }
216
+ // talk to the apiserver, and the list object that we'll use to describe our results.
217
+ client , listObj , err := ip .createClient (gvk , ip .codecs , ip .Scheme , ip .config )
218
+ if err != nil {
219
+ return nil , err
268
220
}
269
221
270
222
// Create a new ListWatch for the obj
@@ -282,3 +234,30 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind, isUnstructured
282
234
},
283
235
}, nil
284
236
}
237
+
238
+ // createUnstructuredClient is a ClientCreatorFunc for use with structured
239
+ // objects (i.e. not Unstructured/UnstructuredList).
240
+ func createStructuredClient (gvk schema.GroupVersionKind , codecs serializer.CodecFactory , scheme * runtime.Scheme , baseConfig * rest.Config ) (rest.Interface , runtime.Object , error ) {
241
+ client , err := apiutil .RESTClientForGVK (gvk , baseConfig , codecs )
242
+ if err != nil {
243
+ return nil , nil , err
244
+ }
245
+ listGVK := gvk .GroupVersion ().WithKind (gvk .Kind + "List" )
246
+ listObj , err := scheme .New (listGVK )
247
+ if err != nil {
248
+ return nil , nil , err
249
+ }
250
+
251
+ return client , listObj , nil
252
+ }
253
+
254
+ // createUnstructuredClient is a ClientCreatorFunc for use with Unstructured and UnstructuredList.
255
+ func createUnstructuredClient (gvk schema.GroupVersionKind , _ serializer.CodecFactory , _ * runtime.Scheme , baseConfig * rest.Config ) (rest.Interface , runtime.Object , error ) {
256
+ listObj := & unstructured.UnstructuredList {}
257
+ client , err := apiutil .RESTUnstructuredClientForGVK (gvk , baseConfig )
258
+ if err != nil {
259
+ return nil , nil , err
260
+ }
261
+
262
+ return client , listObj , nil
263
+ }
0 commit comments