Skip to content

Commit e54e115

Browse files
author
Shawn Hurley
committed
Adding ability for clients, cache and watcher to work with unstructured
* remove cache reader marshal/unmarshal * adding seperation between types and unstructured types in the cache
1 parent 419794c commit e54e115

File tree

4 files changed

+152
-49
lines changed

4 files changed

+152
-49
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 76 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
2829
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -38,13 +39,14 @@ func NewInformersMap(config *rest.Config,
3839
mapper meta.RESTMapper,
3940
resync time.Duration) *InformersMap {
4041
ip := &InformersMap{
41-
config: config,
42-
Scheme: scheme,
43-
mapper: mapper,
44-
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
45-
codecs: serializer.NewCodecFactory(scheme),
46-
paramCodec: runtime.NewParameterCodec(scheme),
47-
resync: resync,
42+
config: config,
43+
Scheme: scheme,
44+
mapper: mapper,
45+
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
46+
unstructuredInformerByGVK: make(map[schema.GroupVersionKind]*MapEntry),
47+
codecs: serializer.NewCodecFactory(scheme),
48+
paramCodec: runtime.NewParameterCodec(scheme),
49+
resync: resync,
4850
}
4951
return ip
5052
}
@@ -73,6 +75,10 @@ type InformersMap struct {
7375
// informersByGVK is the cache of informers keyed by groupVersionKind
7476
informersByGVK map[schema.GroupVersionKind]*MapEntry
7577

78+
// unstructuredInformerByGVK is a cache of informers for unstructured types
79+
// keyed by groupVersionKind
80+
unstructuredInformerByGVK map[schema.GroupVersionKind]*MapEntry
81+
7682
// codecs is used to create a new REST client
7783
codecs serializer.CodecFactory
7884

@@ -87,6 +93,8 @@ type InformersMap struct {
8793

8894
// mu guards access to the map
8995
mu sync.RWMutex
96+
// mu guards access to the unstructured map
97+
unstructuredMu sync.RWMutex
9098

9199
// start is true if the informers have been started
92100
started bool
@@ -96,7 +104,9 @@ type InformersMap struct {
96104
func (ip *InformersMap) Start(stop <-chan struct{}) error {
97105
func() {
98106
ip.mu.Lock()
107+
ip.unstructuredMu.Lock()
99108
defer ip.mu.Unlock()
109+
defer ip.unstructuredMu.Unlock()
100110

101111
// Set the stop channel so it can be passed to informers that are added later
102112
ip.stop = stop
@@ -106,6 +116,11 @@ func (ip *InformersMap) Start(stop <-chan struct{}) error {
106116
go informer.Informer.Run(stop)
107117
}
108118

119+
// Start each unstructured informer
120+
for _, informer := range ip.unstructuredInformerByGVK {
121+
go informer.Informer.Run(stop)
122+
}
123+
109124
// Set started to true so we immediately start any informers added later.
110125
ip.started = true
111126
}()
@@ -115,23 +130,36 @@ func (ip *InformersMap) Start(stop <-chan struct{}) error {
115130

116131
// WaitForCacheSync waits until all the caches have been synced
117132
func (ip *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
118-
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
133+
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK)+len(ip.unstructuredInformerByGVK))
119134
for _, informer := range ip.informersByGVK {
120135
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
121136
}
137+
for _, informer := range ip.unstructuredInformerByGVK {
138+
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
139+
}
122140
return cache.WaitForCacheSync(stop, syncedFuncs...)
123141
}
124142

143+
func (ip *InformersMap) getMapEntry(gvk schema.GroupVersionKind, isUnstructured bool) (*MapEntry, bool) {
144+
if isUnstructured {
145+
ip.unstructuredMu.RLock()
146+
defer ip.unstructuredMu.RUnlock()
147+
i, ok := ip.unstructuredInformerByGVK[gvk]
148+
return i, ok
149+
}
150+
ip.mu.RLock()
151+
defer ip.mu.RUnlock()
152+
i, ok := ip.informersByGVK[gvk]
153+
return i, ok
154+
155+
}
156+
125157
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
126158
// the Informer from the map.
127159
func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
160+
_, isUnstructured := obj.(*unstructured.Unstructured)
128161
// Return the informer if it is found
129-
i, ok := func() (*MapEntry, bool) {
130-
ip.mu.RLock()
131-
defer ip.mu.RUnlock()
132-
i, ok := ip.informersByGVK[gvk]
133-
return i, ok
134-
}()
162+
i, ok := ip.getMapEntry(gvk, isUnstructured)
135163
if ok {
136164
return i, nil
137165
}
@@ -140,21 +168,27 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
140168
// need to be locked
141169
var sync bool
142170
i, err := func() (*MapEntry, error) {
143-
ip.mu.Lock()
144-
defer ip.mu.Unlock()
145-
146-
// Check the cache to see if we already have an Informer. If we do, return the Informer.
171+
var ok bool
172+
var i *MapEntry
173+
// Check the caches to see if we already have an Informer. If we do, return the Informer.
147174
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
148175
// so neither returned early, but the first one created it.
149-
var ok bool
150-
i, ok := ip.informersByGVK[gvk]
176+
if isUnstructured {
177+
ip.unstructuredMu.Lock()
178+
defer ip.unstructuredMu.Unlock()
179+
i, ok = ip.unstructuredInformerByGVK[gvk]
180+
} else {
181+
ip.mu.Lock()
182+
defer ip.mu.Unlock()
183+
i, ok = ip.informersByGVK[gvk]
184+
}
151185
if ok {
152186
return i, nil
153187
}
154188

155189
// Create a NewSharedIndexInformer and add it to the map.
156190
var lw *cache.ListWatch
157-
lw, err := ip.newListWatch(gvk)
191+
lw, err := ip.newListWatch(gvk, isUnstructured)
158192
if err != nil {
159193
return nil, err
160194
}
@@ -191,26 +225,34 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
191225
}
192226

193227
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
194-
func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWatch, error) {
195-
// Construct a RESTClient for the groupVersionKind that we will use to
196-
// talk to the apiserver.
197-
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
198-
if err != nil {
199-
return nil, err
200-
}
201-
228+
func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind, isUnstructured bool) (*cache.ListWatch, error) {
202229
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
203230
// groupVersionKind to the Resource API we will use.
204231
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
205232
if err != nil {
206233
return nil, err
207234
}
208235

209-
// Get a listObject for listing that the ListWatch can DeepCopy
210-
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
211-
listObj, err := ip.Scheme.New(listGVK)
212-
if err != nil {
213-
return nil, err
236+
// Construct a RESTClient for the groupVersionKind that we will use to
237+
// talk to the apiserver.
238+
var client rest.Interface
239+
var listObj runtime.Object
240+
if isUnstructured {
241+
listObj = &unstructured.UnstructuredList{}
242+
client, err = apiutil.RESTUnstructuredClientForGVK(gvk, ip.config)
243+
if err != nil {
244+
return nil, err
245+
}
246+
} else {
247+
client, err = apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
248+
if err != nil {
249+
return nil, err
250+
}
251+
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
252+
listObj, err = ip.Scheme.New(listGVK)
253+
if err != nil {
254+
return nil, err
255+
}
214256
}
215257

216258
// Create a new ListWatch for the obj

pkg/client/apiutil/apimachinery.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ import (
2020
"fmt"
2121

2222
"k8s.io/apimachinery/pkg/api/meta"
23+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2324
"k8s.io/apimachinery/pkg/runtime"
2425
"k8s.io/apimachinery/pkg/runtime/schema"
2526
"k8s.io/apimachinery/pkg/runtime/serializer"
2627
"k8s.io/client-go/discovery"
28+
"k8s.io/client-go/kubernetes/scheme"
2729
"k8s.io/client-go/rest"
2830
"k8s.io/client-go/restmapper"
2931
)
@@ -65,6 +67,29 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
6567
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
6668
// with the given GroupVersionKind.
6769
func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
70+
cfg := createRestConfig(gvk, baseConfig)
71+
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: codecs}
72+
return rest.RESTClientFor(cfg)
73+
}
74+
75+
// RESTUnstructuredClientForGVK constructs a new rest.Interface for accessing unstructured resources.
76+
func RESTUnstructuredClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config) (rest.Interface, error) {
77+
cfg := createRestConfig(gvk, baseConfig)
78+
var jsonInfo runtime.SerializerInfo
79+
for _, info := range scheme.Codecs.SupportedMediaTypes() {
80+
if info.MediaType == runtime.ContentTypeJSON {
81+
jsonInfo = info
82+
break
83+
}
84+
}
85+
jsonInfo.Serializer = unstructured.UnstructuredJSONScheme
86+
cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(jsonInfo)
87+
88+
return rest.RESTClientFor(cfg)
89+
}
90+
91+
//createRestConfig copies the base config and updates needed fields for a new rest config
92+
func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *rest.Config {
6893
gv := gvk.GroupVersion()
6994

7095
cfg := rest.CopyConfig(baseConfig)
@@ -74,9 +99,9 @@ func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, code
7499
} else {
75100
cfg.APIPath = "/apis"
76101
}
77-
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: codecs}
78102
if cfg.UserAgent == "" {
79103
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
80104
}
81-
return rest.RESTClientFor(cfg)
105+
return cfg
106+
82107
}

pkg/client/client.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
2627
"k8s.io/apimachinery/pkg/runtime/serializer"
2728
"k8s.io/client-go/kubernetes/scheme"
2829
"k8s.io/client-go/rest"
@@ -60,11 +61,12 @@ func New(config *rest.Config, options Options) (Client, error) {
6061

6162
c := &client{
6263
cache: clientCache{
63-
config: config,
64-
scheme: options.Scheme,
65-
mapper: options.Mapper,
66-
codecs: serializer.NewCodecFactory(options.Scheme),
67-
resourceByType: make(map[reflect.Type]*resourceMeta),
64+
config: config,
65+
scheme: options.Scheme,
66+
mapper: options.Mapper,
67+
codecs: serializer.NewCodecFactory(options.Scheme),
68+
resourceByType: make(map[reflect.Type]*resourceMeta),
69+
unstructuredResourceByGVK: make(map[schema.GroupVersionKind]*resourceMeta),
6870
},
6971
paramCodec: runtime.NewParameterCodec(options.Scheme),
7072
}

pkg/client/client_cache.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
2829
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -46,7 +47,10 @@ type clientCache struct {
4647

4748
// resourceByType caches type metadata
4849
resourceByType map[reflect.Type]*resourceMeta
49-
mu sync.RWMutex
50+
// resourceByGVK caches type metadata for unstructured
51+
unstructuredResourceByGVK map[schema.GroupVersionKind]*resourceMeta
52+
muByType sync.RWMutex
53+
muByGVK sync.RWMutex
5054
}
5155

5256
// newResource maps obj to a Kubernetes Resource and constructs a client for that Resource.
@@ -73,24 +77,44 @@ func (c *clientCache) newResource(obj runtime.Object) (*resourceMeta, error) {
7377
return &resourceMeta{Interface: client, mapping: mapping, gvk: gvk}, nil
7478
}
7579

76-
// getResource returns the resource meta information for the given type of object.
77-
// If the object is a list, the resource represents the item's type instead.
78-
func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
80+
func (c *clientCache) getUnstructuredResourceByGVK(obj runtime.Object) (*resourceMeta, error) {
81+
// It's better to do creation work twice than to not let multiple
82+
// people make requests at once
83+
c.muByGVK.RLock()
84+
r, known := c.unstructuredResourceByGVK[obj.GetObjectKind().GroupVersionKind()]
85+
c.muByGVK.RUnlock()
86+
87+
if known {
88+
return r, nil
89+
}
90+
91+
// Initialize a new Client
92+
c.muByGVK.Lock()
93+
defer c.muByGVK.Unlock()
94+
r, err := c.newResource(obj)
95+
if err != nil {
96+
return nil, err
97+
}
98+
c.unstructuredResourceByGVK[obj.GetObjectKind().GroupVersionKind()] = r
99+
return r, err
100+
}
101+
102+
func (c *clientCache) getResourceByType(obj runtime.Object) (*resourceMeta, error) {
79103
typ := reflect.TypeOf(obj)
80104

81105
// It's better to do creation work twice than to not let multiple
82106
// people make requests at once
83-
c.mu.RLock()
107+
c.muByType.RLock()
84108
r, known := c.resourceByType[typ]
85-
c.mu.RUnlock()
109+
c.muByType.RUnlock()
86110

87111
if known {
88112
return r, nil
89113
}
90114

91115
// Initialize a new Client
92-
c.mu.Lock()
93-
defer c.mu.Unlock()
116+
c.muByType.Lock()
117+
defer c.muByType.Unlock()
94118
r, err := c.newResource(obj)
95119
if err != nil {
96120
return nil, err
@@ -99,6 +123,16 @@ func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
99123
return r, err
100124
}
101125

126+
// getResource returns the resource meta information for the given type of object.
127+
// If the object is a list, the resource represents the item's type instead.
128+
func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
129+
_, isUnstructured := obj.(*unstructured.Unstructured)
130+
if isUnstructured {
131+
return c.getUnstructuredResourceByGVK(obj)
132+
}
133+
return c.getResourceByType(obj)
134+
}
135+
102136
// getObjMeta returns objMeta containing both type and object metadata and state
103137
func (c *clientCache) getObjMeta(obj runtime.Object) (*objMeta, error) {
104138
r, err := c.getResource(obj)

0 commit comments

Comments
 (0)