Skip to content

Commit 96ef67c

Browse files
committed
pkg/restmapper: return ErrRateLimited instead of blocking on backoff
1 parent 49997a9 commit 96ef67c

File tree

1 file changed

+91
-74
lines changed

1 file changed

+91
-74
lines changed

pkg/client/apiutil/dynamicrestmapper.go

Lines changed: 91 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,57 @@ limitations under the License.
1717
package apiutil
1818

1919
import (
20-
"sync"
20+
"fmt"
21+
"regexp"
22+
"strconv"
2123
"time"
2224

25+
"golang.org/x/time/rate"
2326
"k8s.io/apimachinery/pkg/api/meta"
2427
"k8s.io/apimachinery/pkg/runtime/schema"
2528
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26-
"k8s.io/apimachinery/pkg/util/wait"
2729
"k8s.io/client-go/discovery"
2830
"k8s.io/client-go/rest"
2931
"k8s.io/client-go/restmapper"
3032
)
3133

32-
// TODO(estroz): do we want to return a wait.ErrWaitTimeout if backoff duration
33-
// reaches a maximum?
34+
// ErrRateLimited is returned by a DynamicRESTMapper method if the number
35+
// of API calls has exceeded a limit within a certain time period.
36+
type ErrRateLimited struct {
37+
// Duration to wait until the next API call can be made.
38+
Delay time.Duration
39+
}
40+
41+
const errRLMsg = "too many API calls to the DynamicRESTMapper within a timeframe"
42+
43+
func (e ErrRateLimited) Error() string {
44+
return fmt.Sprintf("%s (%dns)", errRLMsg, int64(e.Delay))
45+
}
46+
47+
var errRLRe = regexp.MustCompile(fmt.Sprintf(".*%s \\(([0-9]+)ns\\).*", errRLMsg))
48+
49+
func IsRateLimited(err error) (time.Duration, bool) {
50+
if e, ok := err.(ErrRateLimited); ok {
51+
return e.Delay, true
52+
}
53+
if matches := errRLRe.FindStringSubmatch(err.Error()); len(matches) > 1 {
54+
d, err := strconv.Atoi(matches[1])
55+
if err == nil {
56+
return time.Duration(d), true
57+
}
58+
}
59+
return 0, false
60+
}
3461

3562
var (
36-
// BackoffDuration is the initial duration that the DynamicRESTMapper
37-
// waits to reload its REST mapping.
38-
BackoffDuration time.Duration = time.Millisecond * 10
39-
// BackoffDuration is the number of times that the DynamicRESTMapper
40-
// will perform an exponential backoff before maxing out.
41-
BackoffSteps = 10
63+
// LimitRate is the number of DynamicRESTMapper API calls allowed per second
64+
// assuming the rate of API calls <= LimitRate.
65+
// There is likely no need to change the default value.
66+
LimitRate = 600
67+
// LimitSize is the maximum number of simultaneous DynamicRESTMapper API
68+
// calls allowed.
69+
// There is likely no need to change the default value.
70+
LimitSize = 5
4271
)
4372

4473
// DynamicRESTMapper is a RESTMapper that dynamically discovers resource
@@ -49,7 +78,7 @@ var (
4978
type DynamicRESTMapper struct {
5079
client discovery.DiscoveryInterface
5180
delegate meta.RESTMapper
52-
backoff *backoff
81+
limiter *limiter
5382
}
5483

5584
// NewDynamicRESTMapper returns a DynamicRESTMapper for cfg.
@@ -61,27 +90,17 @@ func NewDynamicRESTMapper(cfg *rest.Config) (meta.RESTMapper, error) {
6190

6291
drm := &DynamicRESTMapper{
6392
client: client,
64-
backoff: &backoff{
65-
Backoff: wait.Backoff{
66-
Duration: BackoffDuration,
67-
Steps: BackoffSteps,
68-
Factor: 2,
69-
},
93+
limiter: &limiter{
94+
rate.NewLimiter(rate.Limit(LimitRate), LimitSize),
7095
},
7196
}
72-
// Substitute the default backoff error handler for our exponential one.
73-
if len(utilruntime.ErrorHandlers) > 1 {
74-
utilruntime.ErrorHandlers[1] = func(error) {
75-
time.Sleep(drm.backoff.step())
76-
}
77-
}
78-
if err := drm.reload(); err != nil {
97+
if err := drm.setDelegate(); err != nil {
7998
return nil, err
8099
}
81100
return drm, nil
82101
}
83102

84-
func (drm *DynamicRESTMapper) reload() error {
103+
func (drm *DynamicRESTMapper) setDelegate() error {
85104
gr, err := restmapper.GetAPIGroupResources(drm.client)
86105
if err != nil {
87106
return err
@@ -90,110 +109,108 @@ func (drm *DynamicRESTMapper) reload() error {
90109
return nil
91110
}
92111

93-
// reloadOnError checks if an error indicates that the delegated RESTMapper
94-
// needs to be reloaded, and if so, reloads it and returns true.
95-
// reloadOnError uses an exponential backoff mechanism to rate limit reloads.
96-
func (drm *DynamicRESTMapper) reloadOnError(err error) bool {
97-
if _, matches := err.(*meta.NoKindMatchError); !matches {
98-
drm.backoff.reset()
99-
return false
112+
func noKindMatchError(err error) bool {
113+
_, ok := err.(*meta.NoKindMatchError)
114+
return ok
115+
}
116+
117+
// reload reloads the delegated RESTMapper, and will return an error only
118+
// if a rate limit has been hit.
119+
func (drm *DynamicRESTMapper) reload() error {
120+
if err := drm.limiter.checkRate(); err != nil {
121+
return err
100122
}
101-
err = drm.reload()
102-
if err != nil {
103-
// TODO(estroz): HandleError uses a rudimentary backoff by default.
104-
// Should we remove it completely or substitute the default backoff
105-
// for our exponential one, as we do above?
123+
if err := drm.setDelegate(); err != nil {
106124
utilruntime.HandleError(err)
107125
}
108-
return err == nil
126+
return nil
109127
}
110128

111129
func (drm *DynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
112130
gvk, err := drm.delegate.KindFor(resource)
113-
if drm.reloadOnError(err) {
131+
if noKindMatchError(err) {
132+
if rerr := drm.reload(); rerr != nil {
133+
return schema.GroupVersionKind{}, rerr
134+
}
114135
gvk, err = drm.delegate.KindFor(resource)
115136
}
116137
return gvk, err
117138
}
118139

119140
func (drm *DynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
120141
gvks, err := drm.delegate.KindsFor(resource)
121-
if drm.reloadOnError(err) {
142+
if noKindMatchError(err) {
143+
if rerr := drm.reload(); rerr != nil {
144+
return nil, rerr
145+
}
122146
gvks, err = drm.delegate.KindsFor(resource)
123147
}
124148
return gvks, err
125149
}
126150

127151
func (drm *DynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
128152
gvr, err := drm.delegate.ResourceFor(input)
129-
if drm.reloadOnError(err) {
153+
if noKindMatchError(err) {
154+
if rerr := drm.reload(); rerr != nil {
155+
return schema.GroupVersionResource{}, rerr
156+
}
130157
gvr, err = drm.delegate.ResourceFor(input)
131158
}
132159
return gvr, err
133160
}
134161

135162
func (drm *DynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
136163
gvrs, err := drm.delegate.ResourcesFor(input)
137-
if drm.reloadOnError(err) {
164+
if noKindMatchError(err) {
165+
if rerr := drm.reload(); rerr != nil {
166+
return nil, rerr
167+
}
138168
gvrs, err = drm.delegate.ResourcesFor(input)
139169
}
140170
return gvrs, err
141171
}
142172

143173
func (drm *DynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
144174
m, err := drm.delegate.RESTMapping(gk, versions...)
145-
if drm.reloadOnError(err) {
175+
if noKindMatchError(err) {
176+
if rerr := drm.reload(); rerr != nil {
177+
return nil, rerr
178+
}
146179
m, err = drm.delegate.RESTMapping(gk, versions...)
147180
}
148181
return m, err
149182
}
150183

151184
func (drm *DynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
152185
ms, err := drm.delegate.RESTMappings(gk, versions...)
153-
if drm.reloadOnError(err) {
186+
if noKindMatchError(err) {
187+
if rerr := drm.reload(); rerr != nil {
188+
return nil, rerr
189+
}
154190
ms, err = drm.delegate.RESTMappings(gk, versions...)
155191
}
156192
return ms, err
157193
}
158194

159195
func (drm *DynamicRESTMapper) ResourceSingularizer(resource string) (singular string, err error) {
160196
s, err := drm.delegate.ResourceSingularizer(resource)
161-
if drm.reloadOnError(err) {
197+
if noKindMatchError(err) {
198+
if rerr := drm.reload(); rerr != nil {
199+
return "", rerr
200+
}
162201
s, err = drm.delegate.ResourceSingularizer(resource)
163202
}
164203
return s, err
165204
}
166205

167-
type backoff struct {
168-
wait.Backoff
169-
mu sync.Mutex
206+
type limiter struct {
207+
*rate.Limiter
170208
}
171209

172-
// Copied and pared down from
173-
// https://github.com/kubernetes/apimachinery/blob/6a84e37a896db9780c75367af8d2ed2bb944022e/pkg/util/wait/wait.go#L227
174-
// TODO(estroz) call Backoff.Step() once we bump to kubernetes-1.14.1
175-
func (b *backoff) step() time.Duration {
176-
b.mu.Lock()
177-
defer b.mu.Unlock()
178-
179-
if b.Steps < 1 {
180-
return b.Duration
181-
}
182-
b.Steps--
183-
184-
duration := b.Duration
185-
186-
// calculate the next step
187-
if b.Factor != 0 {
188-
b.Duration = time.Duration(float64(b.Duration) * b.Factor)
210+
func (b *limiter) checkRate() error {
211+
res := b.Reserve()
212+
if res.Delay() == 0 {
213+
return nil
189214
}
190-
191-
return duration
192-
}
193-
194-
func (b *backoff) reset() {
195-
b.mu.Lock()
196-
b.Duration = BackoffDuration
197-
b.Steps = BackoffSteps
198-
b.mu.Unlock()
215+
return ErrRateLimited{res.Delay()}
199216
}

0 commit comments

Comments
 (0)