Skip to content

Commit 44e39f9

Browse files
author
Shawn Hurley
committed
adding ability to use dynamic list for unstructured list watcher.
1 parent 4b128f3 commit 44e39f9

File tree

3 files changed

+42
-103
lines changed

3 files changed

+42
-103
lines changed

pkg/cache/internal/deleg_map.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*Ma
8686

8787
// newStructuredInformersMap creates a new InformersMap for structured objects.
8888
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
89-
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredClient)
89+
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch)
9090
}
9191

9292
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
9393
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
94-
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredClient)
94+
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch)
9595
}

pkg/cache/internal/informers_map.go

Lines changed: 40 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,35 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2726
"k8s.io/apimachinery/pkg/runtime"
2827
"k8s.io/apimachinery/pkg/runtime/schema"
2928
"k8s.io/apimachinery/pkg/runtime/serializer"
3029
"k8s.io/apimachinery/pkg/watch"
30+
"k8s.io/client-go/dynamic"
3131
"k8s.io/client-go/rest"
3232
"k8s.io/client-go/tools/cache"
3333

3434
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3535
)
3636

37-
// clientCreatorFunc knows how to create a client and the corresponding list object that it should
38-
// deserialize into for any given group-version-kind.
39-
type clientCreatorFunc func(gvk schema.GroupVersionKind,
40-
codecs serializer.CodecFactory,
41-
scheme *runtime.Scheme,
42-
baseConfig *rest.Config) (client rest.Interface, listObj runtime.Object, err error)
37+
// clientListWatcherFunc knows how to create a ListWatcher
38+
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
4339

4440
// newSpecificInformersMap returns a new specificInformersMap (like
4541
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
4642
func newSpecificInformersMap(config *rest.Config,
4743
scheme *runtime.Scheme,
4844
mapper meta.RESTMapper,
49-
resync time.Duration, createClient clientCreatorFunc) *specificInformersMap {
45+
resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap {
5046
ip := &specificInformersMap{
51-
config: config,
52-
Scheme: scheme,
53-
mapper: mapper,
54-
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
55-
codecs: serializer.NewCodecFactory(scheme),
56-
paramCodec: runtime.NewParameterCodec(scheme),
57-
resync: resync,
58-
createClient: createClient,
47+
config: config,
48+
Scheme: scheme,
49+
mapper: mapper,
50+
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
51+
codecs: serializer.NewCodecFactory(scheme),
52+
paramCodec: runtime.NewParameterCodec(scheme),
53+
resync: resync,
54+
createListWatcher: createListWatcher,
5955
}
6056
return ip
6157
}
@@ -105,7 +101,7 @@ type specificInformersMap struct {
105101
// createClient knows how to create a client and a list object,
106102
// and allows for abstracting over the particulars of structured vs
107103
// unstructured objects.
108-
createClient clientCreatorFunc
104+
createListWatcher createListWatcherFunc
109105
}
110106

111107
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
@@ -170,7 +166,7 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
170166

171167
// Create a NewSharedIndexInformer and add it to the map.
172168
var lw *cache.ListWatch
173-
lw, err := ip.newListWatch(gvk)
169+
lw, err := ip.createListWatcher(gvk, ip)
174170
if err != nil {
175171
return nil, err
176172
}
@@ -207,17 +203,20 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
207203
}
208204

209205
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
210-
func (ip *specificInformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWatch, error) {
206+
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
211207
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
212208
// groupVersionKind to the Resource API we will use.
213209
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
214210
if err != nil {
215211
return nil, err
216212
}
217213

218-
// Construct a RESTClient for the groupVersionKind that we will use to
219-
// talk to the apiserver, and the list object that we'll use to describe our results.
220-
client, listObj, err := ip.createClient(gvk, ip.codecs, ip.Scheme, ip.config)
214+
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
215+
if err != nil {
216+
return nil, err
217+
}
218+
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
219+
listObj, err := ip.Scheme.New(listGVK)
221220
if err != nil {
222221
return nil, err
223222
}
@@ -238,37 +237,28 @@ func (ip *specificInformersMap) newListWatch(gvk schema.GroupVersionKind) (*cach
238237
}, nil
239238
}
240239

241-
// createStructuredClient is a ClientCreatorFunc for use with structured
242-
// objects (i.e. not Unstructured/UnstructuredList).
243-
func createStructuredClient(gvk schema.GroupVersionKind,
244-
codecs serializer.CodecFactory,
245-
scheme *runtime.Scheme,
246-
baseConfig *rest.Config) (rest.Interface, runtime.Object, error) {
247-
248-
client, err := apiutil.RESTClientForGVK(gvk, baseConfig, codecs)
240+
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
241+
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
242+
// groupVersionKind to the Resource API we will use.
243+
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
249244
if err != nil {
250-
return nil, nil, err
245+
return nil, err
251246
}
252-
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
253-
listObj, err := scheme.New(listGVK)
247+
dynamicClient, err := dynamic.NewForConfig(ip.config)
254248
if err != nil {
255-
return nil, nil, err
256-
}
257-
258-
return client, listObj, nil
259-
}
260-
261-
// createUnstructuredClient is a ClientCreatorFunc for use with Unstructured and UnstructuredList.
262-
func createUnstructuredClient(gvk schema.GroupVersionKind,
263-
_ serializer.CodecFactory,
264-
_ *runtime.Scheme,
265-
baseConfig *rest.Config) (rest.Interface, runtime.Object, error) {
266-
267-
listObj := &unstructured.UnstructuredList{}
268-
client, err := apiutil.RESTUnstructuredClientForGVK(gvk, baseConfig)
269-
if err != nil {
270-
return nil, nil, err
249+
return nil, err
271250
}
272251

273-
return client, listObj, nil
252+
// Create a new ListWatch for the obj
253+
return &cache.ListWatch{
254+
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
255+
return dynamicClient.Resource(mapping.Resource).List(opts)
256+
},
257+
// Setup the watch function
258+
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
259+
// Watch needs to be set to true separately
260+
opts.Watch = true
261+
return dynamicClient.Resource(mapping.Resource).Watch(opts)
262+
},
263+
}, nil
274264
}

pkg/client/apiutil/apimachinery.go

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,13 @@ limitations under the License.
1717
package apiutil
1818

1919
import (
20-
"encoding/json"
2120
"fmt"
22-
"io"
23-
"strings"
24-
25-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2621

2722
"k8s.io/apimachinery/pkg/api/meta"
28-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2923
"k8s.io/apimachinery/pkg/runtime"
3024
"k8s.io/apimachinery/pkg/runtime/schema"
3125
"k8s.io/apimachinery/pkg/runtime/serializer"
3226
"k8s.io/client-go/discovery"
33-
"k8s.io/client-go/kubernetes/scheme"
3427
"k8s.io/client-go/rest"
3528
"k8s.io/client-go/restmapper"
3629
)
@@ -77,22 +70,6 @@ func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, code
7770
return rest.RESTClientFor(cfg)
7871
}
7972

80-
// RESTUnstructuredClientForGVK constructs a new rest.Interface for accessing unstructured resources.
81-
func RESTUnstructuredClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config) (rest.Interface, error) {
82-
cfg := createRestConfig(gvk, baseConfig)
83-
var jsonInfo runtime.SerializerInfo
84-
for _, info := range scheme.Codecs.SupportedMediaTypes() {
85-
if info.MediaType == runtime.ContentTypeJSON {
86-
jsonInfo = info
87-
break
88-
}
89-
}
90-
jsonInfo.Serializer = dynamicCodec{}
91-
cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(jsonInfo)
92-
93-
return rest.RESTClientFor(cfg)
94-
}
95-
9673
//createRestConfig copies the base config and updates needed fields for a new rest config
9774
func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *rest.Config {
9875
gv := gvk.GroupVersion()
@@ -108,32 +85,4 @@ func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *res
10885
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
10986
}
11087
return cfg
111-
112-
}
113-
114-
//Copied from deprecated-dynamic/bad_debt.go
115-
// dynamicCodec is a codec that wraps the standard unstructured codec
116-
// with special handling for Status objects.
117-
// Deprecated only used by test code and its wrong
118-
type dynamicCodec struct{}
119-
120-
func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
121-
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj)
122-
if err != nil {
123-
return nil, nil, err
124-
}
125-
126-
if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" {
127-
obj = &metav1.Status{}
128-
err := json.Unmarshal(data, obj)
129-
if err != nil {
130-
return nil, nil, err
131-
}
132-
}
133-
134-
return obj, gvk, nil
135-
}
136-
137-
func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error {
138-
return unstructured.UnstructuredJSONScheme.Encode(obj, w)
13988
}

0 commit comments

Comments
 (0)