Skip to content

Improve UX for using client-go generated Informers #181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ func (r RunnableFunc) Start(s <-chan struct{}) error {
return r(s)
}

// StartAdapter wraps a Start function to make it implement Runnable
func StartAdapter(s func(<-chan struct{})) Runnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To a new user it will read as ---> This starts some sort of an adapter. Other names that comes to mind are: MakeRunnable, StartFnAdapter.

return RunnableFunc(func(c <-chan struct{}) error {
s(c)
return nil
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like manager pkg to export just the essentials functions/methods. Functions like these, which are syntax sugar, should belong to some sister utils pkg like we controllerutils for controller helpers.

Backward compatibility contracts with core methods/functions are going to be more stricter than these utils/helpers pkgs. WDYT ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM


// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Initialize a rest.config if none was specified
Expand Down
32 changes: 32 additions & 0 deletions pkg/source/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@ limitations under the License.
package source_test

import (
"time"

"github.com/golang/glog"
"k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var ctrl controller.Controller
var mgr manager.Manager

// This example Watches for Pod Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request
// with the Name and Namespace of the Pod.
Expand All @@ -42,3 +50,27 @@ func ExampleChannel() {
&handler.EnqueueRequestForObject{},
)
}

// This example Watches for Service Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request
// with the Name and Namespace of the Service. It uses the client-go generated Service Informer instead of the
// Generic Informer.
func ExampleInformer() {
generatedClient := kubernetes.NewForConfigOrDie(mgr.GetConfig())
generatedInformers := kubeinformers.NewSharedInformerFactory(generatedClient, time.Minute*30)

// Add it to the Manager
if err := mgr.Add(manager.StartAdapter(generatedInformers.Start)); err != nil {
glog.Fatalf("error Adding InformerFactory to the Manager: %v", err)
}

// Setup Watch using the client-go generated Informer
if err := ctrl.Watch(
&source.Informer{InformerProvider: generatedInformers.Core().V1().Services()},
&handler.EnqueueRequestForObject{},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am reading correctly, there is an order requirement here. Informer source should only start after the informer Runnable has started and sync'ed ? Not sure how that is enforced. Will look into the code. I may be wrong also about the ordering requirement.

); err != nil {
glog.Fatalf("error Watching Services: %v", err)
}

// Start the Manager
mgr.Start(signals.SetupSignalHandler())
}
28 changes: 24 additions & 4 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,18 @@ func (cs *Channel) syncLoop() {
}
}

// InformerProvider provides an Informer
type InformerProvider interface {
Informer() toolscache.SharedIndexInformer
}

// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
type Informer struct {
// Informer is the generated client-go Informer
// Informer is the generated client-go Informer. Mutually exclusive with InformerProvider.
Informer toolscache.SharedIndexInformer

// InformerProvider provides a generated client-go Informer. Mutually exclusive with Informer.
InformerProvider InformerProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Informer term is being overused and source.Informer and client-go.Informer are getting mixed up in my head and causing confusion. Don't have a better suggestions.

}

var _ Source = &Informer{}
Expand All @@ -255,11 +263,23 @@ func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimi
prct ...predicate.Predicate) error {

// Informer should have been specified by the user.
if is.Informer == nil {
return fmt.Errorf("must specify Informer.Informer")
if is.Informer == nil && is.InformerProvider == nil {
return fmt.Errorf("must specify Informer.Informer or Informer.InformerProvider")
}

if is.Informer != nil && is.InformerProvider != nil {
return fmt.Errorf("must specify only one of Informer.Informer and Informer.InformerProvider")
}

if is.Informer != nil {
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
}

if is.InformerProvider != nil {
is.InformerProvider.Informer().AddEventHandler(
internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
}

is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}

Expand Down
182 changes: 182 additions & 0 deletions pkg/source/source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,186 @@ var _ = Describe("Source", func() {
})
})
})

Describe("InformerProvider", func() {
var c chan struct{}
var rs *appsv1.ReplicaSet
var informerFactory kubeinformers.SharedInformerFactory
var stopTest chan struct{}

BeforeEach(func(done Done) {
stopTest = make(chan struct{})
informerFactory = kubeinformers.NewSharedInformerFactory(clientset, time.Second*30)
i := informerFactory.Apps().V1().ReplicaSets().Informer()
informerFactory.Start(stopTest)
Eventually(i.HasSynced).Should(BeTrue())

c = make(chan struct{})
rs = &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{Name: "informer-rs-name"},
Spec: appsv1.ReplicaSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
close(done)
})

AfterEach(func(done Done) {
close(stopTest)
close(done)
})

It("should error if both Informer and InformerProvider are given", func() {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
rsi := informerFactory.Apps().V1().ReplicaSets()
instance := &source.Informer{InformerProvider: rsi, Informer: rsi.Informer()}
err := instance.Start(handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {},
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {},
}, q)
Expect(err).To(HaveOccurred())
})

It("should error if neither Informer and InformerProvider are given", func() {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{}
err := instance.Start(handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {},
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {},
}, q)
Expect(err).To(HaveOccurred())
})

Context("for a ReplicaSet resource", func() {

It("should provide a ReplicaSet CreateEvent", func(done Done) {
c := make(chan struct{})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{InformerProvider: informerFactory.Apps().V1().ReplicaSets()}
err := instance.Start(handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
var err error
rs, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

Expect(q2).To(BeIdenticalTo(q))
Expect(evt.Meta).To(Equal(rs))
Expect(evt.Object).To(Equal(rs))
close(c)
},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected UpdateEvent")
},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected DeleteEvent")
},
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected GenericEvent")
},
}, q)
Expect(err).NotTo(HaveOccurred())

rs, err = clientset.AppsV1().ReplicaSets("default").Create(rs)
Expect(err).NotTo(HaveOccurred())
<-c
close(done)
}, 30)

It("should provide a ReplicaSet UpdateEvent", func(done Done) {
var err error
rs, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

rs2 := rs.DeepCopy()
rs2.SetLabels(map[string]string{"biz": "baz"})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{InformerProvider: informerFactory.Apps().V1().ReplicaSets()}
err = instance.Start(handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
},
UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
var err error
rs2, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

Expect(q2).To(Equal(q))
Expect(evt.MetaOld).To(Equal(rs))
Expect(evt.ObjectOld).To(Equal(rs))

Expect(evt.MetaNew).To(Equal(rs2))
Expect(evt.ObjectNew).To(Equal(rs2))

close(c)
},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected DeleteEvent")
},
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected GenericEvent")
},
}, q)
Expect(err).NotTo(HaveOccurred())

rs2, err = clientset.AppsV1().ReplicaSets("default").Update(rs2)
Expect(err).NotTo(HaveOccurred())
<-c
close(done)
})

It("should provide a ReplicaSet DeletedEvent", func(done Done) {
c := make(chan struct{})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{InformerProvider: informerFactory.Apps().V1().ReplicaSets()}
err := instance.Start(handler.Funcs{
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
},
DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expect(q2).To(Equal(q))
Expect(evt.Meta.GetName()).To(Equal(rs.Name))
close(c)
},
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected GenericEvent")
},
}, q)
Expect(err).NotTo(HaveOccurred())

err = clientset.AppsV1().ReplicaSets("default").Delete(rs.Name, &metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
<-c
close(done)
})
})
})
})
8 changes: 4 additions & 4 deletions pkg/source/source_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestSource(t *testing.T) {
}

var testenv *envtest.Environment
var config *rest.Config
var cfg *rest.Config
var clientset *kubernetes.Clientset
var icache cache.Cache
var stop chan struct{}
Expand All @@ -46,13 +46,13 @@ var _ = BeforeSuite(func(done Done) {
testenv = &envtest.Environment{}

var err error
config, err = testenv.Start()
cfg, err = testenv.Start()
Expect(err).NotTo(HaveOccurred())

clientset, err = kubernetes.NewForConfig(config)
clientset, err = kubernetes.NewForConfig(cfg)
Expect(err).NotTo(HaveOccurred())

icache, err = cache.New(config, cache.Options{})
icache, err = cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down