Skip to content

Commit 9f1aac0

Browse files
committed
Use proto only if we have proto types
We should not try to use proto if the scheme does not have the protobuf representation in the go type; we won't be able to deserialize it.
1 parent 6c7c1d7 commit 9f1aac0

File tree

4 files changed

+271
-19
lines changed

4 files changed

+271
-19
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
261261
return nil, err
262262
}
263263

264-
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
264+
client, err := apiutil.RESTClientForGVK(ip.Scheme, gvk, false, ip.config, ip.codecs)
265265
if err != nil {
266266
return nil, err
267267
}

pkg/client/apiutil/apimachinery.go

Lines changed: 131 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"k8s.io/apimachinery/pkg/runtime/schema"
3131
"k8s.io/apimachinery/pkg/runtime/serializer"
3232
"k8s.io/client-go/discovery"
33-
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3433
"k8s.io/client-go/rest"
3534
"k8s.io/client-go/restmapper"
3635
)
@@ -40,17 +39,8 @@ var (
4039
protobufSchemeLock sync.RWMutex
4140
)
4241

43-
func init() {
44-
// Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
45-
// For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
46-
// See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
47-
if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
48-
panic(err)
49-
}
50-
}
51-
5242
// AddToProtobufScheme add the given SchemeBuilder into protobufScheme, which should
53-
// be additional types that do support protobuf.
43+
// be additional types where we do want to support protobuf.
5444
func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error {
5545
protobufSchemeLock.Lock()
5646
defer protobufSchemeLock.Unlock()
@@ -118,8 +108,8 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
118108
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
119109
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
120110
// baseConfig, if set, otherwise a default serializer will be set.
121-
func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
122-
return rest.RESTClientFor(createRestConfig(gvk, isUnstructured, baseConfig, codecs))
111+
func RESTClientForGVK(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
112+
return rest.RESTClientFor(createRestConfig(scheme, gvk, isUnstructured, baseConfig, codecs))
123113
}
124114

125115
// serializerWithDecodedGVK is a CodecFactory that overrides the DecoderToVersion of a WithoutConversionCodecFactory
@@ -136,7 +126,7 @@ func (f serializerWithDecodedGVK) DecoderToVersion(serializer runtime.Decoder, _
136126
}
137127

138128
// createRestConfig copies the base config and updates needed fields for a new rest config.
139-
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
129+
func createRestConfig(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
140130
gv := gvk.GroupVersion()
141131

142132
cfg := rest.CopyConfig(baseConfig)
@@ -151,11 +141,9 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
151141
}
152142
// TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true.
153143
if cfg.ContentType == "" && !isUnstructured {
154-
protobufSchemeLock.RLock()
155-
if protobufScheme.Recognizes(gvk) {
144+
if canUseProtobuf(scheme, gvk) {
156145
cfg.ContentType = runtime.ContentTypeProtobuf
157146
}
158-
protobufSchemeLock.RUnlock()
159147
}
160148

161149
if isUnstructured {
@@ -194,3 +182,129 @@ func zero(x interface{}) {
194182
res := reflect.ValueOf(x).Elem()
195183
res.Set(reflect.Zero(res.Type()))
196184
}
185+
186+
// canUseProtobuf returns true if we should use protobuf encoding.
187+
// We need two things: (1) the apiserver must support protobuf for the type,
188+
// and (2) we must have a proto-compatible receiving go type.
189+
// Because it's hard to know in general if apiserver supports proto for a given GVK,
190+
// we maintain a list of built-in apigroups which do support proto;
191+
// additional schemas can be added as proto-safe using AddToProtobufScheme.
192+
// We check if we have a proto-compatible type by interface casting.
193+
func canUseProtobuf(scheme *runtime.Scheme, gvk schema.GroupVersionKind) bool {
194+
// Check that the client supports proto for this type
195+
gvkType, found := scheme.AllKnownTypes()[gvk]
196+
if !found {
197+
// We aren't going to be able to deserialize proto without type information.
198+
return false
199+
}
200+
if !implementsProto(gvkType) {
201+
// We don't have proto information, can't parse proto
202+
return false
203+
}
204+
205+
// Check that the apiserver also supports proto for this type
206+
serverSupportsProto := false
207+
208+
// Check for built-in types well-known to support proto
209+
serverSupportsProto = isWellKnownKindThatSupportsProto(gvk)
210+
211+
// Check for additional types explicitly marked as supporting proto
212+
if !serverSupportsProto {
213+
protobufSchemeLock.RLock()
214+
serverSupportsProto = protobufScheme.Recognizes(gvk)
215+
protobufSchemeLock.RUnlock()
216+
}
217+
218+
return serverSupportsProto
219+
}
220+
221+
// isWellKnownKindThatSupportsProto returns true if the gvk is a well-known Kind that supports proto encoding.
222+
func isWellKnownKindThatSupportsProto(gvk schema.GroupVersionKind) bool {
223+
// corev1
224+
if gvk.Group == "" && gvk.Version == "v1" {
225+
return true
226+
}
227+
228+
// extensions v1beta1
229+
if gvk.Group == "extensions" && gvk.Version == "v1beta1" {
230+
return true
231+
}
232+
233+
// Generated with `kubectl api-resources -oname | grep "\." | sort | cut -f2- -d. | sort | uniq | awk '{print "case \"" $0 "\": return true" }'` (before adding any CRDs)
234+
switch gvk.Group {
235+
case "admissionregistration.k8s.io":
236+
return true
237+
case "apiextensions.k8s.io":
238+
return true
239+
case "apiregistration.k8s.io":
240+
return true
241+
case "apps":
242+
return true
243+
case "authentication.k8s.io":
244+
return true
245+
case "authorization.k8s.io":
246+
return true
247+
case "autoscaling":
248+
return true
249+
case "batch":
250+
return true
251+
case "certificates.k8s.io":
252+
return true
253+
case "coordination.k8s.io":
254+
return true
255+
case "discovery.k8s.io":
256+
return true
257+
case "events.k8s.io":
258+
return true
259+
case "flowcontrol.apiserver.k8s.io":
260+
return true
261+
case "networking.k8s.io":
262+
return true
263+
case "node.k8s.io":
264+
return true
265+
case "policy":
266+
return true
267+
case "rbac.authorization.k8s.io":
268+
return true
269+
case "scheduling.k8s.io":
270+
return true
271+
case "storage.k8s.io":
272+
return true
273+
}
274+
return false
275+
}
276+
277+
var (
278+
memoizeImplementsProto = make(map[reflect.Type]bool)
279+
memoizeImplementsProtoLock sync.RWMutex
280+
)
281+
282+
// protoMessage is implemented by protobuf messages (of all libraries).
283+
type protoMessage interface {
284+
ProtoMessage()
285+
}
286+
287+
var protoMessageType = reflect.TypeOf(new(protoMessage)).Elem()
288+
289+
// implementsProto returns true if the local go type supports protobuf deserialization.
290+
func implementsProto(t reflect.Type) bool {
291+
memoizeImplementsProtoLock.RLock()
292+
result, found := memoizeImplementsProto[t]
293+
memoizeImplementsProtoLock.RUnlock()
294+
295+
if found {
296+
return result
297+
}
298+
299+
// We normally get the raw struct e.g. v1.Pod, not &v1.Pod
300+
if t.Kind() == reflect.Struct {
301+
return implementsProto(reflect.PtrTo(t))
302+
}
303+
304+
result = t.Implements(protoMessageType)
305+
memoizeImplementsProtoLock.Lock()
306+
memoizeImplementsProto[t] = result
307+
memoizeImplementsProtoLock.Unlock()
308+
309+
return result
310+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package apiutil contains utilities for working with raw Kubernetes
18+
// API machinery, such as creating RESTMappers and raw REST clients,
19+
// and extracting the GVK of an object.
20+
package apiutil
21+
22+
import (
23+
"fmt"
24+
"testing"
25+
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
29+
kubernetesscheme "k8s.io/client-go/kubernetes/scheme"
30+
"sigs.k8s.io/controller-runtime/pkg/scheme"
31+
)
32+
33+
var exampleSchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}
34+
35+
type ExampleCRD struct {
36+
metav1.TypeMeta `json:",inline"`
37+
metav1.ObjectMeta `json:"metadata,omitempty"`
38+
}
39+
40+
func (e *ExampleCRD) DeepCopyObject() runtime.Object {
41+
panic("not implemented")
42+
}
43+
44+
func TestCanUseProtobuf(t *testing.T) {
45+
exampleCRDScheme := runtime.NewScheme()
46+
47+
builder := &scheme.Builder{GroupVersion: exampleSchemeGroupVersion}
48+
builder.Register(&ExampleCRD{})
49+
if err := builder.AddToScheme(exampleCRDScheme); err != nil {
50+
t.Fatalf("AddToScheme failed: %v", err)
51+
}
52+
53+
schemes := map[string]*runtime.Scheme{
54+
"kubernetes": kubernetesscheme.Scheme,
55+
"empty": runtime.NewScheme(),
56+
"example.com": exampleCRDScheme,
57+
}
58+
grid := []struct {
59+
scheme string
60+
gvk schema.GroupVersionKind
61+
wantType string
62+
wantCanUseProtobuf bool
63+
}{
64+
{
65+
scheme: "kubernetes",
66+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
67+
wantType: "v1.Pod",
68+
wantCanUseProtobuf: true,
69+
},
70+
{
71+
scheme: "kubernetes",
72+
gvk: schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Pod"},
73+
wantType: "",
74+
wantCanUseProtobuf: false,
75+
},
76+
{
77+
scheme: "empty",
78+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
79+
wantType: "",
80+
wantCanUseProtobuf: false,
81+
},
82+
{
83+
scheme: "example.com",
84+
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
85+
wantType: "",
86+
wantCanUseProtobuf: false,
87+
},
88+
{
89+
scheme: "example.com",
90+
gvk: schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "ExampleCRD"},
91+
wantType: "apiutil.ExampleCRD",
92+
wantCanUseProtobuf: false,
93+
},
94+
}
95+
96+
for _, g := range grid {
97+
t.Run(fmt.Sprintf("%#v", g), func(t *testing.T) {
98+
scheme := schemes[g.scheme]
99+
if scheme == nil {
100+
t.Errorf("scheme %q not found", g.scheme)
101+
}
102+
103+
gotType := ""
104+
if t := scheme.AllKnownTypes()[g.gvk]; t != nil {
105+
gotType = t.String()
106+
}
107+
if gotType != g.wantType {
108+
t.Errorf("unexpected type got %v, want %v", gotType, g.wantType)
109+
}
110+
gotCanUseProtobuf := canUseProtobuf(scheme, g.gvk)
111+
if gotCanUseProtobuf != g.wantCanUseProtobuf {
112+
t.Errorf("canUseProtobuf(%#v, %#v) got %v, want %v", g.scheme, g.gvk, gotCanUseProtobuf, g.wantCanUseProtobuf)
113+
}
114+
})
115+
}
116+
}
117+
118+
func TestCanUseProtobufForAllBuiltins(t *testing.T) {
119+
emptyScheme := runtime.NewScheme()
120+
121+
allKnownTypes := kubernetesscheme.Scheme.AllKnownTypes()
122+
for gvk := range allKnownTypes {
123+
// Ignore internal bookkeeping types
124+
if gvk.Version == "__internal" || gvk.Group == "internal.apiserver.k8s.io" {
125+
continue
126+
}
127+
128+
if !canUseProtobuf(kubernetesscheme.Scheme, gvk) {
129+
// If this fails on a k8s api library upgrade, we likely need to update isWellKnownKindThatSupportsProto for a new built-in group.
130+
t.Errorf("canUseProtobuf for built-in GVK %#v returned false, expected built-ins to support proto", gvk)
131+
}
132+
133+
// If we don't have the type in the scheme, double check we don't try to use proto.
134+
if canUseProtobuf(emptyScheme, gvk) {
135+
t.Errorf("canUseProtobuf for built-in GVK %#v returned true with empty scheme, but empty scheme cannot support proto", gvk)
136+
}
137+
}
138+
}

pkg/client/client_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList, isUnstruc
5959
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
6060
}
6161

62-
client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
62+
client, err := apiutil.RESTClientForGVK(c.scheme, gvk, isUnstructured, c.config, c.codecs)
6363
if err != nil {
6464
return nil, err
6565
}

0 commit comments

Comments
 (0)