Skip to content

Commit ff25f2d

Browse files
added leader election for runnables
1 parent 5aca8b7 commit ff25f2d

File tree

11 files changed

+375
-127
lines changed

11 files changed

+375
-127
lines changed

pkg/builder/controller_test.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ var _ = Describe("application", func() {
6262
instance, err := ControllerManagedBy(m).
6363
For(&appsv1.ReplicaSet{}).
6464
Owns(&appsv1.ReplicaSet{}).
65+
WithOptions(controller.Options{
66+
LeaderElection: &controller.LeaderElectionOptions{
67+
NeedLeaderElection: false,
68+
},
69+
}).
6570
Build(noop)
6671
Expect(err).NotTo(HaveOccurred())
6772
Expect(instance).NotTo(BeNil())
@@ -76,6 +81,11 @@ var _ = Describe("application", func() {
7681
instance, err := ControllerManagedBy(m).
7782
For(&fakeType{}).
7883
Owns(&appsv1.ReplicaSet{}).
84+
WithOptions(controller.Options{
85+
LeaderElection: &controller.LeaderElectionOptions{
86+
NeedLeaderElection: false,
87+
},
88+
}).
7989
Build(noop)
8090
Expect(err).To(MatchError(ContainSubstring("no kind is registered for the type builder.fakeType")))
8191
Expect(instance).To(BeNil())
@@ -98,6 +108,11 @@ var _ = Describe("application", func() {
98108
instance, err := ControllerManagedBy(m).
99109
For(&appsv1.ReplicaSet{}).
100110
Owns(&appsv1.ReplicaSet{}).
111+
WithOptions(controller.Options{
112+
LeaderElection: &controller.LeaderElectionOptions{
113+
NeedLeaderElection: false,
114+
},
115+
}).
101116
Build(noop)
102117
Expect(err).To(HaveOccurred())
103118
Expect(err.Error()).To(ContainSubstring("expected error"))
@@ -121,7 +136,12 @@ var _ = Describe("application", func() {
121136
instance, err := ControllerManagedBy(m).
122137
For(&appsv1.ReplicaSet{}).
123138
Owns(&appsv1.ReplicaSet{}).
124-
WithOptions(controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}).
139+
WithOptions(controller.Options{
140+
MaxConcurrentReconciles: maxConcurrentReconciles,
141+
LeaderElection: &controller.LeaderElectionOptions{
142+
NeedLeaderElection: false,
143+
},
144+
}).
125145
Build(noop)
126146
Expect(err).NotTo(HaveOccurred())
127147
Expect(instance).NotTo(BeNil())
@@ -164,6 +184,11 @@ var _ = Describe("application", func() {
164184
ctrl1, err := ControllerManagedBy(m).
165185
For(&TestDefaultValidator{}).
166186
Owns(&appsv1.ReplicaSet{}).
187+
WithOptions(controller.Options{
188+
LeaderElection: &controller.LeaderElectionOptions{
189+
NeedLeaderElection: false,
190+
},
191+
}).
167192
Build(noop)
168193
Expect(err).NotTo(HaveOccurred())
169194
Expect(ctrl1).NotTo(BeNil())
@@ -172,6 +197,11 @@ var _ = Describe("application", func() {
172197
ctrl2, err := ControllerManagedBy(m).
173198
For(&TestDefaultValidator{}).
174199
Owns(&appsv1.ReplicaSet{}).
200+
WithOptions(controller.Options{
201+
LeaderElection: &controller.LeaderElectionOptions{
202+
NeedLeaderElection: false,
203+
},
204+
}).
175205
Build(noop)
176206
Expect(err).NotTo(HaveOccurred())
177207
Expect(ctrl2).NotTo(BeNil())
@@ -185,7 +215,12 @@ var _ = Describe("application", func() {
185215

186216
bldr := ControllerManagedBy(m).
187217
For(&appsv1.Deployment{}).
188-
Owns(&appsv1.ReplicaSet{})
218+
Owns(&appsv1.ReplicaSet{}).
219+
WithOptions(controller.Options{
220+
LeaderElection: &controller.LeaderElectionOptions{
221+
NeedLeaderElection: false,
222+
},
223+
})
189224
doReconcileTest("3", stop, bldr, m, false)
190225
close(done)
191226
}, 10)
@@ -196,6 +231,11 @@ var _ = Describe("application", func() {
196231

197232
bldr := ControllerManagedBy(m).
198233
For(&appsv1.Deployment{}).
234+
WithOptions(controller.Options{
235+
LeaderElection: &controller.LeaderElectionOptions{
236+
NeedLeaderElection: false,
237+
},
238+
}).
199239
Watches( // Equivalent of Owns
200240
&source.Kind{Type: &appsv1.ReplicaSet{}},
201241
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})

pkg/cache/informer_cache.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ func (ip *informerCache) NeedLeaderElection() bool {
169169
return false
170170
}
171171

172+
// GetID implements the LeaderElectionRunnable interface.
173+
// It's dummy method that always returns empty string as informerCache doesn't need leader election.
174+
func (ip *informerCache) GetID() string {
175+
return ""
176+
}
177+
172178
// IndexField adds an indexer to the underlying cache, using extraction function to get
173179
// value(s) from the given field. This index can then be used by passing a field selector
174180
// to List. For one-to-one compatibility with "normal" field selectors, only return one value.

pkg/controller/controller.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,18 @@ type Options struct {
4141
// Defaults to MaxOfRateLimiter which has both overall and per-item rate limiting.
4242
// The overall is a token bucket and the per-item is exponential.
4343
RateLimiter ratelimiter.RateLimiter
44+
45+
// Leader election is by default
46+
LeaderElection *LeaderElectionOptions
47+
}
48+
49+
// Leader Election options
50+
type LeaderElectionOptions struct {
51+
// NeedLeaderElection determines whether or not to use leader election when starting the controller.
52+
NeedLeaderElection bool
53+
54+
// LeaderElectionID determines the name of the configmap that leader election will use for holding the leader lock.
55+
LeaderElectionID string
4456
}
4557

4658
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -83,6 +95,13 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
8395
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
8496
}
8597

98+
if options.LeaderElection == nil {
99+
options.LeaderElection = &LeaderElectionOptions{
100+
NeedLeaderElection: true,
101+
LeaderElectionID: name,
102+
}
103+
}
104+
86105
// Inject dependencies into Reconciler
87106
if err := mgr.SetFields(options.Reconciler); err != nil {
88107
return nil, err
@@ -101,6 +120,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
101120
},
102121
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
103122
Name: name,
123+
LeaderElection: options.LeaderElection.NeedLeaderElection,
124+
LeaderElectionID: options.LeaderElection.LeaderElectionID,
104125
}
105126

106127
// Add the controller as a Manager components

pkg/controller/controller_integration_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ var _ = Describe("controller", func() {
6565
reconciled <- request
6666
return reconcile.Result{}, nil
6767
}),
68+
LeaderElection: &controller.LeaderElectionOptions{
69+
NeedLeaderElection: false,
70+
},
6871
})
6972
Expect(err).NotTo(HaveOccurred())
7073

pkg/controller/controller_test.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,12 @@ var _ = Describe("controller.Controller", func() {
4848
It("should return an error if Name is not Specified", func(done Done) {
4949
m, err := manager.New(cfg, manager.Options{})
5050
Expect(err).NotTo(HaveOccurred())
51-
c, err := controller.New("", m, controller.Options{Reconciler: rec})
51+
c, err := controller.New("", m, controller.Options{
52+
Reconciler: rec,
53+
LeaderElection: &controller.LeaderElectionOptions{
54+
NeedLeaderElection: false,
55+
},
56+
})
5257
Expect(c).To(BeNil())
5358
Expect(err.Error()).To(ContainSubstring("must specify Name for Controller"))
5459

@@ -70,7 +75,12 @@ var _ = Describe("controller.Controller", func() {
7075
m, err := manager.New(cfg, manager.Options{})
7176
Expect(err).NotTo(HaveOccurred())
7277

73-
c, err := controller.New("foo", m, controller.Options{Reconciler: &failRec{}})
78+
c, err := controller.New("foo", m, controller.Options{
79+
Reconciler: &failRec{},
80+
LeaderElection: &controller.LeaderElectionOptions{
81+
NeedLeaderElection: false,
82+
},
83+
})
7484
Expect(c).To(BeNil())
7585
Expect(err).To(HaveOccurred())
7686
Expect(err.Error()).To(ContainSubstring("expected error"))
@@ -82,11 +92,21 @@ var _ = Describe("controller.Controller", func() {
8292
m, err := manager.New(cfg, manager.Options{})
8393
Expect(err).NotTo(HaveOccurred())
8494

85-
c1, err := controller.New("c1", m, controller.Options{Reconciler: rec})
95+
c1, err := controller.New("c1", m, controller.Options{
96+
Reconciler: rec,
97+
LeaderElection: &controller.LeaderElectionOptions{
98+
NeedLeaderElection: false,
99+
},
100+
})
86101
Expect(err).NotTo(HaveOccurred())
87102
Expect(c1).ToNot(BeNil())
88103

89-
c2, err := controller.New("c2", m, controller.Options{Reconciler: rec})
104+
c2, err := controller.New("c2", m, controller.Options{
105+
Reconciler: rec,
106+
LeaderElection: &controller.LeaderElectionOptions{
107+
NeedLeaderElection: false,
108+
},
109+
})
90110
Expect(err).NotTo(HaveOccurred())
91111
Expect(c2).ToNot(BeNil())
92112

pkg/internal/controller/controller.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ type Controller struct {
5050
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
5151
MaxConcurrentReconciles int
5252

53+
// LeaderElection determines whether or not to use leader election when
54+
// starting the controller.
55+
LeaderElection bool
56+
57+
// LeaderElectionID determines the name of the configmap that leader election
58+
// will use for holding the leader lock.
59+
LeaderElectionID string
60+
5361
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
5462
// ensures that the state of the system matches the state specified in the object.
5563
// Defaults to the DefaultReconcileFunc.
@@ -296,3 +304,11 @@ func (c *Controller) InjectFunc(f inject.Func) error {
296304
func (c *Controller) updateMetrics(reconcileTime time.Duration) {
297305
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
298306
}
307+
308+
func (c *Controller) NeedLeaderElection() bool {
309+
return c.LeaderElection
310+
}
311+
312+
func (c *Controller) GetID() string {
313+
return c.LeaderElectionID
314+
}

pkg/internal/recorder/recorder_integration_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ var _ = Describe("recorder", func() {
6161
recorder.Event(dp, corev1.EventTypeNormal, "test-reason", "test-msg")
6262
return reconcile.Result{}, nil
6363
}),
64+
LeaderElection: &controller.LeaderElectionOptions{
65+
NeedLeaderElection: false,
66+
},
6467
})
6568
Expect(err).NotTo(HaveOccurred())
6669

0 commit comments

Comments
 (0)