Skip to content

Commit 3222f6f

Browse files
added leader election for runnables
1 parent 056a18a commit 3222f6f

File tree

6 files changed

+179
-41
lines changed

6 files changed

+179
-41
lines changed

pkg/controller/controller.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ type Options struct {
3535

3636
// Reconciler reconciles an object
3737
Reconciler reconcile.Reconciler
38+
39+
// LeaderElection determines whether or not to use leader election when
40+
// starting the controller.
41+
LeaderElection bool
42+
43+
// LeaderElectionID determines the name of the configmap that leader election
44+
// will use for holding the leader lock.
45+
LeaderElectionID string
3846
}
3947

4048
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -89,6 +97,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
8997
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
9098
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
9199
Name: name,
100+
LeaderElection: options.LeaderElection,
101+
LeaderElectionID: options.LeaderElectionID,
92102
}
93103

94104
// Add the controller as a Manager components

pkg/internal/controller/controller.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ type Controller struct {
5050
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
5151
MaxConcurrentReconciles int
5252

53+
// LeaderElection determines whether or not to use leader election when
54+
// starting the controller.
55+
LeaderElection bool
56+
57+
// LeaderElectionID determines the name of the configmap that leader election
58+
// will use for holding the leader lock.
59+
LeaderElectionID string
60+
5361
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
5462
// ensures that the state of the system matches the state specified in the object.
5563
// Defaults to the DefaultReconcileFunc.
@@ -256,3 +264,11 @@ func (c *Controller) InjectFunc(f inject.Func) error {
256264
func (c *Controller) updateMetrics(reconcileTime time.Duration) {
257265
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
258266
}
267+
268+
func (c *Controller) GetLeaderElection() bool {
269+
return c.LeaderElection
270+
}
271+
272+
func (c *Controller) GetLeaderElectionID() string {
273+
return c.LeaderElectionID
274+
}

pkg/leaderelection/leader_election.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,6 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op
5252
return nil, nil
5353
}
5454

55-
// Default the LeaderElectionID
56-
if options.LeaderElectionID == "" {
57-
options.LeaderElectionID = "controller-leader-election-helper"
58-
}
59-
6055
// Default the namespace (if running in cluster)
6156
if options.LeaderElectionNamespace == "" {
6257
var err error

pkg/manager/internal.go

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package manager
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net"
2324
"net/http"
@@ -34,6 +35,7 @@ import (
3435
"sigs.k8s.io/controller-runtime/pkg/cache"
3536
"sigs.k8s.io/controller-runtime/pkg/client"
3637
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
38+
crleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection"
3739
"sigs.k8s.io/controller-runtime/pkg/metrics"
3840
"sigs.k8s.io/controller-runtime/pkg/recorder"
3941
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
@@ -82,8 +84,16 @@ type controllerManager struct {
8284
// (and EventHandlers, Sources and Predicates).
8385
recorderProvider recorder.Provider
8486

85-
// resourceLock forms the basis for leader election
86-
resourceLock resourcelock.Interface
87+
// leaderElectionNamespace determines the namespace in which the leader
88+
// election configmap will be created.
89+
leaderElectionNamespace string
90+
91+
// leaderElectionID determines the name of the configmap that leader election
92+
// will use for holding the leader lock.
93+
leaderElectionID string
94+
95+
// resourceLocksMap maps leader election ID to resource lock
96+
resourceLocksMap map[string]resourcelock.Interface
8797

8898
// mapper is used to map resources to kind, and map kind and version.
8999
mapper meta.RESTMapper
@@ -123,6 +133,9 @@ type controllerManager struct {
123133
// retryPeriod is the duration the LeaderElector clients should wait
124134
// between tries of actions.
125135
retryPeriod time.Duration
136+
137+
// Dependency injection for testing
138+
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options crleaderelection.Options) (resourcelock.Interface, error)
126139
}
127140

128141
// Add sets dependencies on i, and adds it to the list of Runnables to start.
@@ -265,8 +278,18 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
265278

266279
go cm.startNonLeaderElectionRunnables()
267280

268-
if cm.resourceLock != nil {
269-
err := cm.startLeaderElection()
281+
if resourceLock := cm.resourceLocksMap[cm.leaderElectionID]; resourceLock != nil {
282+
err := cm.startLeaderElection(cm.leaderElectionID, leaderelection.LeaderCallbacks{
283+
OnStartedLeading: func(_ context.Context) {
284+
cm.startLeaderElectionRunnables()
285+
},
286+
OnStoppedLeading: func() {
287+
// Most implementations of leader election log.Fatal() here.
288+
// Since Start is wrapped in log.Fatal when called, we can just return
289+
// an error here which will cause the program to exit.
290+
cm.errChan <- fmt.Errorf("leader election lost")
291+
},
292+
})
270293
if err != nil {
271294
return err
272295
}
@@ -313,7 +336,46 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
313336
// Write any Start errors to a channel so we can return them
314337
ctrl := c
315338
go func() {
316-
cm.errChan <- ctrl.Start(cm.internalStop)
339+
if sr, ok := ctrl.(SingletonRunnable); ok && sr.GetLeaderElection() {
340+
runnableLeaderElectionID := ctrl.(SingletonRunnable).GetLeaderElectionID()
341+
342+
// Check that leader election ID is defined
343+
if runnableLeaderElectionID == "" {
344+
cm.errChan <- errors.New("LeaderElectionID must be configured")
345+
return
346+
}
347+
348+
// Check that leader election ID is unique
349+
if _, exists := cm.resourceLocksMap[runnableLeaderElectionID]; exists {
350+
cm.errChan <- errors.New("LeaderElectionID must be unique")
351+
return
352+
}
353+
354+
// Create resource lock
355+
resourceLock, err := cm.newResourceLock(cm.config, cm.recorderProvider, crleaderelection.Options{
356+
LeaderElection: true,
357+
LeaderElectionID: runnableLeaderElectionID,
358+
LeaderElectionNamespace: cm.leaderElectionNamespace,
359+
})
360+
if err != nil {
361+
cm.errChan <- err
362+
}
363+
364+
cm.resourceLocksMap[runnableLeaderElectionID] = resourceLock
365+
err = cm.startLeaderElection(runnableLeaderElectionID, leaderelection.LeaderCallbacks{
366+
OnStartedLeading: func(_ context.Context) {
367+
cm.errChan <- ctrl.Start(cm.internalStop)
368+
},
369+
OnStoppedLeading: func() {
370+
cm.errChan <- fmt.Errorf("runnable leader election lost")
371+
},
372+
})
373+
if err != nil {
374+
cm.errChan <- err
375+
}
376+
} else {
377+
cm.errChan <- ctrl.Start(cm.internalStop)
378+
}
317379
}()
318380
}
319381
}
@@ -339,23 +401,13 @@ func (cm *controllerManager) waitForCache() {
339401
cm.started = true
340402
}
341403

342-
func (cm *controllerManager) startLeaderElection() (err error) {
404+
func (cm *controllerManager) startLeaderElection(leaderElectionID string, callbacks leaderelection.LeaderCallbacks) (err error) {
343405
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
344-
Lock: cm.resourceLock,
406+
Lock: cm.resourceLocksMap[leaderElectionID],
345407
LeaseDuration: cm.leaseDuration,
346408
RenewDeadline: cm.renewDeadline,
347409
RetryPeriod: cm.retryPeriod,
348-
Callbacks: leaderelection.LeaderCallbacks{
349-
OnStartedLeading: func(_ context.Context) {
350-
cm.startLeaderElectionRunnables()
351-
},
352-
OnStoppedLeading: func() {
353-
// Most implementations of leader election log.Fatal() here.
354-
// Since Start is wrapped in log.Fatal when called, we can just return
355-
// an error here which will cause the program to exit.
356-
cm.errChan <- fmt.Errorf("leader election lost")
357-
},
358-
},
410+
Callbacks: callbacks,
359411
})
360412
if err != nil {
361413
return err

pkg/manager/manager.go

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,16 @@ type LeaderElectionRunnable interface {
192192
NeedLeaderElection() bool
193193
}
194194

195+
// SingletonRunnable knows if Runnable needs leader election.
196+
// Runnable also have to be leader election Runnable(started after manager's leader election)
197+
type SingletonRunnable interface {
198+
// GetLeaderElection returns true if need to perform leader election
199+
GetLeaderElection() bool
200+
201+
// GetLeaderElectionID returns leader election ID
202+
GetLeaderElectionID() string
203+
}
204+
195205
// New returns a new Manager for creating Controllers.
196206
func New(config *rest.Config, options Options) (Manager, error) {
197207
// Initialize a rest.config if none was specified
@@ -241,6 +251,9 @@ func New(config *rest.Config, options Options) (Manager, error) {
241251
if err != nil {
242252
return nil, err
243253
}
254+
resourceLockMap := map[string]resourcelock.Interface{
255+
options.LeaderElectionID: resourceLock,
256+
}
244257

245258
// Create the mertics listener. This will throw an error if the metrics bind
246259
// address is invalid or already in use.
@@ -252,24 +265,27 @@ func New(config *rest.Config, options Options) (Manager, error) {
252265
stop := make(chan struct{})
253266

254267
return &controllerManager{
255-
config: config,
256-
scheme: options.Scheme,
257-
errChan: make(chan error),
258-
cache: cache,
259-
fieldIndexes: cache,
260-
client: writeObj,
261-
apiReader: apiReader,
262-
recorderProvider: recorderProvider,
263-
resourceLock: resourceLock,
264-
mapper: mapper,
265-
metricsListener: metricsListener,
266-
internalStop: stop,
267-
internalStopper: stop,
268-
port: options.Port,
269-
host: options.Host,
270-
leaseDuration: *options.LeaseDuration,
271-
renewDeadline: *options.RenewDeadline,
272-
retryPeriod: *options.RetryPeriod,
268+
config: config,
269+
scheme: options.Scheme,
270+
errChan: make(chan error),
271+
cache: cache,
272+
fieldIndexes: cache,
273+
client: writeObj,
274+
apiReader: apiReader,
275+
recorderProvider: recorderProvider,
276+
resourceLocksMap: resourceLockMap,
277+
mapper: mapper,
278+
metricsListener: metricsListener,
279+
internalStop: stop,
280+
internalStopper: stop,
281+
port: options.Port,
282+
host: options.Host,
283+
leaderElectionNamespace: options.LeaderElectionNamespace,
284+
leaderElectionID: options.LeaderElectionID,
285+
leaseDuration: *options.LeaseDuration,
286+
renewDeadline: *options.RenewDeadline,
287+
retryPeriod: *options.RetryPeriod,
288+
newResourceLock: options.newResourceLock,
273289
}, nil
274290
}
275291

@@ -317,6 +333,11 @@ func setOptionsDefaults(options Options) Options {
317333
options.newRecorderProvider = internalrecorder.NewProvider
318334
}
319335

336+
// Default the LeaderElectionID
337+
if options.LeaderElectionID == "" {
338+
options.LeaderElectionID = "controller-leader-election-helper"
339+
}
340+
320341
// Allow newResourceLock to be mocked
321342
if options.newResourceLock == nil {
322343
options.newResourceLock = leaderelection.NewResourceLock

pkg/manager/manager_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,34 @@ var _ = Describe("manger.Manager", func() {
285285
<-c3
286286
})
287287

288+
It("shouldn't allow empty LeaderElectionID for controller", func(done Done) {
289+
m, err := New(cfg, options)
290+
Expect(err).ToNot(HaveOccurred())
291+
292+
r := &singletonRunnable{
293+
leaderElectionID: "",
294+
}
295+
Expect(m.Add(r)).To(Succeed())
296+
Expect(m.Start(stop).Error()).To(ContainSubstring("LeaderElectionID must be configured"))
297+
298+
close(done)
299+
})
300+
301+
It("shouldn't allow non unique LeaderElectionID for controller", func(done Done) {
302+
if options.LeaderElectionID != "" {
303+
m, err := New(cfg, options)
304+
Expect(err).ToNot(HaveOccurred())
305+
306+
r := &singletonRunnable{
307+
leaderElectionID: options.LeaderElectionID,
308+
}
309+
Expect(m.Add(r)).To(Succeed())
310+
Expect(m.Start(stop).Error()).To(ContainSubstring("LeaderElectionID must be unique"))
311+
}
312+
313+
close(done)
314+
})
315+
288316
It("should return an error if any non-leaderelection Components fail to Start", func() {
289317
// TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429
290318
})
@@ -711,3 +739,19 @@ func (i *injectable) InjectStopChannel(stop <-chan struct{}) error {
711739
func (i *injectable) Start(<-chan struct{}) error {
712740
return nil
713741
}
742+
743+
type singletonRunnable struct {
744+
leaderElectionID string
745+
}
746+
747+
func (*singletonRunnable) Start(<-chan struct{}) error {
748+
return nil
749+
}
750+
751+
func (*singletonRunnable) GetLeaderElection() bool {
752+
return true
753+
}
754+
755+
func (sr *singletonRunnable) GetLeaderElectionID() string {
756+
return sr.leaderElectionID
757+
}

0 commit comments

Comments
 (0)