Skip to content

Commit 8dfd21a

Browse files
added leader election for runnables
1 parent f99287c commit 8dfd21a

File tree

7 files changed

+165
-102
lines changed

7 files changed

+165
-102
lines changed

pkg/controller/controller.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ type Options struct {
3535

3636
// Reconciler reconciles an object
3737
Reconciler reconcile.Reconciler
38+
39+
// LeaderElection determines whether or not to use leader election when starting the controller.
40+
// Defaults to false (the zero value); set it to true to enable leader election for this controller.
41+
LeaderElection bool
42+
43+
// LeaderElectionID determines the name of the configmap that leader election will use for holding the leader lock.
44+
LeaderElectionID string
3845
}
3946

4047
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -89,6 +96,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
8996
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
9097
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
9198
Name: name,
99+
LeaderElection: options.LeaderElection,
100+
LeaderElectionID: options.LeaderElectionID,
92101
}
93102

94103
// Add the controller as a Manager component

pkg/internal/controller/controller.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ type Controller struct {
5050
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
5151
MaxConcurrentReconciles int
5252

53+
// LeaderElection determines whether or not to use leader election when
54+
// starting the controller.
55+
LeaderElection bool
56+
57+
// LeaderElectionID determines the name of the configmap that leader election
58+
// will use for holding the leader lock.
59+
LeaderElectionID string
60+
5361
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
5462
// ensures that the state of the system matches the state specified in the object.
5563
// Defaults to the DefaultReconcileFunc.
@@ -256,3 +264,11 @@ func (c *Controller) InjectFunc(f inject.Func) error {
256264
func (c *Controller) updateMetrics(reconcileTime time.Duration) {
257265
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
258266
}
267+
268+
func (c *Controller) NeedLeaderElection() bool {
269+
return c.LeaderElection
270+
}
271+
272+
func (c *Controller) GetID() string {
273+
return c.LeaderElectionID
274+
}

pkg/leaderelection/leader_election.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,6 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op
5252
return nil, nil
5353
}
5454

55-
// Default the LeaderElectionID
56-
if options.LeaderElectionID == "" {
57-
options.LeaderElectionID = "controller-leader-election-helper"
58-
}
59-
6055
// Default the namespace (if running in cluster)
6156
if options.LeaderElectionNamespace == "" {
6257
var err error

pkg/manager/internal.go

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package manager
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net"
2324
"net/http"
@@ -34,6 +35,7 @@ import (
3435
"sigs.k8s.io/controller-runtime/pkg/cache"
3536
"sigs.k8s.io/controller-runtime/pkg/client"
3637
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
38+
crleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection"
3739
"sigs.k8s.io/controller-runtime/pkg/metrics"
3840
"sigs.k8s.io/controller-runtime/pkg/recorder"
3941
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
@@ -58,9 +60,9 @@ type controllerManager struct {
5860
// to scheme.scheme.
5961
scheme *runtime.Scheme
6062

61-
// leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts.
63+
// leaderElectionRunnables is a map that groups Runnables that use the same leader election ID.
6264
// These Runnables are managed by leader election.
63-
leaderElectionRunnables []Runnable
65+
leaderElectionRunnables map[string][]Runnable
6466
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
6567
// These Runnables will not be blocked by leader election.
6668
nonLeaderElectionRunnables []Runnable
@@ -82,8 +84,9 @@ type controllerManager struct {
8284
// (and EventHandlers, Sources and Predicates).
8385
recorderProvider recorder.Provider
8486

85-
// resourceLock forms the basis for leader election
86-
resourceLock resourcelock.Interface
87+
// leaderElectionNamespace determines the namespace in which the leader
88+
// election configmaps will be created.
89+
leaderElectionNamespace string
8790

8891
// mapper is used to map resources to kind, and map kind and version.
8992
mapper meta.RESTMapper
@@ -123,6 +126,9 @@ type controllerManager struct {
123126
// retryPeriod is the duration the LeaderElector clients should wait
124127
// between tries of actions.
125128
retryPeriod time.Duration
129+
130+
// Dependency injection for testing
131+
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options crleaderelection.Options) (resourcelock.Interface, error)
126132
}
127133

128134
// Add sets dependencies on i, and adds it to the list of Runnables to start.
@@ -135,11 +141,27 @@ func (cm *controllerManager) Add(r Runnable) error {
135141
return err
136142
}
137143

144+
if cm.leaderElectionRunnables == nil {
145+
cm.leaderElectionRunnables = make(map[string][]Runnable)
146+
}
147+
138148
// Add the runnable to the leader election or the non-leaderelection list
139-
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
140-
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
149+
if leRunnable, ok := r.(LeaderElectionRunnable); ok && leRunnable.NeedLeaderElection() {
150+
runnables := []Runnable{r}
151+
leID := leRunnable.GetID()
152+
153+
// Check that leader election ID is defined
154+
if leID == "" {
155+
return errors.New("LeaderElectionID must be configured")
156+
}
157+
158+
if rs, exists := cm.leaderElectionRunnables[leID]; exists {
159+
runnables = append(runnables, rs...)
160+
}
161+
162+
cm.leaderElectionRunnables[leID] = runnables
141163
} else {
142-
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
164+
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
143165
}
144166

145167
if cm.started {
@@ -265,14 +287,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
265287

266288
go cm.startNonLeaderElectionRunnables()
267289

268-
if cm.resourceLock != nil {
269-
err := cm.startLeaderElection()
270-
if err != nil {
271-
return err
272-
}
273-
} else {
274-
go cm.startLeaderElectionRunnables()
275-
}
290+
go cm.startLeaderElectionRunnables()
276291

277292
select {
278293
case <-stop:
@@ -308,12 +323,38 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
308323
cm.waitForCache()
309324

310325
// Start the leader election Runnables after the cache has synced
311-
for _, c := range cm.leaderElectionRunnables {
326+
for leID, rs := range cm.leaderElectionRunnables {
312327
// Controllers block, but we want to return an error if any have an error starting.
313328
// Write any Start errors to a channel so we can return them
314-
ctrl := c
329+
leaderElectionID := leID
330+
runnables := rs
315331
go func() {
316-
cm.errChan <- ctrl.Start(cm.internalStop)
332+
// Create resource lock
333+
resourceLock, err := cm.newResourceLock(cm.config, cm.recorderProvider, crleaderelection.Options{
334+
LeaderElection: true,
335+
LeaderElectionID: leaderElectionID,
336+
LeaderElectionNamespace: cm.leaderElectionNamespace,
337+
})
338+
if err != nil {
339+
cm.errChan <- err
340+
}
341+
342+
err = cm.startLeaderElection(resourceLock, leaderelection.LeaderCallbacks{
343+
OnStartedLeading: func(_ context.Context) {
344+
for _, r := range runnables {
345+
runnable := r
346+
go func() {
347+
cm.errChan <- runnable.Start(cm.internalStop)
348+
}()
349+
}
350+
},
351+
OnStoppedLeading: func() {
352+
cm.errChan <- fmt.Errorf("runnable leader election lost")
353+
},
354+
})
355+
if err != nil {
356+
cm.errChan <- err
357+
}
317358
}()
318359
}
319360
}
@@ -339,23 +380,13 @@ func (cm *controllerManager) waitForCache() {
339380
cm.started = true
340381
}
341382

342-
func (cm *controllerManager) startLeaderElection() (err error) {
383+
func (cm *controllerManager) startLeaderElection(resourceLock resourcelock.Interface, callbacks leaderelection.LeaderCallbacks) (err error) {
343384
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
344-
Lock: cm.resourceLock,
385+
Lock: resourceLock,
345386
LeaseDuration: cm.leaseDuration,
346387
RenewDeadline: cm.renewDeadline,
347388
RetryPeriod: cm.retryPeriod,
348-
Callbacks: leaderelection.LeaderCallbacks{
349-
OnStartedLeading: func(_ context.Context) {
350-
cm.startLeaderElectionRunnables()
351-
},
352-
OnStoppedLeading: func() {
353-
// Most implementations of leader election log.Fatal() here.
354-
// Since Start is wrapped in log.Fatal when called, we can just return
355-
// an error here which will cause the program to exit.
356-
cm.errChan <- fmt.Errorf("leader election lost")
357-
},
358-
},
389+
Callbacks: callbacks,
359390
})
360391
if err != nil {
361392
return err

pkg/manager/manager.go

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,10 @@ type Options struct {
103103
// value only if you know what you are doing. Defaults to 10 hours if unset.
104104
SyncPeriod *time.Duration
105105

106-
// LeaderElection determines whether or not to use leader election when
107-
// starting the manager.
108-
LeaderElection bool
109-
110106
// LeaderElectionNamespace determines the namespace in which the leader
111-
// election configmap will be created.
107+
// election configmaps will be created.
112108
LeaderElectionNamespace string
113109

114-
// LeaderElectionID determines the name of the configmap that leader election
115-
// will use for holding the leader lock.
116-
LeaderElectionID string
117-
118110
// LeaseDuration is the duration that non-leader candidates will
119111
// wait to force acquire leadership. This is measured against time of
120112
// last observed ack. Default is 15 seconds.
@@ -190,6 +182,9 @@ type LeaderElectionRunnable interface {
190182
// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
191183
// e.g. controllers need to be run in leader election mode, while webhook server doesn't.
192184
NeedLeaderElection() bool
185+
186+
// GetID returns the leader election ID.
187+
GetID() string
193188
}
194189

195190
// New returns a new Manager for creating Controllers.
@@ -232,16 +227,6 @@ func New(config *rest.Config, options Options) (Manager, error) {
232227
return nil, err
233228
}
234229

235-
// Create the resource lock to enable leader election
236-
resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{
237-
LeaderElection: options.LeaderElection,
238-
LeaderElectionID: options.LeaderElectionID,
239-
LeaderElectionNamespace: options.LeaderElectionNamespace,
240-
})
241-
if err != nil {
242-
return nil, err
243-
}
244-
245230
// Create the metrics listener. This will throw an error if the metrics bind
246231
// address is invalid or already in use.
247232
metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
@@ -252,24 +237,25 @@ func New(config *rest.Config, options Options) (Manager, error) {
252237
stop := make(chan struct{})
253238

254239
return &controllerManager{
255-
config: config,
256-
scheme: options.Scheme,
257-
errChan: make(chan error),
258-
cache: cache,
259-
fieldIndexes: cache,
260-
client: writeObj,
261-
apiReader: apiReader,
262-
recorderProvider: recorderProvider,
263-
resourceLock: resourceLock,
264-
mapper: mapper,
265-
metricsListener: metricsListener,
266-
internalStop: stop,
267-
internalStopper: stop,
268-
port: options.Port,
269-
host: options.Host,
270-
leaseDuration: *options.LeaseDuration,
271-
renewDeadline: *options.RenewDeadline,
272-
retryPeriod: *options.RetryPeriod,
240+
config: config,
241+
scheme: options.Scheme,
242+
errChan: make(chan error),
243+
cache: cache,
244+
fieldIndexes: cache,
245+
client: writeObj,
246+
apiReader: apiReader,
247+
recorderProvider: recorderProvider,
248+
mapper: mapper,
249+
metricsListener: metricsListener,
250+
internalStop: stop,
251+
internalStopper: stop,
252+
port: options.Port,
253+
host: options.Host,
254+
leaderElectionNamespace: options.LeaderElectionNamespace,
255+
leaseDuration: *options.LeaseDuration,
256+
renewDeadline: *options.RenewDeadline,
257+
retryPeriod: *options.RetryPeriod,
258+
newResourceLock: options.newResourceLock,
273259
}, nil
274260
}
275261

0 commit comments

Comments
 (0)