Skip to content

⚠ update Informers interface to accept context #830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cache

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -51,11 +52,11 @@ type Cache interface {
type Informers interface {
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformer(obj runtime.Object) (Informer, error)
GetInformer(ctx context.Context, obj runtime.Object) (Informer, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DirectXMan12 what is our policy for breaking changes? IMHO this change would be extremely great, if we want to give a longer deprecation period we could probably add a new InformersV2 interface and start using that internally?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably need to add this to v0.6 if it's a breaking change


// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error)
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)

// Start runs all the informers known to this cache until the given channel is closed.
// It blocks.
Expand Down
69 changes: 64 additions & 5 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
},
},
}
sii, err := informerCache.GetInformer(pod)
sii, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())
Expand All @@ -515,7 +515,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
It("should be able to get an informer by group/version/kind", func(done Done) {
By("getting an shared index informer for gvk = core/v1/pod")
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
sii, err := informerCache.GetInformerForKind(gvk)
sii, err := informerCache.GetInformerForKind(context.TODO(), gvk)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())
Expand Down Expand Up @@ -562,7 +562,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
indexFunc := func(obj runtime.Object) []string {
return []string{string(obj.(*kcorev1.Pod).Spec.RestartPolicy)}
}
Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed())
Expect(informer.IndexField(context.TODO(), pod, "spec.restartPolicy", indexFunc)).To(Succeed())

By("running the cache and waiting for it to sync")
go func() {
Expand All @@ -581,6 +581,45 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
actual := listObj.Items[0]
Expect(actual.Name).To(Equal("test-pod-3"))
})

It("should allow for get informer to be cancelled", func() {
By("creating a context and cancelling it")
ctx, cancel := context.WithCancel(context.Background())
cancel()

By("getting a shared index informer for a pod with a cancelled context")
pod := &kcorev1.Pod{
ObjectMeta: kmetav1.ObjectMeta{
Name: "informer-obj",
Namespace: "default",
},
Spec: kcorev1.PodSpec{
Containers: []kcorev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}
sii, err := informerCache.GetInformer(ctx, pod)
Expect(err).To(HaveOccurred())
Expect(sii).To(BeNil())
Expect(errors.IsTimeout(err)).To(BeTrue())
})

It("should allow getting an informer by group/version/kind to be cancelled", func() {
By("creating a context and cancelling it")
ctx, cancel := context.WithCancel(context.Background())
cancel()

By("getting an shared index informer for gvk = core/v1/pod with a cancelled context")
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
sii, err := informerCache.GetInformerForKind(ctx, gvk)
Expect(err).To(HaveOccurred())
Expect(sii).To(BeNil())
Expect(errors.IsTimeout(err)).To(BeTrue())
})
})
Context("with unstructured objects", func() {
It("should be able to get informer for the object", func(done Done) {
Expand All @@ -605,7 +644,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Version: "v1",
Kind: "Pod",
})
sii, err := informerCache.GetInformer(pod)
sii, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())
Expand Down Expand Up @@ -651,7 +690,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
}
return []string{fmt.Sprintf("%v", m["restartPolicy"])}
}
Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed())
Expect(informer.IndexField(context.TODO(), pod, "spec.restartPolicy", indexFunc)).To(Succeed())

By("running the cache and waiting for it to sync")
go func() {
Expand All @@ -677,6 +716,26 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
actual := listObj.Items[0]
Expect(actual.GetName()).To(Equal("test-pod-3"))
}, 3)

It("should allow for get informer to be cancelled", func() {
By("creating a context and cancelling it")
ctx, cancel := context.WithCancel(context.Background())
cancel()

By("getting a shared index informer for a pod with a cancelled context")
pod := &unstructured.Unstructured{}
pod.SetName("informer-obj2")
pod.SetNamespace("default")
pod.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Pod",
})
sii, err := informerCache.GetInformer(ctx, pod)
Expect(err).To(HaveOccurred())
Expect(sii).To(BeNil())
Expect(errors.IsTimeout(err)).To(BeTrue())
})
})
})
})
Expand Down
16 changes: 6 additions & 10 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,32 +131,28 @@ func (ip *informerCache) objectTypeForListObject(list runtime.Object) (*schema.G
}

// GetInformerForKind returns the informer for the GroupVersionKind
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
// Map the gvk to an object
obj, err := ip.Scheme.New(gvk)
if err != nil {
return nil, err
}

// TODO(djzager): before a context can be passed down, the Informers interface
// must be updated to accept a context when getting an informer
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformer returns the informer for the obj
func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
func (ip *informerCache) GetInformer(ctx context.Context, obj runtime.Object) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, err
}

// TODO(djzager): before a context can be passed down, the Informers interface
// must be updated to accept a context when getting an informer
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
if err != nil {
return nil, err
}
Expand All @@ -174,8 +170,8 @@ func (ip *informerCache) NeedLeaderElection() bool {
// to List. For one-to-one compatibility with "normal" field selectors, only return one value.
// The values may be anything. They will automatically be prefixed with the namespace of the
// given object, if present. The objects passed are guaranteed to be objects of the correct type.
func (ip *informerCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
informer, err := ip.GetInformer(obj)
func (ip *informerCache) IndexField(ctx context.Context, obj runtime.Object, field string, extractValue client.IndexerFunc) error {
informer, err := ip.GetInformer(ctx, obj)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type FakeInformers struct {
}

// GetInformerForKind implements Informers
func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) {
func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand All @@ -51,7 +51,7 @@ func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (cache.I
}

// FakeInformerForKind implements Informers
func (c *FakeInformers) FakeInformerForKind(gvk schema.GroupVersionKind) (*controllertest.FakeInformer, error) {
func (c *FakeInformers) FakeInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (*controllertest.FakeInformer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand All @@ -67,7 +67,7 @@ func (c *FakeInformers) FakeInformerForKind(gvk schema.GroupVersionKind) (*contr
}

// GetInformer implements Informers
func (c *FakeInformers) GetInformer(obj runtime.Object) (cache.Informer, error) {
func (c *FakeInformers) GetInformer(ctx context.Context, obj runtime.Object) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *FakeInformers) Start(stopCh <-chan struct{}) error {
}

// IndexField implements Cache
func (c *FakeInformers) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
func (c *FakeInformers) IndexField(ctx context.Context, obj runtime.Object, field string, extractValue client.IndexerFunc) error {
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ type multiNamespaceCache struct {
var _ Cache = &multiNamespaceCache{}

// Methods for multiNamespaceCache to conform to the Informers interface
func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error) {
func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj runtime.Object) (Informer, error) {
informers := map[string]Informer{}
for ns, cache := range c.namespaceToCache {
informer, err := cache.GetInformer(obj)
informer, err := cache.GetInformer(ctx, obj)
if err != nil {
return nil, err
}
Expand All @@ -82,10 +82,10 @@ func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error)
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
}

func (c *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
informers := map[string]Informer{}
for ns, cache := range c.namespaceToCache {
informer, err := cache.GetInformerForKind(gvk)
informer, err := cache.GetInformerForKind(ctx, gvk)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -117,9 +117,9 @@ func (c *multiNamespaceCache) WaitForCacheSync(stop <-chan struct{}) bool {
return synced
}

func (c *multiNamespaceCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
func (c *multiNamespaceCache) IndexField(ctx context.Context, obj runtime.Object, field string, extractValue client.IndexerFunc) error {
for _, cache := range c.namespaceToCache {
if err := cache.IndexField(obj, field, extractValue); err != nil {
if err := cache.IndexField(ctx, obj, field, extractValue); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func ExampleClient_deleteAllOf() {
// This example shows how to set up and consume a field selector over a pod's volumes' secretName field.
func ExampleFieldIndexer_secretName() {
// someIndexer is a FieldIndexer over a Cache
_ = someIndexer.IndexField(&corev1.Pod{}, "spec.volumes.secret.secretName", func(o runtime.Object) []string {
_ = someIndexer.IndexField(context.TODO(), &corev1.Pod{}, "spec.volumes.secret.secretName", func(o runtime.Object) []string {
var res []string
for _, vol := range o.(*corev1.Pod).Spec.Volumes {
if vol.Secret == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type FieldIndexer interface {
// and "equality" in the field selector means that at least one key matches the value.
// The FieldIndexer will automatically take care of indexing over namespace
// and supporting efficient all-namespace queries.
IndexField(obj runtime.Object, field string, extractValue IndexerFunc) error
IndexField(ctx context.Context, obj runtime.Object, field string, extractValue IndexerFunc) error
}

// IgnoreNotFound returns nil on NotFound errors.
Expand Down
5 changes: 3 additions & 2 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -106,9 +107,9 @@ var _ = Describe("controller", func() {

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(&appsv1.Deployment{})
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(&appsv1.ReplicaSet{})
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
Expect(err).NotTo(HaveOccurred())
ctrl.Cache = c
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return true }
Expand Down
3 changes: 2 additions & 1 deletion pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package source

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -98,7 +99,7 @@ func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimiting
}

// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ks.Type)
i, err := ks.cache.GetInformer(context.TODO(), ks.Type)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add context.Context to the Source interface's Start method to make this cancellable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, but maybe we can do that in a follow-up?

if err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
Expand Down