Skip to content

Commit df7c11e

Browse files
authored
Merge pull request kubernetes-sigs#101 from shawn-hurley/unstructured
Adding ability for clients, cache and watcher to work with unstructured
2 parents 5373e8e + 7471369 commit df7c11e

File tree

11 files changed

+2518
-991
lines changed

11 files changed

+2518
-991
lines changed

pkg/cache/cache_test.go

Lines changed: 460 additions & 203 deletions
Large diffs are not rendered by default.

pkg/cache/informer_cache.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"strings"
2424

2525
apimeta "k8s.io/apimachinery/pkg/api/meta"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
2829
"k8s.io/client-go/tools/cache"
@@ -58,11 +59,6 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
5859

5960
// List implements Reader
6061
func (ip *informerCache) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
61-
itemsPtr, err := apimeta.GetItemsPtr(out)
62-
if err != nil {
63-
return nil
64-
}
65-
6662
gvk, err := apiutil.GVKForObject(out, ip.Scheme)
6763
if err != nil {
6864
return err
@@ -73,13 +69,25 @@ func (ip *informerCache) List(ctx context.Context, opts *client.ListOptions, out
7369
}
7470
// we need the non-list GVK, so chop off the "List" from the end of the kind
7571
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
76-
77-
// http://knowyourmeme.com/memes/this-is-fine
78-
elemType := reflect.Indirect(reflect.ValueOf(itemsPtr)).Type().Elem()
79-
cacheTypeValue := reflect.Zero(reflect.PtrTo(elemType))
80-
cacheTypeObj, ok := cacheTypeValue.Interface().(runtime.Object)
81-
if !ok {
82-
return fmt.Errorf("cannot get cache for %T, its element %T is not a runtime.Object", out, cacheTypeValue.Interface())
72+
_, isUnstructured := out.(*unstructured.UnstructuredList)
73+
var cacheTypeObj runtime.Object
74+
if isUnstructured {
75+
u := &unstructured.Unstructured{}
76+
u.SetGroupVersionKind(gvk)
77+
cacheTypeObj = u
78+
} else {
79+
itemsPtr, err := apimeta.GetItemsPtr(out)
80+
if err != nil {
81+
return nil
82+
}
83+
// http://knowyourmeme.com/memes/this-is-fine
84+
elemType := reflect.Indirect(reflect.ValueOf(itemsPtr)).Type().Elem()
85+
cacheTypeValue := reflect.Zero(reflect.PtrTo(elemType))
86+
var ok bool
87+
cacheTypeObj, ok = cacheTypeValue.Interface().(runtime.Object)
88+
if !ok {
89+
return fmt.Errorf("cannot get cache for %T, its element %T is not a runtime.Object", out, cacheTypeValue.Interface())
90+
}
8391
}
8492

8593
cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)

pkg/cache/internal/deleg_map.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/apimachinery/pkg/api/meta"
23+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
24+
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/runtime/schema"
26+
"k8s.io/client-go/rest"
27+
"k8s.io/client-go/tools/cache"
28+
)
29+
30+
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
31+
// It uses a standard parameter codec constructed based on the given generated Scheme.
32+
type InformersMap struct {
33+
// we abstract over the details of structured vs unstructured with the specificInformerMaps
34+
35+
structured *specificInformersMap
36+
unstructured *specificInformersMap
37+
38+
// Scheme maps runtime.Objects to GroupVersionKinds
39+
Scheme *runtime.Scheme
40+
}
41+
42+
// NewInformersMap creates a new InformersMap that can create informers for
43+
// both structured and unstructured objects.
44+
func NewInformersMap(config *rest.Config,
45+
scheme *runtime.Scheme,
46+
mapper meta.RESTMapper,
47+
resync time.Duration) *InformersMap {
48+
49+
return &InformersMap{
50+
structured: newStructuredInformersMap(config, scheme, mapper, resync),
51+
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync),
52+
53+
Scheme: scheme,
54+
}
55+
}
56+
57+
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
58+
func (m *InformersMap) Start(stop <-chan struct{}) error {
59+
go m.structured.Start(stop)
60+
go m.unstructured.Start(stop)
61+
<-stop
62+
return nil
63+
}
64+
65+
// WaitForCacheSync waits until all the caches have been synced.
66+
func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
67+
syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...)
68+
syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...)
69+
70+
return cache.WaitForCacheSync(stop, syncedFuncs...)
71+
}
72+
73+
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
74+
// the Informer from the map.
75+
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
76+
_, isUnstructured := obj.(*unstructured.Unstructured)
77+
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
78+
isUnstructured = isUnstructured || isUnstructuredList
79+
80+
if isUnstructured {
81+
return m.unstructured.Get(gvk, obj)
82+
}
83+
84+
return m.structured.Get(gvk, obj)
85+
}
86+
87+
// newStructuredInformersMap creates a new InformersMap for structured objects.
88+
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
89+
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch)
90+
}
91+
92+
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
93+
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
94+
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch)
95+
}

pkg/cache/internal/informers_map.go

Lines changed: 65 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,31 @@ import (
2727
"k8s.io/apimachinery/pkg/runtime/schema"
2828
"k8s.io/apimachinery/pkg/runtime/serializer"
2929
"k8s.io/apimachinery/pkg/watch"
30+
"k8s.io/client-go/dynamic"
3031
"k8s.io/client-go/rest"
3132
"k8s.io/client-go/tools/cache"
33+
3234
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3335
)
3436

35-
// NewInformersMap returns a new InformersMap
36-
func NewInformersMap(config *rest.Config,
37+
// clientListWatcherFunc knows how to create a ListWatcher
38+
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
39+
40+
// newSpecificInformersMap returns a new specificInformersMap (like
41+
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
42+
func newSpecificInformersMap(config *rest.Config,
3743
scheme *runtime.Scheme,
3844
mapper meta.RESTMapper,
39-
resync time.Duration) *InformersMap {
40-
ip := &InformersMap{
41-
config: config,
42-
Scheme: scheme,
43-
mapper: mapper,
44-
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
45-
codecs: serializer.NewCodecFactory(scheme),
46-
paramCodec: runtime.NewParameterCodec(scheme),
47-
resync: resync,
45+
resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap {
46+
ip := &specificInformersMap{
47+
config: config,
48+
Scheme: scheme,
49+
mapper: mapper,
50+
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
51+
codecs: serializer.NewCodecFactory(scheme),
52+
paramCodec: runtime.NewParameterCodec(scheme),
53+
resync: resync,
54+
createListWatcher: createListWatcher,
4855
}
4956
return ip
5057
}
@@ -58,9 +65,9 @@ type MapEntry struct {
5865
Reader CacheReader
5966
}
6067

61-
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
62-
//It uses a standard parameter codec constructed based on the given generated Scheme.
63-
type InformersMap struct {
68+
// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
69+
// It uses a standard parameter codec constructed based on the given generated Scheme.
70+
type specificInformersMap struct {
6471
// Scheme maps runtime.Objects to GroupVersionKinds
6572
Scheme *runtime.Scheme
6673

@@ -90,10 +97,16 @@ type InformersMap struct {
9097

9198
// start is true if the informers have been started
9299
started bool
100+
101+
// createClient knows how to create a client and a list object,
102+
// and allows for abstracting over the particulars of structured vs
103+
// unstructured objects.
104+
createListWatcher createListWatcherFunc
93105
}
94106

95107
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
96-
func (ip *InformersMap) Start(stop <-chan struct{}) error {
108+
// It doesn't return start because it can't return an error, and it's not a runnable directly.
109+
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
97110
func() {
98111
ip.mu.Lock()
99112
defer ip.mu.Unlock()
@@ -110,21 +123,20 @@ func (ip *InformersMap) Start(stop <-chan struct{}) error {
110123
ip.started = true
111124
}()
112125
<-stop
113-
return nil
114126
}
115127

116-
// WaitForCacheSync waits until all the caches have been synced
117-
func (ip *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
128+
// HasSyncedFuncs returns all the HasSynced functions for the informers in this map.
129+
func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
118130
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
119131
for _, informer := range ip.informersByGVK {
120132
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
121133
}
122-
return cache.WaitForCacheSync(stop, syncedFuncs...)
134+
return syncedFuncs
123135
}
124136

125-
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
137+
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
126138
// the Informer from the map.
127-
func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
139+
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
128140
// Return the informer if it is found
129141
i, ok := func() (*MapEntry, bool) {
130142
ip.mu.RLock()
@@ -154,7 +166,7 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
154166

155167
// Create a NewSharedIndexInformer and add it to the map.
156168
var lw *cache.ListWatch
157-
lw, err := ip.newListWatch(gvk)
169+
lw, err := ip.createListWatcher(gvk, ip)
158170
if err != nil {
159171
return nil, err
160172
}
@@ -191,22 +203,18 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M
191203
}
192204

193205
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
194-
func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWatch, error) {
195-
// Construct a RESTClient for the groupVersionKind that we will use to
196-
// talk to the apiserver.
197-
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
198-
if err != nil {
199-
return nil, err
200-
}
201-
206+
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
202207
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
203208
// groupVersionKind to the Resource API we will use.
204209
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
205210
if err != nil {
206211
return nil, err
207212
}
208213

209-
// Get a listObject for listing that the ListWatch can DeepCopy
214+
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
215+
if err != nil {
216+
return nil, err
217+
}
210218
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
211219
listObj, err := ip.Scheme.New(listGVK)
212220
if err != nil {
@@ -228,3 +236,29 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWa
228236
},
229237
}, nil
230238
}
239+
240+
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
241+
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
242+
// groupVersionKind to the Resource API we will use.
243+
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
244+
if err != nil {
245+
return nil, err
246+
}
247+
dynamicClient, err := dynamic.NewForConfig(ip.config)
248+
if err != nil {
249+
return nil, err
250+
}
251+
252+
// Create a new ListWatch for the obj
253+
return &cache.ListWatch{
254+
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
255+
return dynamicClient.Resource(mapping.Resource).List(opts)
256+
},
257+
// Setup the watch function
258+
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
259+
// Watch needs to be set to true separately
260+
opts.Watch = true
261+
return dynamicClient.Resource(mapping.Resource).Watch(opts)
262+
},
263+
}, nil
264+
}

pkg/client/apiutil/apimachinery.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
6565
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
6666
// with the given GroupVersionKind.
6767
func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
68+
cfg := createRestConfig(gvk, baseConfig)
69+
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: codecs}
70+
return rest.RESTClientFor(cfg)
71+
}
72+
73+
//createRestConfig copies the base config and updates needed fields for a new rest config
74+
func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *rest.Config {
6875
gv := gvk.GroupVersion()
6976

7077
cfg := rest.CopyConfig(baseConfig)
@@ -74,9 +81,8 @@ func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, code
7481
} else {
7582
cfg.APIPath = "/apis"
7683
}
77-
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: codecs}
7884
if cfg.UserAgent == "" {
7985
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
8086
}
81-
return rest.RESTClientFor(cfg)
87+
return cfg
8288
}

0 commit comments

Comments
 (0)