Skip to content

Commit 99e50a2

Browse files
committed
Add low-level metadata-only informer support
This adds support for informers that communicate with the API server in metadata-only form. They are *completely* separate from normal informers -- that is: just like unstructured, if you ask for both a "normal" informer & a metadata-only informer, you'll get two copies of the cache.
1 parent a6e9977 commit 99e50a2

File tree

4 files changed

+411
-11
lines changed

4 files changed

+411
-11
lines changed

pkg/cache/cache_test.go

Lines changed: 326 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
154154
Expect(listObj.Items).NotTo(BeEmpty())
155155
hasKubeService := false
156156
for _, svc := range listObj.Items {
157-
if svc.Namespace == "default" && svc.Name == "kubernetes" {
157+
if isKubeService(&svc) {
158158
hasKubeService = true
159159
break
160160
}
@@ -300,7 +300,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
300300
Expect(listObj.Items).NotTo(BeEmpty())
301301
hasKubeService := false
302302
for _, svc := range listObj.Items {
303-
if svc.GetNamespace() == "default" && svc.GetName() == "kubernetes" {
303+
if isKubeService(&svc) {
304304
hasKubeService = true
305305
break
306306
}
@@ -472,6 +472,204 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
472472
svc := &kcorev1.Service{}
473473
svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"}
474474

475+
By("verifying that an error is returned")
476+
err := informerCache.Get(context.Background(), svcKey, svc)
477+
Expect(err).To(HaveOccurred())
478+
})
479+
})
480+
Context("with metadata-only objects", func() {
481+
It("should be able to list objects that haven't been watched previously", func() {
482+
By("listing all services in the cluster")
483+
listObj := &kmetav1.PartialObjectMetadataList{}
484+
listObj.SetGroupVersionKind(schema.GroupVersionKind{
485+
Group: "",
486+
Version: "v1",
487+
Kind: "ServiceList",
488+
})
489+
err := informerCache.List(context.Background(), listObj)
490+
Expect(err).To(Succeed())
491+
492+
By("verifying that the returned list contains the Kubernetes service")
493+
// NB: kubernetes default service is automatically created in testenv.
494+
Expect(listObj.Items).NotTo(BeEmpty())
495+
hasKubeService := false
496+
for _, svc := range listObj.Items {
497+
if isKubeService(&svc) {
498+
hasKubeService = true
499+
break
500+
}
501+
}
502+
Expect(hasKubeService).To(BeTrue())
503+
})
504+
It("should be able to get objects that haven't been watched previously", func() {
505+
By("getting the Kubernetes service")
506+
svc := &kmetav1.PartialObjectMetadata{}
507+
svc.SetGroupVersionKind(schema.GroupVersionKind{
508+
Group: "",
509+
Version: "v1",
510+
Kind: "Service",
511+
})
512+
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
513+
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())
514+
515+
By("verifying that the returned service looks reasonable")
516+
Expect(svc.GetName()).To(Equal("kubernetes"))
517+
Expect(svc.GetNamespace()).To(Equal("default"))
518+
})
519+
520+
It("should support filtering by labels in a single namespace", func() {
521+
By("listing pods with a particular label")
522+
// NB: each pod has a "test-label": <pod-name>
523+
out := kmetav1.PartialObjectMetadataList{}
524+
out.SetGroupVersionKind(schema.GroupVersionKind{
525+
Group: "",
526+
Version: "v1",
527+
Kind: "PodList",
528+
})
529+
err := informerCache.List(context.Background(), &out,
530+
client.InNamespace(testNamespaceTwo),
531+
client.MatchingLabels(map[string]string{"test-label": "test-pod-2"}))
532+
Expect(err).To(Succeed())
533+
534+
By("verifying the returned pods have the correct label")
535+
Expect(out.Items).NotTo(BeEmpty())
536+
Expect(out.Items).Should(HaveLen(1))
537+
actual := out.Items[0]
538+
Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2"))
539+
})
540+
541+
It("should support filtering by labels from multiple namespaces", func() {
542+
By("creating another pod with the same label but different namespace")
543+
anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways)
544+
defer deletePod(anotherPod)
545+
546+
By("listing pods with a particular label")
547+
// NB: each pod has a "test-label": <pod-name>
548+
out := kmetav1.PartialObjectMetadataList{}
549+
out.SetGroupVersionKind(schema.GroupVersionKind{
550+
Group: "",
551+
Version: "v1",
552+
Kind: "PodList",
553+
})
554+
labels := map[string]string{"test-label": "test-pod-2"}
555+
err := informerCache.List(context.Background(), &out, client.MatchingLabels(labels))
556+
Expect(err).To(Succeed())
557+
558+
By("verifying multiple pods with the same label in different namespaces are returned")
559+
Expect(out.Items).NotTo(BeEmpty())
560+
Expect(out.Items).Should(HaveLen(2))
561+
for _, actual := range out.Items {
562+
Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2"))
563+
}
564+
565+
})
566+
567+
It("should be able to list objects by namespace", func() {
568+
By("listing pods in test-namespace-1")
569+
listObj := &kmetav1.PartialObjectMetadataList{}
570+
listObj.SetGroupVersionKind(schema.GroupVersionKind{
571+
Group: "",
572+
Version: "v1",
573+
Kind: "PodList",
574+
})
575+
err := informerCache.List(context.Background(), listObj, client.InNamespace(testNamespaceOne))
576+
Expect(err).To(Succeed())
577+
578+
By("verifying that the returned pods are in test-namespace-1")
579+
Expect(listObj.Items).NotTo(BeEmpty())
580+
Expect(listObj.Items).Should(HaveLen(1))
581+
actual := listObj.Items[0]
582+
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
583+
})
584+
585+
It("should be able to restrict cache to a namespace", func() {
586+
By("creating a namespaced cache")
587+
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
588+
Expect(err).NotTo(HaveOccurred())
589+
590+
By("running the cache and waiting for it to sync")
591+
go func() {
592+
defer GinkgoRecover()
593+
Expect(namespacedCache.Start(stop)).To(Succeed())
594+
}()
595+
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())
596+
597+
By("listing pods in all namespaces")
598+
out := &kmetav1.PartialObjectMetadataList{}
599+
out.SetGroupVersionKind(schema.GroupVersionKind{
600+
Group: "",
601+
Version: "v1",
602+
Kind: "PodList",
603+
})
604+
Expect(namespacedCache.List(context.Background(), out)).To(Succeed())
605+
606+
By("verifying the returned pod is from the watched namespace")
607+
Expect(out.Items).NotTo(BeEmpty())
608+
Expect(out.Items).Should(HaveLen(1))
609+
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))
610+
611+
By("listing all namespaces - should still be able to get a cluster-scoped resource")
612+
namespaceList := &kmetav1.PartialObjectMetadataList{}
613+
namespaceList.SetGroupVersionKind(schema.GroupVersionKind{
614+
Group: "",
615+
Version: "v1",
616+
Kind: "NamespaceList",
617+
})
618+
Expect(namespacedCache.List(context.Background(), namespaceList)).To(Succeed())
619+
620+
By("verifying the namespace list is not empty")
621+
Expect(namespaceList.Items).NotTo(BeEmpty())
622+
})
623+
624+
It("should deep copy the object unless told otherwise", func() {
625+
By("retrieving a specific pod from the cache")
626+
out := &kmetav1.PartialObjectMetadata{}
627+
out.SetGroupVersionKind(schema.GroupVersionKind{
628+
Group: "",
629+
Version: "v1",
630+
Kind: "Pod",
631+
})
632+
uKnownPod2 := &kmetav1.PartialObjectMetadata{}
633+
knownPod2.(*kcorev1.Pod).ObjectMeta.DeepCopyInto(&uKnownPod2.ObjectMeta)
634+
uKnownPod2.SetGroupVersionKind(schema.GroupVersionKind{
635+
Group: "",
636+
Version: "v1",
637+
Kind: "Pod",
638+
})
639+
640+
podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo}
641+
Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed())
642+
643+
By("verifying the retrieved pod is equal to a known pod")
644+
Expect(out).To(Equal(uKnownPod2))
645+
646+
By("altering a field in the retrieved pod")
647+
out.Labels["foo"] = "bar"
648+
649+
By("verifying the pods are no longer equal")
650+
Expect(out).NotTo(Equal(knownPod2))
651+
})
652+
653+
It("should return an error if the object is not found", func() {
654+
By("getting a service that does not exists")
655+
svc := &kmetav1.PartialObjectMetadata{}
656+
svc.SetGroupVersionKind(schema.GroupVersionKind{
657+
Group: "",
658+
Version: "v1",
659+
Kind: "Service",
660+
})
661+
svcKey := client.ObjectKey{Namespace: testNamespaceOne, Name: "unknown"}
662+
663+
By("verifying that an error is returned")
664+
err := informerCache.Get(context.Background(), svcKey, svc)
665+
Expect(err).To(HaveOccurred())
666+
Expect(errors.IsNotFound(err)).To(BeTrue())
667+
})
668+
It("should return an error if getting object in unwatched namespace", func() {
669+
By("getting a service that does not exists")
670+
svc := &kcorev1.Service{}
671+
svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"}
672+
475673
By("verifying that an error is returned")
476674
err := informerCache.Get(context.Background(), svcKey, svc)
477675
Expect(err).To(HaveOccurred())
@@ -518,7 +716,6 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
518716
Eventually(out).Should(Receive(Equal(pod)))
519717
close(done)
520718
})
521-
// TODO: Add a test for when GVK is not in Scheme. Does code support informer for unstructured object?
522719
It("should be able to get an informer by group/version/kind", func(done Done) {
523720
By("getting an shared index informer for gvk = core/v1/pod")
524721
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
@@ -741,6 +938,126 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
741938
Expect(errors.IsTimeout(err)).To(BeTrue())
742939
})
743940
})
941+
Context("with metadata-only objects", func() {
942+
It("should be able to get informer for the object", func(done Done) {
943+
By("getting a shared index informer for a pod")
944+
945+
pod := &kcorev1.Pod{
946+
ObjectMeta: kmetav1.ObjectMeta{
947+
Name: "informer-obj",
948+
Namespace: "default",
949+
},
950+
Spec: kcorev1.PodSpec{
951+
Containers: []kcorev1.Container{
952+
{
953+
Name: "nginx",
954+
Image: "nginx",
955+
},
956+
},
957+
},
958+
}
959+
960+
podMeta := &kmetav1.PartialObjectMetadata{}
961+
pod.ObjectMeta.DeepCopyInto(&podMeta.ObjectMeta)
962+
podMeta.SetGroupVersionKind(schema.GroupVersionKind{
963+
Group: "",
964+
Version: "v1",
965+
Kind: "Pod",
966+
})
967+
968+
sii, err := informerCache.GetInformer(context.TODO(), podMeta)
969+
Expect(err).NotTo(HaveOccurred())
970+
Expect(sii).NotTo(BeNil())
971+
Expect(sii.HasSynced()).To(BeTrue())
972+
973+
By("adding an event handler listening for object creation which sends the object to a channel")
974+
out := make(chan interface{})
975+
addFunc := func(obj interface{}) {
976+
out <- obj
977+
}
978+
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
979+
980+
By("adding an object")
981+
cl, err := client.New(cfg, client.Options{})
982+
Expect(err).NotTo(HaveOccurred())
983+
Expect(cl.Create(context.Background(), pod)).To(Succeed())
984+
defer deletePod(pod)
985+
// re-copy the result in so that we can match on it properly
986+
pod.ObjectMeta.DeepCopyInto(&podMeta.ObjectMeta)
987+
// NB(directxman12): proto doesn't care typemeta, and
988+
// partialobjectmetadata is proto, so no typemeta
989+
// TODO(directxman12): we should paper over this in controller-runtime
990+
podMeta.APIVersion = ""
991+
podMeta.Kind = ""
992+
993+
By("verifying the object's metadata is received on the channel")
994+
Eventually(out).Should(Receive(Equal(podMeta)))
995+
close(done)
996+
}, 3)
997+
998+
It("should be able to index an object field then retrieve objects by that field", func() {
999+
By("creating the cache")
1000+
informer, err := cache.New(cfg, cache.Options{})
1001+
Expect(err).NotTo(HaveOccurred())
1002+
1003+
By("indexing the restartPolicy field of the Pod object before starting")
1004+
pod := &kmetav1.PartialObjectMetadata{}
1005+
pod.SetGroupVersionKind(schema.GroupVersionKind{
1006+
Group: "",
1007+
Version: "v1",
1008+
Kind: "Pod",
1009+
})
1010+
indexFunc := func(obj runtime.Object) []string {
1011+
metadata := obj.(*kmetav1.PartialObjectMetadata)
1012+
return []string{metadata.Labels["test-label"]}
1013+
}
1014+
Expect(informer.IndexField(context.TODO(), pod, "metadata.labels.test-label", indexFunc)).To(Succeed())
1015+
1016+
By("running the cache and waiting for it to sync")
1017+
go func() {
1018+
defer GinkgoRecover()
1019+
Expect(informer.Start(stop)).To(Succeed())
1020+
}()
1021+
Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse())
1022+
1023+
By("listing Pods with restartPolicyOnFailure")
1024+
listObj := &kmetav1.PartialObjectMetadataList{}
1025+
listObj.SetGroupVersionKind(schema.GroupVersionKind{
1026+
Group: "",
1027+
Version: "v1",
1028+
Kind: "PodList",
1029+
})
1030+
err = informer.List(context.Background(), listObj,
1031+
client.MatchingFields{"metadata.labels.test-label": "test-pod-3"})
1032+
Expect(err).To(Succeed())
1033+
1034+
By("verifying that the returned pods have correct restart policy")
1035+
Expect(listObj.Items).NotTo(BeEmpty())
1036+
Expect(listObj.Items).Should(HaveLen(1))
1037+
actual := listObj.Items[0]
1038+
Expect(actual.GetName()).To(Equal("test-pod-3"))
1039+
}, 3)
1040+
1041+
It("should allow for get informer to be cancelled", func() {
1042+
By("creating a context and cancelling it")
1043+
ctx, cancel := context.WithCancel(context.Background())
1044+
cancel()
1045+
1046+
By("getting a shared index informer for a pod with a cancelled context")
1047+
pod := &kmetav1.PartialObjectMetadata{}
1048+
pod.SetName("informer-obj2")
1049+
pod.SetNamespace("default")
1050+
pod.SetGroupVersionKind(schema.GroupVersionKind{
1051+
Group: "",
1052+
Version: "v1",
1053+
Kind: "Pod",
1054+
})
1055+
sii, err := informerCache.GetInformer(ctx, pod)
1056+
Expect(err).To(HaveOccurred())
1057+
Expect(sii).To(BeNil())
1058+
Expect(errors.IsTimeout(err)).To(BeTrue())
1059+
})
1060+
})
7441061
})
7451062
})
7461063
}
@@ -762,3 +1079,9 @@ func ensureNamespace(namespace string, client client.Client) error {
7621079
}
7631080
return err
7641081
}
1082+
1083+
//nolint:interfacer
1084+
func isKubeService(svc kmetav1.Object) bool {
1085+
// grumble grumble linters grumble grumble
1086+
return svc.GetNamespace() == "default" && svc.GetName() == "kubernetes"
1087+
}

0 commit comments

Comments
 (0)