Skip to content

Commit 5a35d58

Browse files
committed
WIP: Use proto if we have proto types
1 parent 4e7f0c9 commit 5a35d58

File tree

4 files changed

+144
-32
lines changed

4 files changed

+144
-32
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
266266
return nil, err
267267
}
268268

269-
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
269+
client, err := apiutil.RESTClientForGVK(ip.Scheme, gvk, false, ip.config, ip.codecs)
270270
if err != nil {
271271
return nil, err
272272
}

pkg/client/apiutil/apimachinery.go

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,41 +21,18 @@ package apiutil
2121

2222
import (
2323
"fmt"
24-
"sync"
24+
"reflect"
2525

2626
"k8s.io/apimachinery/pkg/api/meta"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/runtime"
2929
"k8s.io/apimachinery/pkg/runtime/schema"
3030
"k8s.io/apimachinery/pkg/runtime/serializer"
3131
"k8s.io/client-go/discovery"
32-
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3332
"k8s.io/client-go/rest"
3433
"k8s.io/client-go/restmapper"
3534
)
3635

37-
var (
38-
protobufScheme = runtime.NewScheme()
39-
protobufSchemeLock sync.RWMutex
40-
)
41-
42-
func init() {
43-
// Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
44-
// For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
45-
// See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
46-
if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
47-
panic(err)
48-
}
49-
}
50-
51-
// AddToProtobufScheme add the given SchemeBuilder into protobufScheme, which should
52-
// be additional types that do support protobuf.
53-
func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error {
54-
protobufSchemeLock.Lock()
55-
defer protobufSchemeLock.Unlock()
56-
return addToScheme(protobufScheme)
57-
}
58-
5936
// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery
6037
// information fetched by a new client with the given config.
6138
func NewDiscoveryRESTMapper(c *rest.Config) (meta.RESTMapper, error) {
@@ -117,8 +94,8 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
11794
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
11895
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
11996
// baseConfig, if set, otherwise a default serializer will be set.
120-
func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
121-
return rest.RESTClientFor(createRestConfig(gvk, isUnstructured, baseConfig, codecs))
97+
func RESTClientForGVK(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
98+
return rest.RESTClientFor(createRestConfig(scheme, gvk, isUnstructured, baseConfig, codecs))
12299
}
123100

124101
// serializerWithDecodedGVK is a CodecFactory that overrides the DecoderToVersion of a WithoutConversionCodecFactory
@@ -135,7 +112,7 @@ func (f serializerWithDecodedGVK) DecoderToVersion(serializer runtime.Decoder, _
135112
}
136113

137114
// createRestConfig copies the base config and updates needed fields for a new rest config.
138-
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
115+
func createRestConfig(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
139116
gv := gvk.GroupVersion()
140117

141118
cfg := rest.CopyConfig(baseConfig)
@@ -150,11 +127,9 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
150127
}
151128
// TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true.
152129
if cfg.ContentType == "" && !isUnstructured {
153-
protobufSchemeLock.RLock()
154-
if protobufScheme.Recognizes(gvk) {
130+
if haveProtoDefinition(scheme, gvk) {
155131
cfg.ContentType = runtime.ContentTypeProtobuf
156132
}
157-
protobufSchemeLock.RUnlock()
158133
}
159134

160135
if cfg.NegotiatedSerializer == nil {
@@ -169,3 +144,23 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
169144

170145
return cfg
171146
}
147+
148+
// protoMessage is implemented by protobuf messages (of all libraries)
149+
type protoMessage interface {
150+
ProtoMessage()
151+
}
152+
153+
var protoMessageType = reflect.TypeOf(new(protoMessage)).Elem()
154+
155+
// haveProtoDefinition returns true if the go type for the specified gvk support protobuf encoding.
156+
func haveProtoDefinition(scheme *runtime.Scheme, gvk schema.GroupVersionKind) bool {
157+
gvkType, found := scheme.AllKnownTypes()[gvk]
158+
if !found {
159+
return false
160+
}
161+
// We normally get the raw struct e.g. v1.Pod, not &v1.Pod
162+
if gvkType.Kind() == reflect.Struct {
163+
gvkType = reflect.PtrTo(gvkType)
164+
}
165+
return gvkType.Implements(protoMessageType)
166+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package apiutil contains utilities for working with raw Kubernetes
18+
// API machinery, such as creating RESTMappers and raw REST clients,
19+
// and extracting the GVK of an object.
20+
package apiutil
21+
22+
import (
23+
"fmt"
24+
"testing"
25+
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
29+
kubernetesscheme "k8s.io/client-go/kubernetes/scheme"
30+
"sigs.k8s.io/controller-runtime/pkg/scheme"
31+
)
32+
33+
var exampleSchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}
34+
35+
type ExampleCRD struct {
36+
metav1.TypeMeta `json:",inline"`
37+
metav1.ObjectMeta `json:"metadata,omitempty"`
38+
}
39+
40+
func (e *ExampleCRD) DeepCopyObject() runtime.Object {
41+
panic("not implemented")
42+
}
43+
44+
func TestHaveProtoDefinition(t *testing.T) {
45+
exampleCRDScheme := runtime.NewScheme()
46+
47+
builder := &scheme.Builder{GroupVersion: exampleSchemeGroupVersion}
48+
builder.Register(&ExampleCRD{})
49+
if err := builder.AddToScheme(exampleCRDScheme); err != nil {
50+
t.Fatalf("AddToScheme failed: %v", err)
51+
}
52+
53+
schemes := map[string]*runtime.Scheme{
54+
"kubernetes": kubernetesscheme.Scheme,
55+
"empty": runtime.NewScheme(),
56+
"example.com": exampleCRDScheme,
57+
}
58+
grid := []struct {
59+
scheme string
60+
gvk schema.GroupVersionKind
61+
wantType string
62+
wantHaveProtoDefinition bool
63+
}{
64+
{
65+
scheme: "kubernetes",
66+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
67+
wantType: "v1.Pod",
68+
wantHaveProtoDefinition: true,
69+
},
70+
{
71+
scheme: "kubernetes",
72+
gvk: schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Pod"},
73+
wantType: "",
74+
wantHaveProtoDefinition: false,
75+
},
76+
{
77+
scheme: "empty",
78+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
79+
wantType: "",
80+
wantHaveProtoDefinition: false,
81+
},
82+
{
83+
scheme: "example.com",
84+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
85+
wantType: "",
86+
wantHaveProtoDefinition: false,
87+
},
88+
{
89+
scheme: "example.com",
90+
gvk: schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "ExampleCRD"},
91+
wantType: "apiutil.ExampleCRD",
92+
wantHaveProtoDefinition: false,
93+
},
94+
}
95+
96+
for _, g := range grid {
97+
t.Run(fmt.Sprintf("%#v", g), func(t *testing.T) {
98+
scheme := schemes[g.scheme]
99+
if scheme == nil {
100+
t.Errorf("scheme %q not found", g.scheme)
101+
}
102+
103+
gotType := ""
104+
if t := scheme.AllKnownTypes()[g.gvk]; t != nil {
105+
gotType = t.String()
106+
}
107+
if gotType != g.wantType {
108+
t.Errorf("unexpected type got %v, want %v", gotType, g.wantType)
109+
110+
}
111+
gotHaveProtoDefinition := haveProtoDefinition(scheme, g.gvk)
112+
if gotHaveProtoDefinition != g.wantHaveProtoDefinition {
113+
t.Errorf("haveProtoDefintion(%#v, %#v) got %v, want %v", g.scheme, g.gvk, gotHaveProtoDefinition, g.wantHaveProtoDefinition)
114+
}
115+
})
116+
}
117+
}

pkg/client/client_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList, isUnstruc
5959
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
6060
}
6161

62-
client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
62+
client, err := apiutil.RESTClientForGVK(c.scheme, gvk, isUnstructured, c.config, c.codecs)
6363
if err != nil {
6464
return nil, err
6565
}

0 commit comments

Comments
 (0)