Skip to content

✨ Take context when getting informer. #663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,20 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
err := informerCache.Get(context.Background(), svcKey, svc)
Expect(err).To(HaveOccurred())
})

It("should return an error when context is cancelled", func() {
By("creating a context and cancelling it")
ctx, cancel := context.WithCancel(context.Background())
cancel()

By("listing pods in test-namespace-1 with a cancelled context")
listObj := &kcorev1.PodList{}
err := informerCache.List(ctx, listObj, client.InNamespace(testNamespaceOne))

By("verifying that an error is returned")
Expect(err).To(HaveOccurred())
Expect(errors.IsTimeout(err)).To(BeTrue())
})
})
Context("with unstructured objects", func() {
It("should be able to list objects that haven't been watched previously", func() {
Expand Down
15 changes: 10 additions & 5 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
return err
}

started, cache, err := ip.InformersMap.Get(gvk, out)
started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
if err != nil {
return err
}
Expand All @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
return err
}

started, cache, err := ip.InformersMap.Get(*gvk, cacheTypeObj)
started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -128,7 +128,6 @@ func (ip *informerCache) objectTypeForListObject(list runtime.Object) (*schema.G
}

return &gvk, cacheTypeObj, nil

}

// GetInformerForKind returns the informer for the GroupVersionKind
Expand All @@ -138,7 +137,10 @@ func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Inform
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(gvk, obj)

// TODO(djzager): before a context can be passed down, the Informers interface
// must be updated to accept a context when getting an informer
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a TODO here explaining what we'd need to do to plumb this through?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My best attempt at an explanation. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, looks good. Do you mind putting together a follow-up to do that?

if err != nil {
return nil, err
}
Expand All @@ -151,7 +153,10 @@ func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(gvk, obj)

// TODO(djzager): before a context can be passed down, the Informers interface
// must be updated to accept a context when getting an informer
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package internal

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -79,16 +80,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

if isUnstructured {
return m.unstructured.Get(gvk, obj)
return m.unstructured.Get(ctx, gvk, obj)
}

return m.structured.Get(gvk, obj)
return m.structured.Get(ctx, gvk, obj)
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
Expand Down
8 changes: 5 additions & 3 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package internal

import (
"context"
"fmt"
"math/rand"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -163,7 +165,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
Expand All @@ -181,8 +183,8 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj

if started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
return started, nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}

Expand Down