Skip to content

Commit 10ed9c2

Browse files
DirectXMan12Shawn Hurley
authored andcommitted
Adding ability to use dynamic list for unstructured list watcher.
1 parent 7744200 commit 10ed9c2

File tree

6 files changed

+236
-210
lines changed

6 files changed

+236
-210
lines changed

pkg/cache/internal/deleg_map.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/apimachinery/pkg/api/meta"
23+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
24+
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/runtime/schema"
26+
"k8s.io/client-go/rest"
27+
"k8s.io/client-go/tools/cache"
28+
)
29+
30+
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
31+
// It uses a standard parameter codec constructed based on the given generated Scheme.
32+
type InformersMap struct {
33+
// we abstract over the details of structured vs unstructured with the specificInformerMaps
34+
35+
structured *specificInformersMap
36+
unstructured *specificInformersMap
37+
38+
// Scheme maps runtime.Objects to GroupVersionKinds
39+
Scheme *runtime.Scheme
40+
}
41+
42+
// NewInformersMap creates a new InformersMap that can create informers for
43+
// both structured and unstructured objects.
44+
func NewInformersMap(config *rest.Config,
45+
scheme *runtime.Scheme,
46+
mapper meta.RESTMapper,
47+
resync time.Duration) *InformersMap {
48+
49+
return &InformersMap{
50+
structured: newStructuredInformersMap(config, scheme, mapper, resync),
51+
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync),
52+
53+
Scheme: scheme,
54+
}
55+
}
56+
57+
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
58+
func (m *InformersMap) Start(stop <-chan struct{}) error {
59+
go m.structured.Start(stop)
60+
go m.unstructured.Start(stop)
61+
<-stop
62+
return nil
63+
}
64+
65+
// WaitForCacheSync waits until all the caches have been synced.
66+
func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
67+
syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...)
68+
syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...)
69+
70+
return cache.WaitForCacheSync(stop, syncedFuncs...)
71+
}
72+
73+
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
74+
// the Informer from the map.
75+
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
76+
_, isUnstructured := obj.(*unstructured.Unstructured)
77+
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
78+
isUnstructured = isUnstructured || isUnstructuredList
79+
80+
if isUnstructured {
81+
return m.unstructured.Get(gvk, obj)
82+
}
83+
84+
return m.structured.Get(gvk, obj)
85+
}
86+
87+
// newStructuredInformersMap creates a new InformersMap for structured objects.
88+
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
89+
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch)
90+
}
91+
92+
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
93+
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
94+
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch)
95+
}

pkg/cache/internal/informers_map.go

Lines changed: 83 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,35 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2726
"k8s.io/apimachinery/pkg/runtime"
2827
"k8s.io/apimachinery/pkg/runtime/schema"
2928
"k8s.io/apimachinery/pkg/runtime/serializer"
3029
"k8s.io/apimachinery/pkg/watch"
30+
"k8s.io/client-go/dynamic"
3131
"k8s.io/client-go/rest"
3232
"k8s.io/client-go/tools/cache"
33+
3334
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3435
)
3536

36-
// NewInformersMap returns a new InformersMap
37-
func NewInformersMap(config *rest.Config,
37+
// clientListWatcherFunc knows how to create a ListWatcher
38+
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
39+
40+
// newSpecificInformersMap returns a new specificInformersMap (like
41+
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
42+
func newSpecificInformersMap(config *rest.Config,
3843
scheme *runtime.Scheme,
3944
mapper meta.RESTMapper,
40-
resync time.Duration) *InformersMap {
41-
ip := &InformersMap{
42-
config: config,
43-
Scheme: scheme,
44-
mapper: mapper,
45-
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
46-
unstructuredInformerByGVK: make(map[schema.GroupVersionKind]*MapEntry),
47-
codecs: serializer.NewCodecFactory(scheme),
48-
paramCodec: runtime.NewParameterCodec(scheme),
49-
resync: resync,
45+
resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap {
46+
ip := &specificInformersMap{
47+
config: config,
48+
Scheme: scheme,
49+
mapper: mapper,
50+
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
51+
codecs: serializer.NewCodecFactory(scheme),
52+
paramCodec: runtime.NewParameterCodec(scheme),
53+
resync: resync,
54+
createListWatcher: createListWatcher,
5055
}
5156
return ip
5257
}
@@ -60,9 +65,9 @@ type MapEntry struct {
6065
Reader CacheReader
6166
}
6267

63-
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
64-
//It uses a standard parameter codec constructed based on the given generated Scheme.
65-
type InformersMap struct {
68+
// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
69+
// It uses a standard parameter codec constructed based on the given generated Scheme.
70+
type specificInformersMap struct {
6671
// Scheme maps runtime.Objects to GroupVersionKinds
6772
Scheme *runtime.Scheme
6873

@@ -75,10 +80,6 @@ type InformersMap struct {
7580
// informersByGVK is the cache of informers keyed by groupVersionKind
7681
informersByGVK map[schema.GroupVersionKind]*MapEntry
7782

78-
// unstructuredInformerByGVK is a cache of informers for unstructured types
79-
// keyed by groupVersionKind
80-
unstructuredInformerByGVK map[schema.GroupVersionKind]*MapEntry
81-
8283
// codecs is used to create a new REST client
8384
codecs serializer.CodecFactory
8485

@@ -93,20 +94,22 @@ type InformersMap struct {
9394

9495
// mu guards access to the map
9596
mu sync.RWMutex
96-
// mu guards access to the unstructured map
97-
unstructuredMu sync.RWMutex
9897

9998
// start is true if the informers have been started
10099
started bool
100+
101+
// createClient knows how to create a client and a list object,
102+
// and allows for abstracting over the particulars of structured vs
103+
// unstructured objects.
104+
createListWatcher createListWatcherFunc
101105
}
102106

103107
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
104-
func (ip *InformersMap) Start(stop <-chan struct{}) error {
108+
// It doesn't return start because it can't return an error, and it's not a runnable directly.
109+
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
105110
func() {
106111
ip.mu.Lock()
107-
ip.unstructuredMu.Lock()
108112
defer ip.mu.Unlock()
109-
defer ip.unstructuredMu.Unlock()
110113

111114
// Set the stop channel so it can be passed to informers that are added later
112115
ip.stop = stop
@@ -116,52 +119,31 @@ func (ip *InformersMap) Start(stop <-chan struct{}) error {
116119
go informer.Informer.Run(stop)
117120
}
118121

119-
// Start each unstructured informer
120-
for _, informer := range ip.unstructuredInformerByGVK {
121-
go informer.Informer.Run(stop)
122-
}
123-
124122
// Set started to true so we immediately start any informers added later.
125123
ip.started = true
126124
}()
127125
<-stop
128-
return nil
129126
}
130127

131-
// WaitForCacheSync waits until all the caches have been synced
132-
func (ip *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
133-
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK)+len(ip.unstructuredInformerByGVK))
128+
// HasSyncedFuncs returns all the HasSynced functions for the informers in this map.
129+
func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
130+
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
134131
for _, informer := range ip.informersByGVK {
135132
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
136133
}
137-
for _, informer := range ip.unstructuredInformerByGVK {
138-
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
139-
}
140-
return cache.WaitForCacheSync(stop, syncedFuncs...)
141-
}
142-
143-
func (ip *InformersMap) getMapEntry(gvk schema.GroupVersionKind, isUnstructured bool) (*MapEntry, bool) {
144-
if isUnstructured {
145-
ip.unstructuredMu.RLock()
146-
defer ip.unstructuredMu.RUnlock()
147-
i, ok := ip.unstructuredInformerByGVK[gvk]
148-
return i, ok
149-
}
150-
ip.mu.RLock()
151-
defer ip.mu.RUnlock()
152-
i, ok := ip.informersByGVK[gvk]
153-
return i, ok
154-
134+
return syncedFuncs
155135
}
156136

157-
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
137+
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
158138
// the Informer from the map.
159-
func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
160-
_, isUnstructured := obj.(*unstructured.Unstructured)
161-
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
162-
isUnstructured = isUnstructured || isUnstructuredList
139+
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
163140
// Return the informer if it is found
164-
i, ok := ip.getMapEntry(gvk, isUnstructured)
141+
i, ok := func() (*MapEntry, bool) {
142+
ip.mu.RLock()
143+
defer ip.mu.RUnlock()
144+
i, ok := ip.informersByGVK[gvk]
145+
return i, ok
146+
}()
165147
if ok {
166148
return i, nil
167149
}
@@ -170,27 +152,21 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
170152
// need to be locked
171153
var sync bool
172154
i, err := func() (*MapEntry, error) {
173-
var ok bool
174-
var i *MapEntry
175-
// Check the caches to see if we already have an Informer. If we do, return the Informer.
155+
ip.mu.Lock()
156+
defer ip.mu.Unlock()
157+
158+
// Check the cache to see if we already have an Informer. If we do, return the Informer.
176159
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
177160
// so neither returned early, but the first one created it.
178-
if isUnstructured {
179-
ip.unstructuredMu.Lock()
180-
defer ip.unstructuredMu.Unlock()
181-
i, ok = ip.unstructuredInformerByGVK[gvk]
182-
} else {
183-
ip.mu.Lock()
184-
defer ip.mu.Unlock()
185-
i, ok = ip.informersByGVK[gvk]
186-
}
161+
var ok bool
162+
i, ok := ip.informersByGVK[gvk]
187163
if ok {
188164
return i, nil
189165
}
190166

191167
// Create a NewSharedIndexInformer and add it to the map.
192168
var lw *cache.ListWatch
193-
lw, err := ip.newListWatch(gvk, isUnstructured)
169+
lw, err := ip.createListWatcher(gvk, ip)
194170
if err != nil {
195171
return nil, err
196172
}
@@ -201,7 +177,7 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
201177
Informer: ni,
202178
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
203179
}
204-
ip.setMap(i, gvk, isUnstructured)
180+
ip.informersByGVK[gvk] = i
205181

206182
// Start the Informer if need by
207183
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
@@ -226,45 +202,23 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
226202
return i, err
227203
}
228204

229-
// setMap - helper function to decide which map to add to.
230-
func (ip *InformersMap) setMap(i *MapEntry, gvk schema.GroupVersionKind, isUnstructured bool) {
231-
if isUnstructured {
232-
ip.unstructuredInformerByGVK[gvk] = i
233-
} else {
234-
235-
ip.informersByGVK[gvk] = i
236-
}
237-
}
238-
239205
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
240-
func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind, isUnstructured bool) (*cache.ListWatch, error) {
206+
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
241207
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
242208
// groupVersionKind to the Resource API we will use.
243209
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
244210
if err != nil {
245211
return nil, err
246212
}
247213

248-
// Construct a RESTClient for the groupVersionKind that we will use to
249-
// talk to the apiserver.
250-
var client rest.Interface
251-
var listObj runtime.Object
252-
if isUnstructured {
253-
listObj = &unstructured.UnstructuredList{}
254-
client, err = apiutil.RESTUnstructuredClientForGVK(gvk, ip.config)
255-
if err != nil {
256-
return nil, err
257-
}
258-
} else {
259-
client, err = apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
260-
if err != nil {
261-
return nil, err
262-
}
263-
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
264-
listObj, err = ip.Scheme.New(listGVK)
265-
if err != nil {
266-
return nil, err
267-
}
214+
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
215+
if err != nil {
216+
return nil, err
217+
}
218+
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
219+
listObj, err := ip.Scheme.New(listGVK)
220+
if err != nil {
221+
return nil, err
268222
}
269223

270224
// Create a new ListWatch for the obj
@@ -282,3 +236,29 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind, isUnstructured
282236
},
283237
}, nil
284238
}
239+
240+
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
241+
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
242+
// groupVersionKind to the Resource API we will use.
243+
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
244+
if err != nil {
245+
return nil, err
246+
}
247+
dynamicClient, err := dynamic.NewForConfig(ip.config)
248+
if err != nil {
249+
return nil, err
250+
}
251+
252+
// Create a new ListWatch for the obj
253+
return &cache.ListWatch{
254+
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
255+
return dynamicClient.Resource(mapping.Resource).List(opts)
256+
},
257+
// Setup the watch function
258+
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
259+
// Watch needs to be set to true separately
260+
opts.Watch = true
261+
return dynamicClient.Resource(mapping.Resource).Watch(opts)
262+
},
263+
}, nil
264+
}

0 commit comments

Comments
 (0)