Skip to content

[WIP] pkg/restmapper: use exponential backoff with DynamicRESTMapper calls #1792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ require (
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 // indirect
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a // indirect
golang.org/x/sys v0.0.0-20190405154228-4b34438f7a67 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/tools v0.0.0-20190408170212-12dd9f86f350
google.golang.org/appengine v1.5.0 // indirect
google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107 // indirect
Expand Down
134 changes: 112 additions & 22 deletions pkg/restmapper/dynamicrestmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
package restmapper

import (
"fmt"
"regexp"
"strconv"
"time"

"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -23,29 +29,76 @@ import (
"k8s.io/client-go/restmapper"
)

// ErrRateLimited is returned by a DynamicRESTMapper method if the number
// of API calls has exceeded a limit within a certain time period.
type ErrRateLimited struct {
// Duration to wait until the next API call can be made.
Delay time.Duration
}

const errRLMsg = "too many API calls to the DynamicRESTMapper within a timeframe"

func (e ErrRateLimited) Error() string {
return fmt.Sprintf("%s (%dns)", errRLMsg, int64(e.Delay))
}

var errRLRe = regexp.MustCompile(fmt.Sprintf(".*%s \\(([0-9]+)ns\\).*", errRLMsg))

func IsRateLimited(err error) (time.Duration, bool) {
if e, ok := err.(ErrRateLimited); ok {
return e.Delay, true
}
if matches := errRLRe.FindStringSubmatch(err.Error()); len(matches) > 1 {
d, err := strconv.Atoi(matches[1])
if err == nil {
return time.Duration(d), true
}
}
return 0, false
}

var (
// LimitRate is the number of DynamicRESTMapper API calls allowed per second
// assuming the rate of API calls <= LimitRate.
// There is likely no need to change the default value.
LimitRate = 600
// LimitSize is the maximum number of simultaneous DynamicRESTMapper API
// calls allowed.
// There is likely no need to change the default value.
LimitSize = 5
)

// DynamicRESTMapper is a RESTMapper that dynamically discovers resource
// types at runtime. This is in contrast to controller-manager's default
// RESTMapper, which only checks resource types at startup, and so can't
// handle the case of first creating a CRD and then creating an instance
// of that CRD.
type DynamicRESTMapper struct {
client discovery.DiscoveryInterface
delegate meta.RESTMapper
limiter *limiter
}

// NewDynamicRESTMapper returns a RESTMapper that dynamically discovers resource
// types at runtime. This is in contrast to controller-manager's default RESTMapper, which
// only checks resource types at startup, and so can't handle the case of first creating a
// CRD and then creating an instance of that CRD.
// NewDynamicRESTMapper returns a DynamicRESTMapper for cfg.
func NewDynamicRESTMapper(cfg *rest.Config) (meta.RESTMapper, error) {
client, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, err
}

drm := &DynamicRESTMapper{client: client}
if err := drm.reload(); err != nil {
drm := &DynamicRESTMapper{
client: client,
limiter: &limiter{
rate.NewLimiter(rate.Limit(LimitRate), LimitSize),
},
}
if err := drm.setDelegate(); err != nil {
return nil, err
}
return drm, nil
}

func (drm *DynamicRESTMapper) reload() error {
func (drm *DynamicRESTMapper) setDelegate() error {
gr, err := restmapper.GetAPIGroupResources(drm.client)
if err != nil {
return err
Expand All @@ -54,71 +107,108 @@ func (drm *DynamicRESTMapper) reload() error {
return nil
}

// reloadOnError checks if an error indicates that the delegated RESTMapper needs to be
// reloaded, and if so, reloads it and returns true.
func (drm *DynamicRESTMapper) reloadOnError(err error) bool {
if _, matches := err.(*meta.NoKindMatchError); !matches {
return false
func noKindMatchError(err error) bool {
_, ok := err.(*meta.NoKindMatchError)
return ok
}

// reload reloads the delegated RESTMapper, and will return an error only
// if a rate limit has been hit.
func (drm *DynamicRESTMapper) reload() error {
if err := drm.limiter.checkRate(); err != nil {
return err
}
err = drm.reload()
if err != nil {
if err := drm.setDelegate(); err != nil {
utilruntime.HandleError(err)
}
return err == nil
return nil
}

func (drm *DynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
gvk, err := drm.delegate.KindFor(resource)
if drm.reloadOnError(err) {
if noKindMatchError(err) {
if rerr := drm.reload(); rerr != nil {
return schema.GroupVersionKind{}, rerr
}
gvk, err = drm.delegate.KindFor(resource)
}
return gvk, err
}

func (drm *DynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
gvks, err := drm.delegate.KindsFor(resource)
if drm.reloadOnError(err) {
if noKindMatchError(err) {
if rerr := drm.reload(); rerr != nil {
return nil, rerr
}
gvks, err = drm.delegate.KindsFor(resource)
}
return gvks, err
}

func (drm *DynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
gvr, err := drm.delegate.ResourceFor(input)
if drm.reloadOnError(err) {
if noKindMatchError(err) {
if rerr := drm.reload(); rerr != nil {
return schema.GroupVersionResource{}, rerr
}
gvr, err = drm.delegate.ResourceFor(input)
}
return gvr, err
}

func (drm *DynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
gvrs, err := drm.delegate.ResourcesFor(input)
if drm.reloadOnError(err) {
if noKindMatchError(err) {
if rerr := drm.reload(); rerr != nil {
return nil, rerr
}
gvrs, err = drm.delegate.ResourcesFor(input)
}
return gvrs, err
}

func (drm *DynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
m, err := drm.delegate.RESTMapping(gk, versions...)
if drm.reloadOnError(err) {
if noKindMatchError(err) {
if rerr := drm.reload(); rerr != nil {
return nil, rerr
}
m, err = drm.delegate.RESTMapping(gk, versions...)
}
return m, err
}

func (drm *DynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
ms, err := drm.delegate.RESTMappings(gk, versions...)
if drm.reloadOnError(err) {
if noKindMatchError(err) {
if rerr := drm.reload(); rerr != nil {
return nil, rerr
}
ms, err = drm.delegate.RESTMappings(gk, versions...)
}
return ms, err
}

func (drm *DynamicRESTMapper) ResourceSingularizer(resource string) (singular string, err error) {
s, err := drm.delegate.ResourceSingularizer(resource)
if drm.reloadOnError(err) {
if noKindMatchError(err) {
if rerr := drm.reload(); rerr != nil {
return "", rerr
}
s, err = drm.delegate.ResourceSingularizer(resource)
}
return s, err
}

type limiter struct {
*rate.Limiter
}

func (b *limiter) checkRate() error {
res := b.Reserve()
if res.Delay() == 0 {
return nil
}
return ErrRateLimited{res.Delay()}
}