Skip to content

Commit 0160ff5

Browse files
committed
Add low-level metadata-only informer support
This adds support for informers that communicate with the API server in metadata-only form. They are *completely* separate from normal informers -- that is: just like unstructured, if you ask for both a "normal" informer & a metadata-only informer, you'll get two copies of the cache.
1 parent f52b618 commit 0160ff5

File tree

4 files changed

+411
-11
lines changed

4 files changed

+411
-11
lines changed

pkg/cache/cache_test.go

Lines changed: 326 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
153153
Expect(listObj.Items).NotTo(BeEmpty())
154154
hasKubeService := false
155155
for _, svc := range listObj.Items {
156-
if svc.Namespace == "default" && svc.Name == "kubernetes" {
156+
if isKubeService(&svc) {
157157
hasKubeService = true
158158
break
159159
}
@@ -299,7 +299,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
299299
Expect(listObj.Items).NotTo(BeEmpty())
300300
hasKubeService := false
301301
for _, svc := range listObj.Items {
302-
if svc.GetNamespace() == "default" && svc.GetName() == "kubernetes" {
302+
if isKubeService(&svc) {
303303
hasKubeService = true
304304
break
305305
}
@@ -471,6 +471,204 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
471471
svc := &kcorev1.Service{}
472472
svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"}
473473

474+
By("verifying that an error is returned")
475+
err := informerCache.Get(context.Background(), svcKey, svc)
476+
Expect(err).To(HaveOccurred())
477+
})
478+
})
479+
Context("with metadata-only objects", func() {
480+
It("should be able to list objects that haven't been watched previously", func() {
481+
By("listing all services in the cluster")
482+
listObj := &kmetav1.PartialObjectMetadataList{}
483+
listObj.SetGroupVersionKind(schema.GroupVersionKind{
484+
Group: "",
485+
Version: "v1",
486+
Kind: "ServiceList",
487+
})
488+
err := informerCache.List(context.Background(), listObj)
489+
Expect(err).To(Succeed())
490+
491+
By("verifying that the returned list contains the Kubernetes service")
492+
// NB: kubernetes default service is automatically created in testenv.
493+
Expect(listObj.Items).NotTo(BeEmpty())
494+
hasKubeService := false
495+
for _, svc := range listObj.Items {
496+
if isKubeService(&svc) {
497+
hasKubeService = true
498+
break
499+
}
500+
}
501+
Expect(hasKubeService).To(BeTrue())
502+
})
503+
It("should be able to get objects that haven't been watched previously", func() {
504+
By("getting the Kubernetes service")
505+
svc := &kmetav1.PartialObjectMetadata{}
506+
svc.SetGroupVersionKind(schema.GroupVersionKind{
507+
Group: "",
508+
Version: "v1",
509+
Kind: "Service",
510+
})
511+
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
512+
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())
513+
514+
By("verifying that the returned service looks reasonable")
515+
Expect(svc.GetName()).To(Equal("kubernetes"))
516+
Expect(svc.GetNamespace()).To(Equal("default"))
517+
})
518+
519+
It("should support filtering by labels in a single namespace", func() {
520+
By("listing pods with a particular label")
521+
// NB: each pod has a "test-label": <pod-name>
522+
out := kmetav1.PartialObjectMetadataList{}
523+
out.SetGroupVersionKind(schema.GroupVersionKind{
524+
Group: "",
525+
Version: "v1",
526+
Kind: "PodList",
527+
})
528+
err := informerCache.List(context.Background(), &out,
529+
client.InNamespace(testNamespaceTwo),
530+
client.MatchingLabels(map[string]string{"test-label": "test-pod-2"}))
531+
Expect(err).To(Succeed())
532+
533+
By("verifying the returned pods have the correct label")
534+
Expect(out.Items).NotTo(BeEmpty())
535+
Expect(out.Items).Should(HaveLen(1))
536+
actual := out.Items[0]
537+
Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2"))
538+
})
539+
540+
It("should support filtering by labels from multiple namespaces", func() {
541+
By("creating another pod with the same label but different namespace")
542+
anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways)
543+
defer deletePod(anotherPod)
544+
545+
By("listing pods with a particular label")
546+
// NB: each pod has a "test-label": <pod-name>
547+
out := kmetav1.PartialObjectMetadataList{}
548+
out.SetGroupVersionKind(schema.GroupVersionKind{
549+
Group: "",
550+
Version: "v1",
551+
Kind: "PodList",
552+
})
553+
labels := map[string]string{"test-label": "test-pod-2"}
554+
err := informerCache.List(context.Background(), &out, client.MatchingLabels(labels))
555+
Expect(err).To(Succeed())
556+
557+
By("verifying multiple pods with the same label in different namespaces are returned")
558+
Expect(out.Items).NotTo(BeEmpty())
559+
Expect(out.Items).Should(HaveLen(2))
560+
for _, actual := range out.Items {
561+
Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2"))
562+
}
563+
564+
})
565+
566+
It("should be able to list objects by namespace", func() {
567+
By("listing pods in test-namespace-1")
568+
listObj := &kmetav1.PartialObjectMetadataList{}
569+
listObj.SetGroupVersionKind(schema.GroupVersionKind{
570+
Group: "",
571+
Version: "v1",
572+
Kind: "PodList",
573+
})
574+
err := informerCache.List(context.Background(), listObj, client.InNamespace(testNamespaceOne))
575+
Expect(err).To(Succeed())
576+
577+
By("verifying that the returned pods are in test-namespace-1")
578+
Expect(listObj.Items).NotTo(BeEmpty())
579+
Expect(listObj.Items).Should(HaveLen(1))
580+
actual := listObj.Items[0]
581+
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
582+
})
583+
584+
It("should be able to restrict cache to a namespace", func() {
585+
By("creating a namespaced cache")
586+
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
587+
Expect(err).NotTo(HaveOccurred())
588+
589+
By("running the cache and waiting for it to sync")
590+
go func() {
591+
defer GinkgoRecover()
592+
Expect(namespacedCache.Start(informerCacheCtx)).To(Succeed())
593+
}()
594+
Expect(namespacedCache.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())
595+
596+
By("listing pods in all namespaces")
597+
out := &kmetav1.PartialObjectMetadataList{}
598+
out.SetGroupVersionKind(schema.GroupVersionKind{
599+
Group: "",
600+
Version: "v1",
601+
Kind: "PodList",
602+
})
603+
Expect(namespacedCache.List(context.Background(), out)).To(Succeed())
604+
605+
By("verifying the returned pod is from the watched namespace")
606+
Expect(out.Items).NotTo(BeEmpty())
607+
Expect(out.Items).Should(HaveLen(1))
608+
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))
609+
610+
By("listing all namespaces - should still be able to get a cluster-scoped resource")
611+
namespaceList := &kmetav1.PartialObjectMetadataList{}
612+
namespaceList.SetGroupVersionKind(schema.GroupVersionKind{
613+
Group: "",
614+
Version: "v1",
615+
Kind: "NamespaceList",
616+
})
617+
Expect(namespacedCache.List(context.Background(), namespaceList)).To(Succeed())
618+
619+
By("verifying the namespace list is not empty")
620+
Expect(namespaceList.Items).NotTo(BeEmpty())
621+
})
622+
623+
It("should deep copy the object unless told otherwise", func() {
624+
By("retrieving a specific pod from the cache")
625+
out := &kmetav1.PartialObjectMetadata{}
626+
out.SetGroupVersionKind(schema.GroupVersionKind{
627+
Group: "",
628+
Version: "v1",
629+
Kind: "Pod",
630+
})
631+
uKnownPod2 := &kmetav1.PartialObjectMetadata{}
632+
knownPod2.(*kcorev1.Pod).ObjectMeta.DeepCopyInto(&uKnownPod2.ObjectMeta)
633+
uKnownPod2.SetGroupVersionKind(schema.GroupVersionKind{
634+
Group: "",
635+
Version: "v1",
636+
Kind: "Pod",
637+
})
638+
639+
podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo}
640+
Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed())
641+
642+
By("verifying the retrieved pod is equal to a known pod")
643+
Expect(out).To(Equal(uKnownPod2))
644+
645+
By("altering a field in the retrieved pod")
646+
out.Labels["foo"] = "bar"
647+
648+
By("verifying the pods are no longer equal")
649+
Expect(out).NotTo(Equal(knownPod2))
650+
})
651+
652+
It("should return an error if the object is not found", func() {
653+
By("getting a service that does not exists")
654+
svc := &kmetav1.PartialObjectMetadata{}
655+
svc.SetGroupVersionKind(schema.GroupVersionKind{
656+
Group: "",
657+
Version: "v1",
658+
Kind: "Service",
659+
})
660+
svcKey := client.ObjectKey{Namespace: testNamespaceOne, Name: "unknown"}
661+
662+
By("verifying that an error is returned")
663+
err := informerCache.Get(context.Background(), svcKey, svc)
664+
Expect(err).To(HaveOccurred())
665+
Expect(errors.IsNotFound(err)).To(BeTrue())
666+
})
667+
It("should return an error if getting object in unwatched namespace", func() {
668+
By("getting a service that does not exists")
669+
svc := &kcorev1.Service{}
670+
svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"}
671+
474672
By("verifying that an error is returned")
475673
err := informerCache.Get(context.Background(), svcKey, svc)
476674
Expect(err).To(HaveOccurred())
@@ -517,7 +715,6 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
517715
Eventually(out).Should(Receive(Equal(pod)))
518716
close(done)
519717
})
520-
// TODO: Add a test for when GVK is not in Scheme. Does code support informer for unstructured object?
521718
It("should be able to get an informer by group/version/kind", func(done Done) {
522719
By("getting an shared index informer for gvk = core/v1/pod")
523720
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
@@ -740,6 +937,126 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
740937
Expect(errors.IsTimeout(err)).To(BeTrue())
741938
})
742939
})
940+
Context("with metadata-only objects", func() {
941+
It("should be able to get informer for the object", func(done Done) {
942+
By("getting a shared index informer for a pod")
943+
944+
pod := &kcorev1.Pod{
945+
ObjectMeta: kmetav1.ObjectMeta{
946+
Name: "informer-obj",
947+
Namespace: "default",
948+
},
949+
Spec: kcorev1.PodSpec{
950+
Containers: []kcorev1.Container{
951+
{
952+
Name: "nginx",
953+
Image: "nginx",
954+
},
955+
},
956+
},
957+
}
958+
959+
podMeta := &kmetav1.PartialObjectMetadata{}
960+
pod.ObjectMeta.DeepCopyInto(&podMeta.ObjectMeta)
961+
podMeta.SetGroupVersionKind(schema.GroupVersionKind{
962+
Group: "",
963+
Version: "v1",
964+
Kind: "Pod",
965+
})
966+
967+
sii, err := informerCache.GetInformer(context.TODO(), podMeta)
968+
Expect(err).NotTo(HaveOccurred())
969+
Expect(sii).NotTo(BeNil())
970+
Expect(sii.HasSynced()).To(BeTrue())
971+
972+
By("adding an event handler listening for object creation which sends the object to a channel")
973+
out := make(chan interface{})
974+
addFunc := func(obj interface{}) {
975+
out <- obj
976+
}
977+
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
978+
979+
By("adding an object")
980+
cl, err := client.New(cfg, client.Options{})
981+
Expect(err).NotTo(HaveOccurred())
982+
Expect(cl.Create(context.Background(), pod)).To(Succeed())
983+
defer deletePod(pod)
984+
// re-copy the result in so that we can match on it properly
985+
pod.ObjectMeta.DeepCopyInto(&podMeta.ObjectMeta)
986+
// NB(directxman12): proto doesn't care typemeta, and
987+
// partialobjectmetadata is proto, so no typemeta
988+
// TODO(directxman12): we should paper over this in controller-runtime
989+
podMeta.APIVersion = ""
990+
podMeta.Kind = ""
991+
992+
By("verifying the object's metadata is received on the channel")
993+
Eventually(out).Should(Receive(Equal(podMeta)))
994+
close(done)
995+
}, 3)
996+
997+
It("should be able to index an object field then retrieve objects by that field", func() {
998+
By("creating the cache")
999+
informer, err := cache.New(cfg, cache.Options{})
1000+
Expect(err).NotTo(HaveOccurred())
1001+
1002+
By("indexing the restartPolicy field of the Pod object before starting")
1003+
pod := &kmetav1.PartialObjectMetadata{}
1004+
pod.SetGroupVersionKind(schema.GroupVersionKind{
1005+
Group: "",
1006+
Version: "v1",
1007+
Kind: "Pod",
1008+
})
1009+
indexFunc := func(obj client.Object) []string {
1010+
metadata := obj.(*kmetav1.PartialObjectMetadata)
1011+
return []string{metadata.Labels["test-label"]}
1012+
}
1013+
Expect(informer.IndexField(context.TODO(), pod, "metadata.labels.test-label", indexFunc)).To(Succeed())
1014+
1015+
By("running the cache and waiting for it to sync")
1016+
go func() {
1017+
defer GinkgoRecover()
1018+
Expect(informer.Start(informerCacheCtx)).To(Succeed())
1019+
}()
1020+
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())
1021+
1022+
By("listing Pods with restartPolicyOnFailure")
1023+
listObj := &kmetav1.PartialObjectMetadataList{}
1024+
listObj.SetGroupVersionKind(schema.GroupVersionKind{
1025+
Group: "",
1026+
Version: "v1",
1027+
Kind: "PodList",
1028+
})
1029+
err = informer.List(context.Background(), listObj,
1030+
client.MatchingFields{"metadata.labels.test-label": "test-pod-3"})
1031+
Expect(err).To(Succeed())
1032+
1033+
By("verifying that the returned pods have correct restart policy")
1034+
Expect(listObj.Items).NotTo(BeEmpty())
1035+
Expect(listObj.Items).Should(HaveLen(1))
1036+
actual := listObj.Items[0]
1037+
Expect(actual.GetName()).To(Equal("test-pod-3"))
1038+
}, 3)
1039+
1040+
It("should allow for get informer to be cancelled", func() {
1041+
By("creating a context and cancelling it")
1042+
ctx, cancel := context.WithCancel(context.Background())
1043+
cancel()
1044+
1045+
By("getting a shared index informer for a pod with a cancelled context")
1046+
pod := &kmetav1.PartialObjectMetadata{}
1047+
pod.SetName("informer-obj2")
1048+
pod.SetNamespace("default")
1049+
pod.SetGroupVersionKind(schema.GroupVersionKind{
1050+
Group: "",
1051+
Version: "v1",
1052+
Kind: "Pod",
1053+
})
1054+
sii, err := informerCache.GetInformer(ctx, pod)
1055+
Expect(err).To(HaveOccurred())
1056+
Expect(sii).To(BeNil())
1057+
Expect(errors.IsTimeout(err)).To(BeTrue())
1058+
})
1059+
})
7431060
})
7441061
})
7451062
}
@@ -761,3 +1078,9 @@ func ensureNamespace(namespace string, client client.Client) error {
7611078
}
7621079
return err
7631080
}
1081+
1082+
//nolint:interfacer
1083+
func isKubeService(svc kmetav1.Object) bool {
1084+
// grumble grumble linters grumble grumble
1085+
return svc.GetNamespace() == "default" && svc.GetName() == "kubernetes"
1086+
}

0 commit comments

Comments
 (0)