@@ -18,6 +18,7 @@ package manager
18
18
19
19
import (
20
20
"context"
21
+ "errors"
21
22
"fmt"
22
23
"net"
23
24
"net/http"
@@ -34,6 +35,7 @@ import (
34
35
"sigs.k8s.io/controller-runtime/pkg/cache"
35
36
"sigs.k8s.io/controller-runtime/pkg/client"
36
37
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
38
+ crleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection"
37
39
"sigs.k8s.io/controller-runtime/pkg/metrics"
38
40
"sigs.k8s.io/controller-runtime/pkg/recorder"
39
41
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
@@ -58,9 +60,9 @@ type controllerManager struct {
58
60
// to scheme.scheme.
59
61
scheme * runtime.Scheme
60
62
61
- // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts .
63
+ // leaderElectionRunnables is the map that groups runnables that use same leader election ID .
62
64
// These Runnables are managed by lead election.
63
- leaderElectionRunnables []Runnable
65
+ leaderElectionRunnables map [ string ] []Runnable
64
66
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
65
67
// These Runnables will not be blocked by lead election.
66
68
nonLeaderElectionRunnables []Runnable
@@ -82,8 +84,9 @@ type controllerManager struct {
82
84
// (and EventHandlers, Sources and Predicates).
83
85
recorderProvider recorder.Provider
84
86
85
- // resourceLock forms the basis for leader election
86
- resourceLock resourcelock.Interface
87
+ // leaderElectionNamespace determines the namespace in which the leader
88
+ // election configmaps will be created.
89
+ leaderElectionNamespace string
87
90
88
91
// mapper is used to map resources to kind, and map kind and version.
89
92
mapper meta.RESTMapper
@@ -123,6 +126,9 @@ type controllerManager struct {
123
126
// retryPeriod is the duration the LeaderElector clients should wait
124
127
// between tries of actions.
125
128
retryPeriod time.Duration
129
+
130
+ // Dependency injection for testing
131
+ newResourceLock func (config * rest.Config , recorderProvider recorder.Provider , options crleaderelection.Options ) (resourcelock.Interface , error )
126
132
}
127
133
128
134
// Add sets dependencies on i, and adds it to the list of Runnables to start.
@@ -135,11 +141,27 @@ func (cm *controllerManager) Add(r Runnable) error {
135
141
return err
136
142
}
137
143
144
+ if cm .leaderElectionRunnables == nil {
145
+ cm .leaderElectionRunnables = make (map [string ][]Runnable )
146
+ }
147
+
138
148
// Add the runnable to the leader election or the non-leaderelection list
139
- if leRunnable , ok := r .(LeaderElectionRunnable ); ok && ! leRunnable .NeedLeaderElection () {
140
- cm .nonLeaderElectionRunnables = append (cm .nonLeaderElectionRunnables , r )
149
+ if leRunnable , ok := r .(LeaderElectionRunnable ); ok && leRunnable .NeedLeaderElection () {
150
+ runnables := []Runnable {r }
151
+ leID := leRunnable .GetID ()
152
+
153
+ // Check that leader election ID is defined
154
+ if leID == "" {
155
+ return errors .New ("LeaderElectionID must be configured" )
156
+ }
157
+
158
+ if rs , exists := cm .leaderElectionRunnables [leID ]; exists {
159
+ runnables = append (runnables , rs ... )
160
+ }
161
+
162
+ cm .leaderElectionRunnables [leID ] = runnables
141
163
} else {
142
- cm .leaderElectionRunnables = append (cm .leaderElectionRunnables , r )
164
+ cm .nonLeaderElectionRunnables = append (cm .nonLeaderElectionRunnables , r )
143
165
}
144
166
145
167
if cm .started {
@@ -265,14 +287,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
265
287
266
288
go cm .startNonLeaderElectionRunnables ()
267
289
268
- if cm .resourceLock != nil {
269
- err := cm .startLeaderElection ()
270
- if err != nil {
271
- return err
272
- }
273
- } else {
274
- go cm .startLeaderElectionRunnables ()
275
- }
290
+ go cm .startLeaderElectionRunnables ()
276
291
277
292
select {
278
293
case <- stop :
@@ -308,12 +323,38 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
308
323
cm .waitForCache ()
309
324
310
325
// Start the leader election Runnables after the cache has synced
311
- for _ , c := range cm .leaderElectionRunnables {
326
+ for leID , rs := range cm .leaderElectionRunnables {
312
327
// Controllers block, but we want to return an error if any have an error starting.
313
328
// Write any Start errors to a channel so we can return them
314
- ctrl := c
329
+ leaderElectionID := leID
330
+ runnables := rs
315
331
go func () {
316
- cm .errChan <- ctrl .Start (cm .internalStop )
332
+ // Create resource lock
333
+ resourceLock , err := cm .newResourceLock (cm .config , cm .recorderProvider , crleaderelection.Options {
334
+ LeaderElection : true ,
335
+ LeaderElectionID : leaderElectionID ,
336
+ LeaderElectionNamespace : cm .leaderElectionNamespace ,
337
+ })
338
+ if err != nil {
339
+ cm .errChan <- err
340
+ }
341
+
342
+ err = cm .startLeaderElection (resourceLock , leaderelection.LeaderCallbacks {
343
+ OnStartedLeading : func (_ context.Context ) {
344
+ for _ , r := range runnables {
345
+ runnable := r
346
+ go func () {
347
+ cm .errChan <- runnable .Start (cm .internalStop )
348
+ }()
349
+ }
350
+ },
351
+ OnStoppedLeading : func () {
352
+ cm .errChan <- fmt .Errorf ("runnable leader election lost" )
353
+ },
354
+ })
355
+ if err != nil {
356
+ cm .errChan <- err
357
+ }
317
358
}()
318
359
}
319
360
}
@@ -339,23 +380,13 @@ func (cm *controllerManager) waitForCache() {
339
380
cm .started = true
340
381
}
341
382
342
- func (cm * controllerManager ) startLeaderElection () (err error ) {
383
+ func (cm * controllerManager ) startLeaderElection (resourceLock resourcelock. Interface , callbacks leaderelection. LeaderCallbacks ) (err error ) {
343
384
l , err := leaderelection .NewLeaderElector (leaderelection.LeaderElectionConfig {
344
- Lock : cm . resourceLock ,
385
+ Lock : resourceLock ,
345
386
LeaseDuration : cm .leaseDuration ,
346
387
RenewDeadline : cm .renewDeadline ,
347
388
RetryPeriod : cm .retryPeriod ,
348
- Callbacks : leaderelection.LeaderCallbacks {
349
- OnStartedLeading : func (_ context.Context ) {
350
- cm .startLeaderElectionRunnables ()
351
- },
352
- OnStoppedLeading : func () {
353
- // Most implementations of leader election log.Fatal() here.
354
- // Since Start is wrapped in log.Fatal when called, we can just return
355
- // an error here which will cause the program to exit.
356
- cm .errChan <- fmt .Errorf ("leader election lost" )
357
- },
358
- },
389
+ Callbacks : callbacks ,
359
390
})
360
391
if err != nil {
361
392
return err
0 commit comments