Skip to content

Commit 3127304

Browse files
author
Shawn Hurley
committed
Adding ability for clients, cache and watcher to work with unstructured
1 parent 3aaf8cd commit 3127304

File tree

5 files changed

+102
-11
lines changed

5 files changed

+102
-11
lines changed

pkg/cache/internal/cache_reader.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ package internal
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"fmt"
2223
"reflect"
2324

2425
"k8s.io/apimachinery/pkg/api/errors"
2526
apimeta "k8s.io/apimachinery/pkg/api/meta"
27+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2628
"k8s.io/apimachinery/pkg/fields"
2729
"k8s.io/apimachinery/pkg/labels"
2830
"k8s.io/apimachinery/pkg/runtime"
@@ -73,6 +75,21 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out runtime.O
7375
// TODO(directxman12): revisit the decision to always deepcopy
7476
obj = obj.(runtime.Object).DeepCopyObject()
7577

78+
// If out is supposed to be unstructured handle correctly.
79+
if o, ok := out.(*unstructured.Unstructured); ok {
80+
//Encode the obj to a map[string]interface and et the out.Object
81+
b, err := json.Marshal(obj)
82+
if err != nil {
83+
return err
84+
}
85+
err = json.Unmarshal(b, o)
86+
if err != nil {
87+
return err
88+
}
89+
o.SetGroupVersionKind(c.groupVersionKind)
90+
return nil
91+
}
92+
7693
// Copy the value of the item in the cache to the returned value
7794
// TODO(directxman12): this is a terrible hack, pls fix (we should have deepcopyinto)
7895
outVal := reflect.ValueOf(out)

pkg/cache/internal/informers_map.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
2829
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -151,10 +152,11 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
151152
if ok {
152153
return i, nil
153154
}
155+
_, isUnstructured := obj.(*unstructured.Unstructured)
154156

155157
// Create a NewSharedIndexInformer and add it to the map.
156158
var lw *cache.ListWatch
157-
lw, err := ip.newListWatch(gvk)
159+
lw, err := ip.newListWatch(gvk, isUnstructured)
158160
if err != nil {
159161
return nil, err
160162
}
@@ -191,10 +193,17 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
191193
}
192194

193195
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
194-
func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWatch, error) {
196+
func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind, isUnstructured bool) (*cache.ListWatch, error) {
195197
// Construct a RESTClient for the groupVersionKind that we will use to
196198
// talk to the apiserver.
197-
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
199+
var client rest.Interface
200+
var err error
201+
if isUnstructured {
202+
client, err = apiutil.RESTUnstructuredClientForGVK(gvk, ip.config)
203+
} else {
204+
client, err = apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
205+
206+
}
198207
if err != nil {
199208
return nil, err
200209
}

pkg/client/apiutil/apimachinery.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"fmt"
55

66
"k8s.io/apimachinery/pkg/api/meta"
7+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
78
"k8s.io/apimachinery/pkg/runtime"
89
"k8s.io/apimachinery/pkg/runtime/schema"
910
"k8s.io/apimachinery/pkg/runtime/serializer"
1011
"k8s.io/client-go/discovery"
1112
"k8s.io/client-go/dynamic"
13+
"k8s.io/client-go/kubernetes/scheme"
1214
"k8s.io/client-go/rest"
1315
)
1416

@@ -64,3 +66,30 @@ func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, code
6466
}
6567
return rest.RESTClientFor(cfg)
6668
}
69+
70+
// RESTUnstructuredClientForGVK constructs a new rest.Interface for accessing unstructured resources.
71+
func RESTUnstructuredClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config) (rest.Interface, error) {
72+
gv := gvk.GroupVersion()
73+
74+
cfg := rest.CopyConfig(baseConfig)
75+
cfg.GroupVersion = &gv
76+
if gvk.Group == "" {
77+
cfg.APIPath = "/api"
78+
} else {
79+
cfg.APIPath = "/apis"
80+
}
81+
var jsonInfo runtime.SerializerInfo
82+
for _, info := range scheme.Codecs.SupportedMediaTypes() {
83+
if info.MediaType == runtime.ContentTypeJSON {
84+
jsonInfo = info
85+
break
86+
}
87+
}
88+
jsonInfo.Serializer = unstructured.UnstructuredJSONScheme
89+
90+
cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(jsonInfo)
91+
if cfg.UserAgent == "" {
92+
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
93+
}
94+
return rest.RESTClientFor(cfg)
95+
}

pkg/client/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
2627
"k8s.io/apimachinery/pkg/runtime/serializer"
2728
"k8s.io/client-go/kubernetes/scheme"
2829
"k8s.io/client-go/rest"
@@ -65,6 +66,7 @@ func New(config *rest.Config, options Options) (Client, error) {
6566
mapper: options.Mapper,
6667
codecs: serializer.NewCodecFactory(options.Scheme),
6768
resourceByType: make(map[reflect.Type]*resourceMeta),
69+
resourceByGVK: make(map[schema.GroupVersionKind]*resourceMeta),
6870
},
6971
paramCodec: runtime.NewParameterCodec(options.Scheme),
7072
}

pkg/client/client_cache.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
2829
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -46,7 +47,10 @@ type clientCache struct {
4647

4748
// resourceByType caches type metadata
4849
resourceByType map[reflect.Type]*resourceMeta
49-
mu sync.RWMutex
50+
// resourceByGVK caches type metadat for unstructured
51+
resourceByGVK map[schema.GroupVersionKind]*resourceMeta
52+
muByType sync.RWMutex
53+
muByGVK sync.RWMutex
5054
}
5155

5256
// newResource maps obj to a Kubernetes Resource and constructs a client for that Resource.
@@ -73,24 +77,44 @@ func (c *clientCache) newResource(obj runtime.Object) (*resourceMeta, error) {
7377
return &resourceMeta{Interface: client, mapping: mapping, gvk: gvk}, nil
7478
}
7579

76-
// getResource returns the resource meta information for the given type of object.
77-
// If the object is a list, the resource represents the item's type instead.
78-
func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
80+
func (c *clientCache) handleResourceByGVK(obj runtime.Object) (*resourceMeta, error) {
81+
// It's better to do creation work twice than to not let multiple
82+
// people make requests at once
83+
c.muByGVK.RLock()
84+
r, known := c.resourceByGVK[obj.GetObjectKind().GroupVersionKind()]
85+
c.muByGVK.RUnlock()
86+
87+
if known {
88+
return r, nil
89+
}
90+
91+
// Initialize a new Client
92+
c.muByGVK.Lock()
93+
defer c.muByGVK.Unlock()
94+
r, err := c.newResource(obj)
95+
if err != nil {
96+
return nil, err
97+
}
98+
c.resourceByGVK[obj.GetObjectKind().GroupVersionKind()] = r
99+
return r, err
100+
}
101+
102+
func (c *clientCache) handleResourceByType(obj runtime.Object) (*resourceMeta, error) {
79103
typ := reflect.TypeOf(obj)
80104

81105
// It's better to do creation work twice than to not let multiple
82106
// people make requests at once
83-
c.mu.RLock()
107+
c.muByType.RLock()
84108
r, known := c.resourceByType[typ]
85-
c.mu.RUnlock()
109+
c.muByType.RUnlock()
86110

87111
if known {
88112
return r, nil
89113
}
90114

91115
// Initialize a new Client
92-
c.mu.Lock()
93-
defer c.mu.Unlock()
116+
c.muByType.Lock()
117+
defer c.muByType.Unlock()
94118
r, err := c.newResource(obj)
95119
if err != nil {
96120
return nil, err
@@ -99,6 +123,16 @@ func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
99123
return r, err
100124
}
101125

126+
// getResource returns the resource meta information for the given type of object.
127+
// If the object is a list, the resource represents the item's type instead.
128+
func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
129+
_, isUnstructured := obj.(*unstructured.Unstructured)
130+
if isUnstructured {
131+
return c.handleResourceByGVK(obj)
132+
}
133+
return c.handleResourceByType(obj)
134+
}
135+
102136
// getObjMeta returns objMeta containing both type and object metadata and state
103137
func (c *clientCache) getObjMeta(obj runtime.Object) (*objMeta, error) {
104138
r, err := c.getResource(obj)

0 commit comments

Comments
 (0)