Skip to content

Commit 4f90fb8

Browse files
committed
✨ Adding timeout to unset context for waiting for cache to sync.
Waiting for cache to sync happens when getting a new informer in the informer map.
1 parent c38f59a commit 4f90fb8

File tree

3 files changed

+55
-10
lines changed

3 files changed

+55
-10
lines changed

pkg/cache/informer_cache.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"reflect"
2323
"strings"
24+
"time"
2425

2526
apimeta "k8s.io/apimachinery/pkg/api/meta"
2627
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -57,7 +58,12 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
5758
return err
5859
}
5960

60-
started, cache, err := ip.InformersMap.Get(gvk, out)
61+
ctx, cancelFunc := addTimeout(ctx)
62+
if cancelFunc != nil {
63+
defer cancelFunc()
64+
}
65+
cache, err := ip.InformersMap.Get(ctx, gvk, out)
66+
started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
6167
if err != nil {
6268
return err
6369
}
@@ -101,7 +107,11 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
101107
}
102108
}
103109

104-
started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
110+
ctx, cancelFunc := addTimeout(ctx)
111+
if cancelFunc != nil {
112+
defer cancelFunc()
113+
}
114+
started, cache, err := ip.InformersMap.Get(ctx, gvk, cacheTypeObj)
105115
if err != nil {
106116
return err
107117
}
@@ -113,27 +123,47 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
113123
return cache.Reader.List(ctx, out, opts...)
114124
}
115125

126+
// addTimeout adds a default 30 second timeout to a child contxt
127+
// if one does not exist.
128+
func addTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
129+
var cancelFunc context.CancelFunc
130+
if _, ok := ctx.Deadline(); !ok {
131+
ctx, cancelFunc = context.WithTimeout(ctx, 30*time.Second)
132+
}
133+
return ctx, cancelFunc
134+
}
135+
116136
// GetInformerForKind returns the informer for the GroupVersionKind
137+
// Will timeout after 30 seconds if the informer is new and can not be
138+
// synced.
117139
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
118140
// Map the gvk to an object
119141
obj, err := ip.Scheme.New(gvk)
120142
if err != nil {
121143
return nil, err
122144
}
123-
_, i, err := ip.InformersMap.Get(gvk, obj)
145+
ctx, cancelFunc := addTimeout(context.TODO())
146+
// There will never be a nil cancelFunc
147+
defer cancelFunc()
148+
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
124149
if err != nil {
125150
return nil, err
126151
}
127152
return i.Informer, err
128153
}
129154

130155
// GetInformer returns the informer for the obj
156+
// Will timeout after 30 seconds if the informer is new and can not be
157+
// synced.
131158
func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
132159
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
133160
if err != nil {
134161
return nil, err
135162
}
136-
_, i, err := ip.InformersMap.Get(gvk, obj)
163+
ctx, cancelFunc := addTimeout(context.TODO())
164+
// There will never be a nil cancelFunc
165+
defer cancelFunc()
166+
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
137167
if err != nil {
138168
return nil, err
139169
}

pkg/cache/internal/deleg_map.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package internal
1818

1919
import (
20+
"context"
2021
"time"
2122

2223
"k8s.io/apimachinery/pkg/api/meta"
@@ -79,16 +80,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
7980

8081
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
8182
// the Informer from the map.
82-
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
83+
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
8384
_, isUnstructured := obj.(*unstructured.Unstructured)
8485
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
8586
isUnstructured = isUnstructured || isUnstructuredList
8687

8788
if isUnstructured {
88-
return m.unstructured.Get(gvk, obj)
89+
return m.unstructured.Get(ctx, gvk, obj)
8990
}
9091

91-
return m.structured.Get(gvk, obj)
92+
return m.structured.Get(ctx, gvk, obj)
9293
}
9394

9495
// newStructuredInformersMap creates a new InformersMap for structured objects.

pkg/cache/internal/informers_map.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package internal
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"math/rand"
2223
"sync"
@@ -163,7 +164,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
163164

164165
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
165166
// the Informer from the map.
166-
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
167+
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
167168
// Return the informer if it is found
168169
i, started, ok := func() (*MapEntry, bool, bool) {
169170
ip.mu.RLock()
@@ -180,9 +181,22 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
180181
}
181182

182183
if started && !i.Informer.HasSynced() {
184+
syncReturn := make(chan bool)
185+
done := make(chan struct{})
186+
go func() {
187+
syncReturn <- cache.WaitForCacheSync(done, i.Informer.HasSynced)
188+
}()
189+
183190
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
184-
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
185-
return started, nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
191+
select {
192+
case <-ctx.Done():
193+
//end the polling for cache to sync
194+
done <- struct{}{}
195+
return started, nil, fmt.Errorf("timeout waiting for %T Informer to sync", obj)
196+
case syncSuccess := <-syncReturn:
197+
if !syncSuccess {
198+
return started, nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
199+
}
186200
}
187201
}
188202

0 commit comments

Comments
 (0)