Skip to content

Commit 30a89f6

Browse files
committed
Improve UX for using client-go generated Informers
1 parent 4376e6e commit 30a89f6

File tree

5 files changed

+250
-8
lines changed

5 files changed

+250
-8
lines changed

pkg/manager/manager.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ func (r RunnableFunc) Start(s <-chan struct{}) error {
135135
return r(s)
136136
}
137137

138+
// StartAdapter wraps a Start function to make it implement Runnable
139+
func StartAdapter(s func(<-chan struct{})) Runnable {
140+
return RunnableFunc(func(c <-chan struct{}) error {
141+
s(c)
142+
return nil
143+
})
144+
}
145+
138146
// New returns a new Manager for creating Controllers.
139147
func New(config *rest.Config, options Options) (Manager, error) {
140148
// Initialize a rest.config if none was specified

pkg/source/example_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,22 @@ limitations under the License.
1717
package source_test
1818

1919
import (
20+
"time"
21+
22+
"github.com/golang/glog"
2023
"k8s.io/api/core/v1"
24+
kubeinformers "k8s.io/client-go/informers"
25+
"k8s.io/client-go/kubernetes"
2126
"sigs.k8s.io/controller-runtime/pkg/controller"
2227
"sigs.k8s.io/controller-runtime/pkg/event"
2328
"sigs.k8s.io/controller-runtime/pkg/handler"
29+
"sigs.k8s.io/controller-runtime/pkg/manager"
30+
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
2431
"sigs.k8s.io/controller-runtime/pkg/source"
2532
)
2633

2734
var ctrl controller.Controller
35+
var mgr manager.Manager
2836

2937
// This example Watches for Pod Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request
3038
// with the Name and Namespace of the Pod.
@@ -42,3 +50,27 @@ func ExampleChannel() {
4250
&handler.EnqueueRequestForObject{},
4351
)
4452
}
53+
54+
// This example Watches for Service Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request
55+
// with the Name and Namespace of the Service. It uses the client-go generated Service Informer instead of the
56+
// Generic Informer.
57+
func ExampleInformer() {
58+
generatedClient := kubernetes.NewForConfigOrDie(mgr.GetConfig())
59+
generatedInformers := kubeinformers.NewSharedInformerFactory(generatedClient, time.Minute*30)
60+
61+
// Add it to the Manager
62+
if err := mgr.Add(manager.StartAdapter(generatedInformers.Start)); err != nil {
63+
glog.Fatalf("error Adding InformerFactory to the Manager: %v", err)
64+
}
65+
66+
// Setup Watch using the client-go generated Informer
67+
if err := ctrl.Watch(
68+
&source.Informer{InformerProvider: generatedInformers.Core().V1().Services()},
69+
&handler.EnqueueRequestForObject{},
70+
); err != nil {
71+
glog.Fatalf("error Watching Services: %v", err)
72+
}
73+
74+
// Start the Manager
75+
mgr.Start(signals.SetupSignalHandler())
76+
}

pkg/source/source.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,18 @@ func (cs *Channel) syncLoop() {
241241
}
242242
}
243243

244+
// InformerProvider provides an Informer
245+
type InformerProvider interface {
246+
Informer() toolscache.SharedIndexInformer
247+
}
248+
244249
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
245250
type Informer struct {
246-
// Informer is the generated client-go Informer
251+
// Informer is the generated client-go Informer. Mutually exclusive with InformerProvider.
247252
Informer toolscache.SharedIndexInformer
253+
254+
// InformerProvider provides a generated client-go Informer. Mutually exclusive with Informer.
255+
InformerProvider InformerProvider
248256
}
249257

250258
var _ Source = &Informer{}
@@ -255,11 +263,23 @@ func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimi
255263
prct ...predicate.Predicate) error {
256264

257265
// Informer should have been specified by the user.
258-
if is.Informer == nil {
259-
return fmt.Errorf("must specify Informer.Informer")
266+
if is.Informer == nil && is.InformerProvider == nil {
267+
return fmt.Errorf("must specify Informer.Informer or Informer.InformerProvider")
268+
}
269+
270+
if is.Informer != nil && is.InformerProvider != nil {
271+
return fmt.Errorf("must specify only one of Informer.Informer and Informer.InformerProvider")
272+
}
273+
274+
if is.Informer != nil {
275+
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
276+
}
277+
278+
if is.InformerProvider != nil {
279+
is.InformerProvider.Informer().AddEventHandler(
280+
internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
260281
}
261282

262-
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
263283
return nil
264284
}
265285

pkg/source/source_integration_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,4 +369,186 @@ var _ = Describe("Source", func() {
369369
})
370370
})
371371
})
372+
373+
Describe("InformerProvider", func() {
374+
var c chan struct{}
375+
var rs *appsv1.ReplicaSet
376+
var informerFactory kubeinformers.SharedInformerFactory
377+
var stopTest chan struct{}
378+
379+
BeforeEach(func(done Done) {
380+
stopTest = make(chan struct{})
381+
informerFactory = kubeinformers.NewSharedInformerFactory(clientset, time.Second*30)
382+
i := informerFactory.Apps().V1().ReplicaSets().Informer()
383+
informerFactory.Start(stopTest)
384+
Eventually(i.HasSynced).Should(BeTrue())
385+
386+
c = make(chan struct{})
387+
rs = &appsv1.ReplicaSet{
388+
ObjectMeta: metav1.ObjectMeta{Name: "informer-rs-name"},
389+
Spec: appsv1.ReplicaSetSpec{
390+
Selector: &metav1.LabelSelector{
391+
MatchLabels: map[string]string{"foo": "bar"},
392+
},
393+
Template: corev1.PodTemplateSpec{
394+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
395+
Spec: corev1.PodSpec{
396+
Containers: []corev1.Container{
397+
{
398+
Name: "nginx",
399+
Image: "nginx",
400+
},
401+
},
402+
},
403+
},
404+
},
405+
}
406+
close(done)
407+
})
408+
409+
AfterEach(func(done Done) {
410+
close(stopTest)
411+
close(done)
412+
})
413+
414+
It("should error if both Informer and InformerProvider are given", func() {
415+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
416+
rsi := informerFactory.Apps().V1().ReplicaSets()
417+
instance := &source.Informer{InformerProvider: rsi, Informer: rsi.Informer()}
418+
err := instance.Start(handler.Funcs{
419+
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {},
420+
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {},
421+
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {},
422+
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {},
423+
}, q)
424+
Expect(err).To(HaveOccurred())
425+
})
426+
427+
It("should error if neither Informer and InformerProvider are given", func() {
428+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
429+
instance := &source.Informer{}
430+
err := instance.Start(handler.Funcs{
431+
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {},
432+
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {},
433+
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {},
434+
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {},
435+
}, q)
436+
Expect(err).To(HaveOccurred())
437+
})
438+
439+
Context("for a ReplicaSet resource", func() {
440+
441+
It("should provide a ReplicaSet CreateEvent", func(done Done) {
442+
c := make(chan struct{})
443+
444+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
445+
instance := &source.Informer{InformerProvider: informerFactory.Apps().V1().ReplicaSets()}
446+
err := instance.Start(handler.Funcs{
447+
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
448+
defer GinkgoRecover()
449+
var err error
450+
rs, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
451+
Expect(err).NotTo(HaveOccurred())
452+
453+
Expect(q2).To(BeIdenticalTo(q))
454+
Expect(evt.Meta).To(Equal(rs))
455+
Expect(evt.Object).To(Equal(rs))
456+
close(c)
457+
},
458+
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
459+
defer GinkgoRecover()
460+
Fail("Unexpected UpdateEvent")
461+
},
462+
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
463+
defer GinkgoRecover()
464+
Fail("Unexpected DeleteEvent")
465+
},
466+
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
467+
defer GinkgoRecover()
468+
Fail("Unexpected GenericEvent")
469+
},
470+
}, q)
471+
Expect(err).NotTo(HaveOccurred())
472+
473+
rs, err = clientset.AppsV1().ReplicaSets("default").Create(rs)
474+
Expect(err).NotTo(HaveOccurred())
475+
<-c
476+
close(done)
477+
}, 30)
478+
479+
It("should provide a ReplicaSet UpdateEvent", func(done Done) {
480+
var err error
481+
rs, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
482+
Expect(err).NotTo(HaveOccurred())
483+
484+
rs2 := rs.DeepCopy()
485+
rs2.SetLabels(map[string]string{"biz": "baz"})
486+
487+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
488+
instance := &source.Informer{InformerProvider: informerFactory.Apps().V1().ReplicaSets()}
489+
err = instance.Start(handler.Funcs{
490+
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
491+
},
492+
UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
493+
defer GinkgoRecover()
494+
var err error
495+
rs2, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
496+
Expect(err).NotTo(HaveOccurred())
497+
498+
Expect(q2).To(Equal(q))
499+
Expect(evt.MetaOld).To(Equal(rs))
500+
Expect(evt.ObjectOld).To(Equal(rs))
501+
502+
Expect(evt.MetaNew).To(Equal(rs2))
503+
Expect(evt.ObjectNew).To(Equal(rs2))
504+
505+
close(c)
506+
},
507+
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
508+
defer GinkgoRecover()
509+
Fail("Unexpected DeleteEvent")
510+
},
511+
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
512+
defer GinkgoRecover()
513+
Fail("Unexpected GenericEvent")
514+
},
515+
}, q)
516+
Expect(err).NotTo(HaveOccurred())
517+
518+
rs2, err = clientset.AppsV1().ReplicaSets("default").Update(rs2)
519+
Expect(err).NotTo(HaveOccurred())
520+
<-c
521+
close(done)
522+
})
523+
524+
It("should provide a ReplicaSet DeletedEvent", func(done Done) {
525+
c := make(chan struct{})
526+
527+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
528+
instance := &source.Informer{InformerProvider: informerFactory.Apps().V1().ReplicaSets()}
529+
err := instance.Start(handler.Funcs{
530+
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
531+
},
532+
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
533+
},
534+
DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
535+
defer GinkgoRecover()
536+
Expect(q2).To(Equal(q))
537+
Expect(evt.Meta.GetName()).To(Equal(rs.Name))
538+
close(c)
539+
},
540+
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
541+
defer GinkgoRecover()
542+
Fail("Unexpected GenericEvent")
543+
},
544+
}, q)
545+
Expect(err).NotTo(HaveOccurred())
546+
547+
err = clientset.AppsV1().ReplicaSets("default").Delete(rs.Name, &metav1.DeleteOptions{})
548+
Expect(err).NotTo(HaveOccurred())
549+
<-c
550+
close(done)
551+
})
552+
})
553+
})
372554
})

pkg/source/source_suite_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestSource(t *testing.T) {
3434
}
3535

3636
var testenv *envtest.Environment
37-
var config *rest.Config
37+
var cfg *rest.Config
3838
var clientset *kubernetes.Clientset
3939
var icache cache.Cache
4040
var stop chan struct{}
@@ -46,13 +46,13 @@ var _ = BeforeSuite(func(done Done) {
4646
testenv = &envtest.Environment{}
4747

4848
var err error
49-
config, err = testenv.Start()
49+
cfg, err = testenv.Start()
5050
Expect(err).NotTo(HaveOccurred())
5151

52-
clientset, err = kubernetes.NewForConfig(config)
52+
clientset, err = kubernetes.NewForConfig(cfg)
5353
Expect(err).NotTo(HaveOccurred())
5454

55-
icache, err = cache.New(config, cache.Options{})
55+
icache, err = cache.New(cfg, cache.Options{})
5656
Expect(err).NotTo(HaveOccurred())
5757

5858
go func() {

0 commit comments

Comments
 (0)