Skip to content

Commit fed41f5

Browse files
smarterclaytonk8s-publishing-bot
authored andcommitted
Always negotiate a decoder using ClientNegotiator
This commit performs two refactors and fixes a bug. Refactor 1 changes the signature of Request to take a RESTClient, which removes the extra copy of everything on RESTClient from Request. A pair of optional constructors are added for testing. The major functional change is that Request no longer has the shim HTTPClient interface and so some test cases change slightly because we are now going through http.Client code paths instead of direct to our test stubs. Refactor 2 changes the signature of RESTClient to take a ClientContentConfig instead of ContentConfig - the primary difference being that ClientContentConfig uses ClientNegotiator instead of NegotiatedSerializer and the old Serializers type. We also collapse some redundancies (like the rate limiter can be created outside the constructor). The bug fix is to negotiate the streaming content type on a Watch() like we do for requests. We stop caching the decoder and simply resolve it on the request. We also clean up the dynamic client and remove the extra WatchSpecificVersions() method by providing a properly wrapped dynamic client. Kubernetes-commit: 3b780c64b89606f4e6b21f48fb9c305d5998b9e5
1 parent 9155fb5 commit fed41f5

File tree

8 files changed

+666
-470
lines changed

8 files changed

+666
-470
lines changed

metadata/fake/simple.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ func (c *metadataResourceClient) List(opts metav1.ListOptions) (*metav1.PartialO
317317
if !ok {
318318
return nil, fmt.Errorf("incoming object is incorrect type %T", obj)
319319
}
320-
fmt.Printf("DEBUG: %#v\n", inputList)
321320

322321
list := &metav1.PartialObjectMetadataList{
323322
ListMeta: inputList.ListMeta,

rest/client.go

Lines changed: 40 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ limitations under the License.
1717
package rest
1818

1919
import (
20-
"fmt"
21-
"mime"
2220
"net/http"
2321
"net/url"
2422
"os"
@@ -51,6 +49,28 @@ type Interface interface {
5149
APIVersion() schema.GroupVersion
5250
}
5351

52+
// ClientContentConfig controls how RESTClient communicates with the server.
53+
//
54+
// TODO: ContentConfig will be updated to accept a Negotiator instead of a
55+
// NegotiatedSerializer and NegotiatedSerializer will be removed.
56+
type ClientContentConfig struct {
57+
// AcceptContentTypes specifies the types the client will accept and is optional.
58+
// If not set, ContentType will be used to define the Accept header
59+
AcceptContentTypes string
60+
// ContentType specifies the wire format used to communicate with the server.
61+
// This value will be set as the Accept header on requests made to the server if
62+
// AcceptContentTypes is not set, and as the default content type on any object
63+
// sent to the server. If not set, "application/json" is used.
64+
ContentType string
65+
// GroupVersion is the API version to talk to. Must be provided when initializing
66+
// a RESTClient directly. When initializing a Client, will be set with the default
67+
// code version. This is used as the default group version for VersionedParams.
68+
GroupVersion schema.GroupVersion
69+
// Negotiator is used for obtaining encoders and decoders for multiple
70+
// supported media types.
71+
Negotiator runtime.ClientNegotiator
72+
}
73+
5474
// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
5575
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
5676
// of one or more resources. The server should return a decodable API resource
@@ -64,66 +84,42 @@ type RESTClient struct {
6484
// versionedAPIPath is a path segment connecting the base URL to the resource root
6585
versionedAPIPath string
6686

67-
// contentConfig is the information used to communicate with the server.
68-
contentConfig ContentConfig
69-
70-
// serializers contain all serializers for underlying content type.
71-
serializers Serializers
87+
// content describes how a RESTClient encodes and decodes responses.
88+
content ClientContentConfig
7289

7390
// creates BackoffManager that is passed to requests.
7491
createBackoffMgr func() BackoffManager
7592

76-
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
77-
Throttle flowcontrol.RateLimiter
93+
// rateLimiter is shared among all requests created by this client unless specifically
94+
// overridden.
95+
rateLimiter flowcontrol.RateLimiter
7896

7997
// Set specific behavior of the client. If not set http.DefaultClient will be used.
8098
Client *http.Client
8199
}
82100

83-
type Serializers struct {
84-
Encoder runtime.Encoder
85-
Decoder runtime.Decoder
86-
StreamingSerializer runtime.Serializer
87-
Framer runtime.Framer
88-
RenegotiatedDecoder func(contentType string, params map[string]string) (runtime.Decoder, error)
89-
}
90-
91101
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
92-
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
93-
// decoding of responses from the server.
94-
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
102+
// such as Get, Put, Post, and Delete on specified paths.
103+
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
104+
if len(config.ContentType) == 0 {
105+
config.ContentType = "application/json"
106+
}
107+
95108
base := *baseURL
96109
if !strings.HasSuffix(base.Path, "/") {
97110
base.Path += "/"
98111
}
99112
base.RawQuery = ""
100113
base.Fragment = ""
101114

102-
if config.GroupVersion == nil {
103-
config.GroupVersion = &schema.GroupVersion{}
104-
}
105-
if len(config.ContentType) == 0 {
106-
config.ContentType = "application/json"
107-
}
108-
serializers, err := createSerializers(config)
109-
if err != nil {
110-
return nil, err
111-
}
112-
113-
var throttle flowcontrol.RateLimiter
114-
if maxQPS > 0 && rateLimiter == nil {
115-
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
116-
} else if rateLimiter != nil {
117-
throttle = rateLimiter
118-
}
119115
return &RESTClient{
120116
base: &base,
121117
versionedAPIPath: versionedAPIPath,
122-
contentConfig: config,
123-
serializers: *serializers,
118+
content: config,
124119
createBackoffMgr: readExpBackoffConfig,
125-
Throttle: throttle,
126-
Client: client,
120+
rateLimiter: rateLimiter,
121+
122+
Client: client,
127123
}, nil
128124
}
129125

@@ -132,7 +128,7 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
132128
if c == nil {
133129
return nil
134130
}
135-
return c.Throttle
131+
return c.rateLimiter
136132
}
137133

138134
// readExpBackoffConfig handles the internal logic of determining what the
@@ -153,58 +149,6 @@ func readExpBackoffConfig() BackoffManager {
153149
time.Duration(backoffDurationInt)*time.Second)}
154150
}
155151

156-
// createSerializers creates all necessary serializers for given contentType.
157-
// TODO: the negotiated serializer passed to this method should probably return
158-
// serializers that control decoding and versioning without this package
159-
// being aware of the types. Depends on whether RESTClient must deal with
160-
// generic infrastructure.
161-
func createSerializers(config ContentConfig) (*Serializers, error) {
162-
mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes()
163-
contentType := config.ContentType
164-
mediaType, _, err := mime.ParseMediaType(contentType)
165-
if err != nil {
166-
return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err)
167-
}
168-
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType)
169-
if !ok {
170-
if len(contentType) != 0 || len(mediaTypes) == 0 {
171-
return nil, fmt.Errorf("no serializers registered for %s", contentType)
172-
}
173-
info = mediaTypes[0]
174-
}
175-
176-
internalGV := schema.GroupVersions{
177-
{
178-
Group: config.GroupVersion.Group,
179-
Version: runtime.APIVersionInternal,
180-
},
181-
// always include the legacy group as a decoding target to handle non-error `Status` return types
182-
{
183-
Group: "",
184-
Version: runtime.APIVersionInternal,
185-
},
186-
}
187-
188-
s := &Serializers{
189-
Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion),
190-
Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV),
191-
192-
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
193-
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType)
194-
if !ok {
195-
return nil, fmt.Errorf("serializer for %s not registered", contentType)
196-
}
197-
return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil
198-
},
199-
}
200-
if info.StreamSerializer != nil {
201-
s.StreamingSerializer = info.StreamSerializer.Serializer
202-
s.Framer = info.StreamSerializer.Framer
203-
}
204-
205-
return s, nil
206-
}
207-
208152
// Verb begins a request with a verb (GET, POST, PUT, DELETE).
209153
//
210154
// Example usage of RESTClient's request building interface:
@@ -219,12 +163,7 @@ func createSerializers(config ContentConfig) (*Serializers, error) {
219163
// list, ok := resp.(*api.PodList)
220164
//
221165
func (c *RESTClient) Verb(verb string) *Request {
222-
backoff := c.createBackoffMgr()
223-
224-
if c.Client == nil {
225-
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, 0)
226-
}
227-
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, c.Client.Timeout)
166+
return NewRequest(c).Verb(verb)
228167
}
229168

230169
// Post begins a POST request. Short for c.Verb("POST").
@@ -254,5 +193,5 @@ func (c *RESTClient) Delete() *Request {
254193

255194
// APIVersion returns the APIVersion this RESTClient is expected to use.
256195
func (c *RESTClient) APIVersion() schema.GroupVersion {
257-
return *c.contentConfig.GroupVersion
196+
return c.content.GroupVersion
258197
}

rest/client_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
"fmt"
2929

30-
"k8s.io/api/core/v1"
30+
v1 "k8s.io/api/core/v1"
3131
v1beta1 "k8s.io/api/extensions/v1beta1"
3232
"k8s.io/apimachinery/pkg/api/errors"
3333
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -57,12 +57,14 @@ func TestSerializer(t *testing.T) {
5757
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
5858
}
5959

60-
serializer, err := createSerializers(contentConfig)
60+
n := runtime.NewClientNegotiator(contentConfig.NegotiatedSerializer, gv)
61+
d, err := n.Decoder("application/json", nil)
6162
if err != nil {
6263
t.Fatal(err)
6364
}
65+
6466
// bytes based on actual return from API server when encoding an "unversioned" object
65-
obj, err := runtime.Decode(serializer.Decoder, []byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success"}`))
67+
obj, err := runtime.Decode(d, []byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success"}`))
6668
t.Log(obj)
6769
if err != nil {
6870
t.Fatal(err)

rest/config.go

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ type ContentConfig struct {
269269
GroupVersion *schema.GroupVersion
270270
// NegotiatedSerializer is used for obtaining encoders and decoders for multiple
271271
// supported media types.
272+
//
273+
// TODO: NegotiatedSerializer will be phased out as internal clients are removed
274+
// from Kubernetes.
272275
NegotiatedSerializer runtime.NegotiatedSerializer
273276
}
274277

@@ -283,14 +286,6 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
283286
if config.NegotiatedSerializer == nil {
284287
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
285288
}
286-
qps := config.QPS
287-
if config.QPS == 0.0 {
288-
qps = DefaultQPS
289-
}
290-
burst := config.Burst
291-
if config.Burst == 0 {
292-
burst = DefaultBurst
293-
}
294289

295290
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
296291
if err != nil {
@@ -310,7 +305,33 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
310305
}
311306
}
312307

313-
return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient)
308+
rateLimiter := config.RateLimiter
309+
if rateLimiter == nil {
310+
qps := config.QPS
311+
if config.QPS == 0.0 {
312+
qps = DefaultQPS
313+
}
314+
burst := config.Burst
315+
if config.Burst == 0 {
316+
burst = DefaultBurst
317+
}
318+
if qps > 0 {
319+
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
320+
}
321+
}
322+
323+
var gv schema.GroupVersion
324+
if config.GroupVersion != nil {
325+
gv = *config.GroupVersion
326+
}
327+
clientContent := ClientContentConfig{
328+
AcceptContentTypes: config.AcceptContentTypes,
329+
ContentType: config.ContentType,
330+
GroupVersion: gv,
331+
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
332+
}
333+
334+
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
314335
}
315336

316337
// UnversionedRESTClientFor is the same as RESTClientFor, except that it allows
@@ -338,13 +359,33 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
338359
}
339360
}
340361

341-
versionConfig := config.ContentConfig
342-
if versionConfig.GroupVersion == nil {
343-
v := metav1.SchemeGroupVersion
344-
versionConfig.GroupVersion = &v
362+
rateLimiter := config.RateLimiter
363+
if rateLimiter == nil {
364+
qps := config.QPS
365+
if config.QPS == 0.0 {
366+
qps = DefaultQPS
367+
}
368+
burst := config.Burst
369+
if config.Burst == 0 {
370+
burst = DefaultBurst
371+
}
372+
if qps > 0 {
373+
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
374+
}
375+
}
376+
377+
gv := metav1.SchemeGroupVersion
378+
if config.GroupVersion != nil {
379+
gv = *config.GroupVersion
380+
}
381+
clientContent := ClientContentConfig{
382+
AcceptContentTypes: config.AcceptContentTypes,
383+
ContentType: config.ContentType,
384+
GroupVersion: gv,
385+
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
345386
}
346387

347-
return NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
388+
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
348389
}
349390

350391
// SetKubernetesDefaults sets default values on the provided client config for accessing the

0 commit comments

Comments
 (0)