Skip to content

Commit 62aa08d

Browse files
added leader election for runnables
1 parent 056a18a commit 62aa08d

File tree

7 files changed

+175
-41
lines changed

7 files changed

+175
-41
lines changed

pkg/controller/controller.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ type Options struct {
3535

3636
// Reconciler reconciles an object
3737
Reconciler reconcile.Reconciler
38+
39+
// LeaderElection determines whether or not to use leader election when starting the controller.
40+
// Defaults to true
41+
LeaderElection bool
42+
43+
// LeaderElectionID determines the name of the configmap that leader election will use for holding the leader lock.
44+
LeaderElectionID string
3845
}
3946

4047
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -89,6 +96,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
8996
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
9097
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
9198
Name: name,
99+
LeaderElection: options.LeaderElection,
100+
LeaderElectionID: options.LeaderElectionID,
92101
}
93102

94103
// Add the controller as a Manager component

pkg/internal/controller/controller.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ type Controller struct {
5050
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
5151
MaxConcurrentReconciles int
5252

53+
// LeaderElection determines whether or not to use leader election when
54+
// starting the controller.
55+
LeaderElection bool
56+
57+
// LeaderElectionID determines the name of the configmap that leader election
58+
// will use for holding the leader lock.
59+
LeaderElectionID string
60+
5361
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
5462
// ensures that the state of the system matches the state specified in the object.
5563
// Defaults to the DefaultReconcileFunc.
@@ -256,3 +264,11 @@ func (c *Controller) InjectFunc(f inject.Func) error {
256264
func (c *Controller) updateMetrics(reconcileTime time.Duration) {
257265
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
258266
}
267+
268+
// NeedLeaderElection reports whether this controller needs to be run in
// leader election mode. It returns the value of the LeaderElection field.
func (c *Controller) NeedLeaderElection() bool {
269+
return c.LeaderElection
270+
}
271+
272+
// GetID returns the controller's leader election ID, i.e. the value of the
// LeaderElectionID field (the name of the configmap that leader election
// uses for holding the leader lock).
func (c *Controller) GetID() string {
273+
return c.LeaderElectionID
274+
}

pkg/leaderelection/leader_election.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,6 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op
5252
return nil, nil
5353
}
5454

55-
// Default the LeaderElectionID
56-
if options.LeaderElectionID == "" {
57-
options.LeaderElectionID = "controller-leader-election-helper"
58-
}
59-
6055
// Default the namespace (if running in cluster)
6156
if options.LeaderElectionNamespace == "" {
6257
var err error

pkg/manager/internal.go

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package manager
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net"
2324
"net/http"
@@ -34,6 +35,7 @@ import (
3435
"sigs.k8s.io/controller-runtime/pkg/cache"
3536
"sigs.k8s.io/controller-runtime/pkg/client"
3637
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
38+
crleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection"
3739
"sigs.k8s.io/controller-runtime/pkg/metrics"
3840
"sigs.k8s.io/controller-runtime/pkg/recorder"
3941
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
@@ -82,8 +84,16 @@ type controllerManager struct {
8284
// (and EventHandlers, Sources and Predicates).
8385
recorderProvider recorder.Provider
8486

85-
// resourceLock forms the basis for leader election
86-
resourceLock resourcelock.Interface
87+
// leaderElectionNamespace determines the namespace in which the leader
88+
// election configmap will be created.
89+
leaderElectionNamespace string
90+
91+
// leaderElectionID determines the name of the configmap that leader election
92+
// will use for holding the leader lock.
93+
leaderElectionID string
94+
95+
// resourceLocksMap maps leader election ID to resource lock
96+
resourceLocksMap map[string]resourcelock.Interface
8797

8898
// mapper is used to map resources to kind, and map kind and version.
8999
mapper meta.RESTMapper
@@ -123,6 +133,9 @@ type controllerManager struct {
123133
// retryPeriod is the duration the LeaderElector clients should wait
124134
// between tries of actions.
125135
retryPeriod time.Duration
136+
137+
// Dependency injection for testing
138+
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options crleaderelection.Options) (resourcelock.Interface, error)
126139
}
127140

128141
// Add sets dependencies on i, and adds it to the list of Runnables to start.
@@ -265,8 +278,18 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
265278

266279
go cm.startNonLeaderElectionRunnables()
267280

268-
if cm.resourceLock != nil {
269-
err := cm.startLeaderElection()
281+
if resourceLock := cm.resourceLocksMap[cm.leaderElectionID]; resourceLock != nil {
282+
err := cm.startLeaderElection(cm.leaderElectionID, leaderelection.LeaderCallbacks{
283+
OnStartedLeading: func(_ context.Context) {
284+
cm.startLeaderElectionRunnables()
285+
},
286+
OnStoppedLeading: func() {
287+
// Most implementations of leader election log.Fatal() here.
288+
// Since Start is wrapped in log.Fatal when called, we can just return
289+
// an error here which will cause the program to exit.
290+
cm.errChan <- fmt.Errorf("leader election lost")
291+
},
292+
})
270293
if err != nil {
271294
return err
272295
}
@@ -313,7 +336,46 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
313336
// Write any Start errors to a channel so we can return them
314337
ctrl := c
315338
go func() {
316-
cm.errChan <- ctrl.Start(cm.internalStop)
339+
if leRunnable, ok := ctrl.(LeaderElectionRunnable); ok {
340+
leID := leRunnable.GetID()
341+
342+
// Check that leader election ID is defined
343+
if leID == "" {
344+
cm.errChan <- errors.New("LeaderElectionID must be configured")
345+
return
346+
}
347+
348+
// Check that leader election ID is unique
349+
if _, exists := cm.resourceLocksMap[leID]; exists {
350+
cm.errChan <- errors.New("LeaderElectionID must be unique")
351+
return
352+
}
353+
354+
// Create resource lock
355+
resourceLock, err := cm.newResourceLock(cm.config, cm.recorderProvider, crleaderelection.Options{
356+
LeaderElection: true,
357+
LeaderElectionID: leID,
358+
LeaderElectionNamespace: cm.leaderElectionNamespace,
359+
})
360+
if err != nil {
361+
cm.errChan <- err
362+
}
363+
364+
cm.resourceLocksMap[leID] = resourceLock
365+
err = cm.startLeaderElection(leID, leaderelection.LeaderCallbacks{
366+
OnStartedLeading: func(_ context.Context) {
367+
cm.errChan <- ctrl.Start(cm.internalStop)
368+
},
369+
OnStoppedLeading: func() {
370+
cm.errChan <- fmt.Errorf("runnable leader election lost")
371+
},
372+
})
373+
if err != nil {
374+
cm.errChan <- err
375+
}
376+
} else {
377+
cm.errChan <- ctrl.Start(cm.internalStop)
378+
}
317379
}()
318380
}
319381
}
@@ -339,23 +401,13 @@ func (cm *controllerManager) waitForCache() {
339401
cm.started = true
340402
}
341403

342-
func (cm *controllerManager) startLeaderElection() (err error) {
404+
func (cm *controllerManager) startLeaderElection(leaderElectionID string, callbacks leaderelection.LeaderCallbacks) (err error) {
343405
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
344-
Lock: cm.resourceLock,
406+
Lock: cm.resourceLocksMap[leaderElectionID],
345407
LeaseDuration: cm.leaseDuration,
346408
RenewDeadline: cm.renewDeadline,
347409
RetryPeriod: cm.retryPeriod,
348-
Callbacks: leaderelection.LeaderCallbacks{
349-
OnStartedLeading: func(_ context.Context) {
350-
cm.startLeaderElectionRunnables()
351-
},
352-
OnStoppedLeading: func() {
353-
// Most implementations of leader election log.Fatal() here.
354-
// Since Start is wrapped in log.Fatal when called, we can just return
355-
// an error here which will cause the program to exit.
356-
cm.errChan <- fmt.Errorf("leader election lost")
357-
},
358-
},
410+
Callbacks: callbacks,
359411
})
360412
if err != nil {
361413
return err

pkg/manager/manager.go

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ type LeaderElectionRunnable interface {
190190
// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
191191
// e.g. controllers need to be run in leader election mode, while webhook server doesn't.
192192
NeedLeaderElection() bool
193+
194+
// GetID returns the leader election ID.
195+
GetID() string
193196
}
194197

195198
// New returns a new Manager for creating Controllers.
@@ -241,6 +244,9 @@ func New(config *rest.Config, options Options) (Manager, error) {
241244
if err != nil {
242245
return nil, err
243246
}
247+
resourceLockMap := map[string]resourcelock.Interface{
248+
options.LeaderElectionID: resourceLock,
249+
}
244250

245251
// Create the metrics listener. This will throw an error if the metrics bind
246252
// address is invalid or already in use.
@@ -252,24 +258,27 @@ func New(config *rest.Config, options Options) (Manager, error) {
252258
stop := make(chan struct{})
253259

254260
return &controllerManager{
255-
config: config,
256-
scheme: options.Scheme,
257-
errChan: make(chan error),
258-
cache: cache,
259-
fieldIndexes: cache,
260-
client: writeObj,
261-
apiReader: apiReader,
262-
recorderProvider: recorderProvider,
263-
resourceLock: resourceLock,
264-
mapper: mapper,
265-
metricsListener: metricsListener,
266-
internalStop: stop,
267-
internalStopper: stop,
268-
port: options.Port,
269-
host: options.Host,
270-
leaseDuration: *options.LeaseDuration,
271-
renewDeadline: *options.RenewDeadline,
272-
retryPeriod: *options.RetryPeriod,
261+
config: config,
262+
scheme: options.Scheme,
263+
errChan: make(chan error),
264+
cache: cache,
265+
fieldIndexes: cache,
266+
client: writeObj,
267+
apiReader: apiReader,
268+
recorderProvider: recorderProvider,
269+
resourceLocksMap: resourceLockMap,
270+
mapper: mapper,
271+
metricsListener: metricsListener,
272+
internalStop: stop,
273+
internalStopper: stop,
274+
port: options.Port,
275+
host: options.Host,
276+
leaderElectionNamespace: options.LeaderElectionNamespace,
277+
leaderElectionID: options.LeaderElectionID,
278+
leaseDuration: *options.LeaseDuration,
279+
renewDeadline: *options.RenewDeadline,
280+
retryPeriod: *options.RetryPeriod,
281+
newResourceLock: options.newResourceLock,
273282
}, nil
274283
}
275284

@@ -317,6 +326,11 @@ func setOptionsDefaults(options Options) Options {
317326
options.newRecorderProvider = internalrecorder.NewProvider
318327
}
319328

329+
// Default the LeaderElectionID
330+
if options.LeaderElectionID == "" {
331+
options.LeaderElectionID = "controller-leader-election-helper"
332+
}
333+
320334
// Allow newResourceLock to be mocked
321335
if options.newResourceLock == nil {
322336
options.newResourceLock = leaderelection.NewResourceLock

pkg/manager/manager_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,34 @@ var _ = Describe("manger.Manager", func() {
285285
<-c3
286286
})
287287

288+
It("shouldn't allow empty LeaderElectionID for controller", func(done Done) {
289+
m, err := New(cfg, options)
290+
Expect(err).ToNot(HaveOccurred())
291+
292+
r := &leRunnable{
293+
leaderElectionID: "",
294+
}
295+
Expect(m.Add(r)).To(Succeed())
296+
Expect(m.Start(stop).Error()).To(ContainSubstring("LeaderElectionID must be configured"))
297+
298+
close(done)
299+
})
300+
301+
It("shouldn't allow non unique LeaderElectionID for controller", func(done Done) {
302+
if options.LeaderElectionID != "" {
303+
m, err := New(cfg, options)
304+
Expect(err).ToNot(HaveOccurred())
305+
306+
r := &leRunnable{
307+
leaderElectionID: options.LeaderElectionID,
308+
}
309+
Expect(m.Add(r)).To(Succeed())
310+
Expect(m.Start(stop).Error()).To(ContainSubstring("LeaderElectionID must be unique"))
311+
}
312+
313+
close(done)
314+
})
315+
288316
It("should return an error if any non-leaderelection Components fail to Start", func() {
289317
// TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429
290318
})
@@ -711,3 +739,19 @@ func (i *injectable) InjectStopChannel(stop <-chan struct{}) error {
711739
func (i *injectable) Start(<-chan struct{}) error {
712740
return nil
713741
}
742+
743+
// leRunnable is a minimal test runnable that requests leader election and
// reports a configurable leader election ID.
type leRunnable struct {
744+
// leaderElectionID is the value returned by GetID.
leaderElectionID string
745+
}
746+
747+
// Start is a no-op: it ignores the stop channel and returns nil immediately.
func (*leRunnable) Start(<-chan struct{}) error {
748+
return nil
749+
}
750+
751+
// NeedLeaderElection always returns true, so this runnable is always treated
// as requiring leader election.
func (*leRunnable) NeedLeaderElection() bool {
752+
return true
753+
}
754+
755+
// GetID returns the leaderElectionID this runnable was constructed with.
func (le *leRunnable) GetID() string {
756+
return le.leaderElectionID
757+
}

pkg/webhook/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ func (*Server) NeedLeaderElection() bool {
9595
return false
9696
}
9797

98+
// GetID returns an empty leader election ID. Server's NeedLeaderElection
// returns false, so no ID is provided here.
func (*Server) GetID() string {
99+
return ""
100+
}
101+
98102
// Register marks the given webhook as being served at the given path.
99103
// It panics if two hooks are registered on the same path.
100104
func (s *Server) Register(path string, hook http.Handler) {

0 commit comments

Comments
 (0)