Skip to content

✨Allow controllers to be started and stopped separately from the manager #863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ type Controller interface {
// New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options)
if err != nil {
return nil, err
}

// Add the controller as a Manager components
return c, mgr.Add(c)
}

// NewUnmanaged returns a new controller without adding it to the manager. The
// caller is responsible for starting the returned controller.
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
if options.Reconciler == nil {
return nil, fmt.Errorf("must specify Reconciler")
}
Expand Down Expand Up @@ -100,9 +112,9 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
SetFields: mgr.SetFields,
Name: name,
}

// Add the controller as a Manager components
return c, mgr.Add(c)
return c, nil
}
45 changes: 45 additions & 0 deletions pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,48 @@ func ExampleController_unstructured() {
os.Exit(1)
}
}

// This example creates a new controller named "pod-controller" to watch Pods
// and call a no-op reconciler. The controller is not added to the provided
// manager, and must thus be started and stopped by the caller.
func ExampleNewUnmanaged() {
// mgr is a manager.Manager

// Configure creates a new controller but does not add it to the supplied
// manager.
c, err := controller.NewUnmanaged("pod-controller", mgr, controller.Options{
Reconciler: reconcile.Func(func(_ reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
}),
})
if err != nil {
log.Error(err, "unable to create pod-controller")
os.Exit(1)
}

if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}

// Create a stop channel for our controller. The controller will stop when
// this channel is closed.
stop := make(chan struct{})

// Start our controller in a goroutine so that we do not block.
go func() {
// Block until our controller manager is elected leader. We presume our
// entire process will terminate if we lose leadership, so we don't need
// to handle that.
<-mgr.Elected()

// Start our controller. This will block until the stop channel is
// closed, or the controller returns an error.
if err := c.Start(stop); err != nil {
log.Error(err, "cannot run experiment controller")
}
}()

// Stop our controller.
close(stop)
}
12 changes: 12 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ type controllerManager struct {
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}

// elected is closed when this manager becomes the leader of a group of
// managers, either because it won a leader election or because no leader
// election was configured.
elected chan struct{}

startCache func(stop <-chan struct{}) error

// port is the port that the webhook server serves at.
Expand Down Expand Up @@ -423,6 +428,8 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
return err
}
} else {
// Treat not having leader election enabled the same as being elected.
close(cm.elected)
go cm.startLeaderElectionRunnables()
}

Expand Down Expand Up @@ -511,6 +518,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
close(cm.elected)
cm.startLeaderElectionRunnables()
},
OnStoppedLeading: func() {
Expand Down Expand Up @@ -538,3 +546,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
go l.Run(ctx)
return nil
}

func (cm *controllerManager) Elected() <-chan struct{} {
return cm.elected
}
6 changes: 6 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type Manager interface {
// non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
Add(Runnable) error

// Elected is closed when this manager is elected leader of a group of
// managers, either because it won a leader election or because no leader
// election was configured.
Elected() <-chan struct{}

// SetFields will set any dependencies on an object for which the object has implemented the inject
// interface - e.g. inject.Client.
SetFields(interface{}) error
Expand Down Expand Up @@ -313,6 +318,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
elected: make(chan struct{}),
port: options.Port,
host: options.Host,
certDir: options.CertDir,
Expand Down
57 changes: 55 additions & 2 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ var _ = Describe("manger.Manager", func() {
Context("with leader election enabled", func() {
It("should default ID to controller-runtime if ID is not set", func() {
var rl resourcelock.Interface
m, err := New(cfg, Options{
m1, err := New(cfg, Options{
LeaderElection: true,
LeaderElectionNamespace: "default",
LeaderElectionID: "test-leader-election-id",
Expand All @@ -152,10 +152,61 @@ var _ = Describe("manger.Manager", func() {
rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
return rl, err
},
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
})
Expect(err).ToNot(HaveOccurred())
Expect(m).ToNot(BeNil())
Expect(m1).ToNot(BeNil())
Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))

m2, err := New(cfg, Options{
LeaderElection: true,
LeaderElectionNamespace: "default",
LeaderElectionID: "test-leader-election-id",
newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
var err error
rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
return rl, err
},
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
})

Expect(err).ToNot(HaveOccurred())
Expect(m2).ToNot(BeNil())
Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))

c1 := make(chan struct{})
Expect(m1.Add(RunnableFunc(func(s <-chan struct{}) error {
defer GinkgoRecover()
close(c1)
return nil
}))).To(Succeed())

go func() {
defer GinkgoRecover()
Expect(m1.Elected()).ShouldNot(BeClosed())
Expect(m1.Start(stop)).NotTo(HaveOccurred())
Expect(m1.Elected()).Should(BeClosed())
}()
<-c1

c2 := make(chan struct{})
Expect(m2.Add(RunnableFunc(func(s <-chan struct{}) error {
defer GinkgoRecover()
close(c2)
return nil
}))).To(Succeed())

By("Expect second manager to lose leader election")
go func() {
defer GinkgoRecover()
Expect(m2.Start(stop)).NotTo(HaveOccurred())
Consistently(m2.Elected()).ShouldNot(Receive())
}()

By("Expect controller on manager without leader lease never to run")
Consistently(c2).ShouldNot(Receive())
})

It("should return an error if namespace not set and not running in cluster", func() {
Expand Down Expand Up @@ -260,7 +311,9 @@ var _ = Describe("manger.Manager", func() {

go func() {
defer GinkgoRecover()
Expect(m.Elected()).ShouldNot(BeClosed())
Expect(m.Start(stop)).NotTo(HaveOccurred())
Expect(m.Elected()).Should(BeClosed())
}()
<-c1
<-c2
Expand Down