Skip to content

Commit 602bb3c

Browse files
committed
Use proto only if we have proto types
We should not try to use proto if the scheme does not have the protobuf representation in the go type; we won't be able to deserialize it.
1 parent 4e7f0c9 commit 602bb3c

File tree

4 files changed

+272
-19
lines changed

4 files changed

+272
-19
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
266266
return nil, err
267267
}
268268

269-
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
269+
client, err := apiutil.RESTClientForGVK(ip.Scheme, gvk, false, ip.config, ip.codecs)
270270
if err != nil {
271271
return nil, err
272272
}

pkg/client/apiutil/apimachinery.go

Lines changed: 132 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package apiutil
2121

2222
import (
2323
"fmt"
24+
"reflect"
2425
"sync"
2526

2627
"k8s.io/apimachinery/pkg/api/meta"
@@ -29,7 +30,6 @@ import (
2930
"k8s.io/apimachinery/pkg/runtime/schema"
3031
"k8s.io/apimachinery/pkg/runtime/serializer"
3132
"k8s.io/client-go/discovery"
32-
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3333
"k8s.io/client-go/rest"
3434
"k8s.io/client-go/restmapper"
3535
)
@@ -39,17 +39,8 @@ var (
3939
protobufSchemeLock sync.RWMutex
4040
)
4141

42-
func init() {
43-
// Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
44-
// For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
45-
// See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
46-
if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
47-
panic(err)
48-
}
49-
}
50-
5142
// AddToProtobufScheme add the given SchemeBuilder into protobufScheme, which should
52-
// be additional types that do support protobuf.
43+
// be additional types where we do want to support protobuf.
5344
func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error {
5445
protobufSchemeLock.Lock()
5546
defer protobufSchemeLock.Unlock()
@@ -117,8 +108,8 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
117108
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
118109
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
119110
// baseConfig, if set, otherwise a default serializer will be set.
120-
func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
121-
return rest.RESTClientFor(createRestConfig(gvk, isUnstructured, baseConfig, codecs))
111+
func RESTClientForGVK(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
112+
return rest.RESTClientFor(createRestConfig(scheme, gvk, isUnstructured, baseConfig, codecs))
122113
}
123114

124115
// serializerWithDecodedGVK is a CodecFactory that overrides the DecoderToVersion of a WithoutConversionCodecFactory
@@ -135,7 +126,7 @@ func (f serializerWithDecodedGVK) DecoderToVersion(serializer runtime.Decoder, _
135126
}
136127

137128
// createRestConfig copies the base config and updates needed fields for a new rest config.
138-
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
129+
func createRestConfig(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
139130
gv := gvk.GroupVersion()
140131

141132
cfg := rest.CopyConfig(baseConfig)
@@ -150,11 +141,9 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
150141
}
151142
// TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true.
152143
if cfg.ContentType == "" && !isUnstructured {
153-
protobufSchemeLock.RLock()
154-
if protobufScheme.Recognizes(gvk) {
144+
if canUseProtobuf(scheme, gvk) {
155145
cfg.ContentType = runtime.ContentTypeProtobuf
156146
}
157-
protobufSchemeLock.RUnlock()
158147
}
159148

160149
if cfg.NegotiatedSerializer == nil {
@@ -169,3 +158,129 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
169158

170159
return cfg
171160
}
161+
162+
// canUseProtobuf returns true if we should use protobuf encoding.
163+
// We need two things: (1) the apiserver must support protobuf for the type,
164+
// and (2) we must have a proto-compatible receiving go type.
165+
// Because it's hard to know in general if apiserver supports proto for a given GVK,
166+
// we maintain a list of built-in apigroups which do support proto;
167+
// additional schemas can be added as proto-safe using AddToProtobufScheme.
168+
// We check if we have a proto-compatible type by interface casting.
169+
func canUseProtobuf(scheme *runtime.Scheme, gvk schema.GroupVersionKind) bool {
170+
// Check that the client supports proto for this type
171+
gvkType, found := scheme.AllKnownTypes()[gvk]
172+
if !found {
173+
// We aren't going to be able to deserialize proto without type information.
174+
return false
175+
}
176+
if !implementsProto(gvkType) {
177+
// We don't have proto information, can't parse proto
178+
return false
179+
}
180+
181+
// Check that the apiserver also supports proto for this type
182+
serverSupportsProto := false
183+
184+
// Check for built-in types well-known to support proto
185+
serverSupportsProto = isWellKnownKindThatSupportsProto(gvk)
186+
187+
// Check for additional types explicitly marked as supporting proto
188+
if !serverSupportsProto {
189+
protobufSchemeLock.RLock()
190+
serverSupportsProto = protobufScheme.Recognizes(gvk)
191+
protobufSchemeLock.RUnlock()
192+
}
193+
194+
return serverSupportsProto
195+
}
196+
197+
// isWellKnownKindThatSupportsProto returns true if the gvk is a well-known Kind that supports proto encoding.
198+
func isWellKnownKindThatSupportsProto(gvk schema.GroupVersionKind) bool {
199+
// corev1
200+
if gvk.Group == "" && gvk.Version == "v1" {
201+
return true
202+
}
203+
204+
// extensions v1beta1
205+
if gvk.Group == "extensions" && gvk.Version == "v1beta1" {
206+
return true
207+
}
208+
209+
// Generated with `kubectl api-resources -oname | grep "\." | sort | cut -f2- -d. | sort | uniq | awk '{print "case \"" $0 "\": return true" }'` (before adding any CRDs)
210+
switch gvk.Group {
211+
case "admissionregistration.k8s.io":
212+
return true
213+
case "apiextensions.k8s.io":
214+
return true
215+
case "apiregistration.k8s.io":
216+
return true
217+
case "apps":
218+
return true
219+
case "authentication.k8s.io":
220+
return true
221+
case "authorization.k8s.io":
222+
return true
223+
case "autoscaling":
224+
return true
225+
case "batch":
226+
return true
227+
case "certificates.k8s.io":
228+
return true
229+
case "coordination.k8s.io":
230+
return true
231+
case "discovery.k8s.io":
232+
return true
233+
case "events.k8s.io":
234+
return true
235+
case "flowcontrol.apiserver.k8s.io":
236+
return true
237+
case "networking.k8s.io":
238+
return true
239+
case "node.k8s.io":
240+
return true
241+
case "policy":
242+
return true
243+
case "rbac.authorization.k8s.io":
244+
return true
245+
case "scheduling.k8s.io":
246+
return true
247+
case "storage.k8s.io":
248+
return true
249+
}
250+
return false
251+
}
252+
253+
var (
254+
memoizeImplementsProto = make(map[reflect.Type]bool)
255+
memoizeImplementsProtoLock sync.RWMutex
256+
)
257+
258+
// protoMessage is implemented by protobuf messages (of all libraries).
259+
type protoMessage interface {
260+
ProtoMessage()
261+
}
262+
263+
var protoMessageType = reflect.TypeOf(new(protoMessage)).Elem()
264+
265+
// implementsProto returns true if the local go type supports protobuf deserialization.
266+
func implementsProto(t reflect.Type) bool {
267+
memoizeImplementsProtoLock.RLock()
268+
result, found := memoizeImplementsProto[t]
269+
memoizeImplementsProtoLock.RUnlock()
270+
271+
if found {
272+
return result
273+
}
274+
275+
// We normally get the raw struct e.g. v1.Pod, not &v1.Pod
276+
if t.Kind() == reflect.Struct {
277+
return implementsProto(reflect.PtrTo(t))
278+
}
279+
280+
result = t.Implements(protoMessageType)
281+
memoizeImplementsProtoLock.Lock()
282+
memoizeImplementsProto[t] = result
283+
memoizeImplementsProtoLock.Unlock()
284+
285+
return result
286+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package apiutil contains utilities for working with raw Kubernetes
18+
// API machinery, such as creating RESTMappers and raw REST clients,
19+
// and extracting the GVK of an object.
20+
package apiutil
21+
22+
import (
23+
"fmt"
24+
"testing"
25+
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
29+
kubernetesscheme "k8s.io/client-go/kubernetes/scheme"
30+
"sigs.k8s.io/controller-runtime/pkg/scheme"
31+
)
32+
33+
var exampleSchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}
34+
35+
type ExampleCRD struct {
36+
metav1.TypeMeta `json:",inline"`
37+
metav1.ObjectMeta `json:"metadata,omitempty"`
38+
}
39+
40+
func (e *ExampleCRD) DeepCopyObject() runtime.Object {
41+
panic("not implemented")
42+
}
43+
44+
func TestCanUseProtobuf(t *testing.T) {
45+
exampleCRDScheme := runtime.NewScheme()
46+
47+
builder := &scheme.Builder{GroupVersion: exampleSchemeGroupVersion}
48+
builder.Register(&ExampleCRD{})
49+
if err := builder.AddToScheme(exampleCRDScheme); err != nil {
50+
t.Fatalf("AddToScheme failed: %v", err)
51+
}
52+
53+
schemes := map[string]*runtime.Scheme{
54+
"kubernetes": kubernetesscheme.Scheme,
55+
"empty": runtime.NewScheme(),
56+
"example.com": exampleCRDScheme,
57+
}
58+
grid := []struct {
59+
scheme string
60+
gvk schema.GroupVersionKind
61+
wantType string
62+
wantCanUseProtobuf bool
63+
}{
64+
{
65+
scheme: "kubernetes",
66+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
67+
wantType: "v1.Pod",
68+
wantCanUseProtobuf: true,
69+
},
70+
{
71+
scheme: "kubernetes",
72+
gvk: schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Pod"},
73+
wantType: "",
74+
wantCanUseProtobuf: false,
75+
},
76+
{
77+
scheme: "empty",
78+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
79+
wantType: "",
80+
wantCanUseProtobuf: false,
81+
},
82+
{
83+
scheme: "example.com",
84+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
85+
wantType: "",
86+
wantCanUseProtobuf: false,
87+
},
88+
{
89+
scheme: "example.com",
90+
gvk: schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "ExampleCRD"},
91+
wantType: "apiutil.ExampleCRD",
92+
wantCanUseProtobuf: false,
93+
},
94+
}
95+
96+
for _, g := range grid {
97+
t.Run(fmt.Sprintf("%#v", g), func(t *testing.T) {
98+
scheme := schemes[g.scheme]
99+
if scheme == nil {
100+
t.Errorf("scheme %q not found", g.scheme)
101+
}
102+
103+
gotType := ""
104+
if t := scheme.AllKnownTypes()[g.gvk]; t != nil {
105+
gotType = t.String()
106+
}
107+
if gotType != g.wantType {
108+
t.Errorf("unexpected type got %v, want %v", gotType, g.wantType)
109+
110+
}
111+
gotCanUseProtobuf := canUseProtobuf(scheme, g.gvk)
112+
if gotCanUseProtobuf != g.wantCanUseProtobuf {
113+
t.Errorf("canUseProtobuf(%#v, %#v) got %v, want %v", g.scheme, g.gvk, gotCanUseProtobuf, g.wantCanUseProtobuf)
114+
}
115+
})
116+
}
117+
}
118+
119+
func TestCanUseProtobufForAllBuiltins(t *testing.T) {
120+
emptyScheme := runtime.NewScheme()
121+
122+
allKnownTypes := kubernetesscheme.Scheme.AllKnownTypes()
123+
for gvk := range allKnownTypes {
124+
// Ignore internal bookkeeping types
125+
if gvk.Version == "__internal" || gvk.Group == "internal.apiserver.k8s.io" {
126+
continue
127+
}
128+
129+
if !canUseProtobuf(kubernetesscheme.Scheme, gvk) {
130+
t.Errorf("canUseProtobuf for built-in GVK %#v returned false, expected built-ins to support proto", gvk)
131+
}
132+
133+
// If we don't have the type in the scheme, double check we don't try to use proto.
134+
if canUseProtobuf(emptyScheme, gvk) {
135+
t.Errorf("canUseProtobuf for built-in GVK %#v returned true with empty scheme, but empty scheme cannot support proto", gvk)
136+
}
137+
}
138+
}

pkg/client/client_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList, isUnstruc
5959
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
6060
}
6161

62-
client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
62+
client, err := apiutil.RESTClientForGVK(c.scheme, gvk, isUnstructured, c.config, c.codecs)
6363
if err != nil {
6464
return nil, err
6565
}

0 commit comments

Comments
 (0)