Skip to content

[WIP] ✨ Adding timeout while waiting for cache to sync #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -50,7 +51,11 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
return err
}

cache, err := ip.InformersMap.Get(gvk, out)
ctx, cancelFunc := addTimeout(ctx)
if cancelFunc != nil {
defer cancelFunc()
}
cache, err := ip.InformersMap.Get(ctx, gvk, out)
if err != nil {
return err
}
Expand Down Expand Up @@ -90,35 +95,59 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
}
}

cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
ctx, cancelFunc := addTimeout(ctx)
if cancelFunc != nil {
defer cancelFunc()
}
cache, err := ip.InformersMap.Get(ctx, gvk, cacheTypeObj)
if err != nil {
return err
}

return cache.Reader.List(ctx, out, opts...)
}

// addTimeout adds a default 30 second timeout to a child contxt
// if one does not exist.
func addTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe change this to be addTimeoutIfUnset or something.

Also, do we want a default timeout? I'm unsure

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, do we want a default timeout? I'm unsure

That could also be an option it might be easier to punt the decision to the implementer as they probably know their operator and what it would expect better.

On the other hand, I think a case where we are never going to sync permissions error especially and will hang a worker is a bug that should be fixed.

I am torn :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could also be an option it might be easier to punt the decision to the implementer as they probably know their operator and what it would expect better.

That's my opinion -- defaults might fail weirdly in certain conditions, but hanging forever is bad too. The problem is that with defaults, there's not way to say "no, never time out" except to go astronomically large. Let's punt on the default for now (do it in a separate PR) so we can at least get the capability in and unbreak people that want to use this.

Relatedly: what's the practical timeout on client->server requests? If you just eat everything after the handshake on a direct client call, how long will we wait? Forever? If not, we can match that by default

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @DirectXMan12. My opinion is that when context.Context is exposed to callers, its generally to explicitly hand over full control of deadlining. I'd be little confused if I passed context.Background() and it timed out after 30s.

If we're not going to default the timeout, how do we handle GetInformerForKind and GetInformer, which don't accept a context.Context?

  • Interally use context.Background()
  • Make a breaking change to the Informers interface to include a Context parameter
  • Add a new interface that has methods that accept a Context and check whether the cache implementation supports it when calling.

Then there's the concern of how that propagates up the call stack. For example, what would we pass here?

On the other hand, I think a case where we are never going to sync permissions error especially and will hang a worker is a bug that should be fixed.

Is it possible to propagate these errors back up to the caller so that a worker doesn't hang?

Copy link
Author

@shawn-hurley shawn-hurley Sep 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove the defaults and add the timeout error.

Is it possible to propagate these errors back up to the caller so that a worker doesn't hang?

We would have to change the underlying informer to do this, I don't know how we could do that without a backward incompatabile change ATM?

I like @DirectXMan12 idea of using the client's default timeout for all of these as it should be expected. note we should add go doc that one of the reasons this can timeout is permissions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we pass here

context.TODO, with a note & bug that when we go to make breaking changes, we fix the signature of start to include a context

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(any internal uses where you say "we really should have a real context here, but can't because of the interface" should be TODO, not Background)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as for propagating those errors up, it would be nice eventually, but we'd need to get wrapping errors from 1.13 before that becomes even slightly close to possible.

var cancelFunc context.CancelFunc
if _, ok := ctx.Deadline(); !ok {
ctx, cancelFunc = context.WithTimeout(ctx, 30*time.Second)
}
return ctx, cancelFunc
}

// GetInformerForKind returns the informer for the GroupVersionKind
// Will timeout after 30 seconds if the informer is new and can not be
// synced.
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
// Map the gvk to an object
obj, err := ip.Scheme.New(gvk)
if err != nil {
return nil, err
}
i, err := ip.InformersMap.Get(gvk, obj)
ctx, cancelFunc := addTimeout(context.TODO())
// There will never be a nil cancelFunc
defer cancelFunc()
i, err := ip.InformersMap.Get(ctx, gvk, obj)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformer returns the informer for the obj
// Will timeout after 30 seconds if the informer is new and can not be
// synced.
func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, err
}
i, err := ip.InformersMap.Get(gvk, obj)
ctx, cancelFunc := addTimeout(context.TODO())
// There will never be a nil cancelFunc
defer cancelFunc()
i, err := ip.InformersMap.Get(ctx, gvk, obj)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package internal

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -73,16 +74,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

if isUnstructured {
return m.unstructured.Get(gvk, obj)
return m.unstructured.Get(ctx, gvk, obj)
}

return m.structured.Get(gvk, obj)
return m.structured.Get(ctx, gvk, obj)
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
Expand Down
20 changes: 17 additions & 3 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package internal

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -145,7 +146,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
Expand All @@ -162,9 +163,22 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
}

if started && !i.Informer.HasSynced() {
syncReturn := make(chan bool)
done := make(chan struct{})
go func() {
syncReturn <- cache.WaitForCacheSync(done, i.Informer.HasSynced)
}()

// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
select {
case <-ctx.Done():
//end the polling for cache to sync
done <- struct{}{}
return nil, fmt.Errorf("timeout waiting for %T Informer to sync", obj)
case syncSuccess := <-syncReturn:
if !syncSuccess {
return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
}
}
}

Expand Down