Skip to content

Commit eb107ca

Browse files
author
Shawn Hurley
committed
✨ Adding timeout to unset context for waiting for cache to sync.
Waiting for cache to sync happens when getting a new informer in the informer map.
1 parent d90bbc6 commit eb107ca

File tree

3 files changed

+44
-10
lines changed

3 files changed

+44
-10
lines changed

pkg/cache/informer_cache.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"reflect"
2323
"strings"
24+
"time"
2425

2526
apimeta "k8s.io/apimachinery/pkg/api/meta"
2627
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -50,7 +51,9 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
5051
return err
5152
}
5253

53-
cache, err := ip.InformersMap.Get(gvk, out)
54+
ctx, cancelFunc := addTimeout(ctx)
55+
defer cancelFunc()
56+
cache, err := ip.InformersMap.Get(ctx, gvk, out)
5457
if err != nil {
5558
return err
5659
}
@@ -90,35 +93,51 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
9093
}
9194
}
9295

93-
cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
96+
ctx, cancelFunc := addTimeout(ctx)
97+
defer cancelFunc()
98+
cache, err := ip.InformersMap.Get(ctx, gvk, cacheTypeObj)
9499
if err != nil {
95100
return err
96101
}
97102

98103
return cache.Reader.List(ctx, out, opts...)
99104
}
100105

106+
// addTimeout adds a default 30 second timeout to a child contxt
107+
// if one does not exist.
108+
func addTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
109+
var cancelFunc context.CancelFunc
110+
if _, ok := ctx.Deadline(); !ok {
111+
ctx, cancelFunc = context.WithTimeout(ctx, 30*time.Second)
112+
}
113+
return ctx, cancelFunc
114+
}
115+
101116
// GetInformerForKind returns the informer for the GroupVersionKind
117+
// Will timeout after 30 seconds if the informer is new and can not be
118+
// synced.
102119
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
103120
// Map the gvk to an object
104121
obj, err := ip.Scheme.New(gvk)
105122
if err != nil {
106123
return nil, err
107124
}
108-
i, err := ip.InformersMap.Get(gvk, obj)
125+
i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
109126
if err != nil {
110127
return nil, err
111128
}
112129
return i.Informer, err
113130
}
114131

115132
// GetInformer returns the informer for the obj
133+
// Will timeout after 30 seconds if the informer is new and can not be
134+
// synced.
116135
func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
117136
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
118137
if err != nil {
119138
return nil, err
120139
}
121-
i, err := ip.InformersMap.Get(gvk, obj)
140+
i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
122141
if err != nil {
123142
return nil, err
124143
}

pkg/cache/internal/deleg_map.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package internal
1818

1919
import (
20+
"context"
2021
"time"
2122

2223
"k8s.io/apimachinery/pkg/api/meta"
@@ -73,16 +74,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
7374

7475
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
7576
// the Informer from the map.
76-
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
77+
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
7778
_, isUnstructured := obj.(*unstructured.Unstructured)
7879
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
7980
isUnstructured = isUnstructured || isUnstructuredList
8081

8182
if isUnstructured {
82-
return m.unstructured.Get(gvk, obj)
83+
return m.unstructured.Get(ctx, gvk, obj)
8384
}
8485

85-
return m.structured.Get(gvk, obj)
86+
return m.structured.Get(ctx, gvk, obj)
8687
}
8788

8889
// newStructuredInformersMap creates a new InformersMap for structured objects.

pkg/cache/internal/informers_map.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package internal
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"sync"
2223
"time"
@@ -145,7 +146,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
145146

146147
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
147148
// the Informer from the map.
148-
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
149+
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
149150
// Return the informer if it is found
150151
i, started, ok := func() (*MapEntry, bool, bool) {
151152
ip.mu.RLock()
@@ -162,9 +163,22 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
162163
}
163164

164165
if started && !i.Informer.HasSynced() {
166+
syncReturn := make(chan bool)
167+
done := make(chan struct{})
168+
go func() {
169+
syncReturn <- cache.WaitForCacheSync(done, i.Informer.HasSynced)
170+
}()
171+
165172
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
166-
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
167-
return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
173+
select {
174+
case <-ctx.Done():
175+
//end the polling for cache to sync
176+
done <- struct{}{}
177+
return nil, fmt.Errorf("timeout waiting for %T Informer to sync", obj)
178+
case syncSuccess := <-syncReturn:
179+
if !syncSuccess {
180+
return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
181+
}
168182
}
169183
}
170184

0 commit comments

Comments
 (0)