Skip to content

Commit 73031d7

Browse files
committed
Add leader election to controller manager
1 parent 67e2874 commit 73031d7

File tree

5 files changed

+195
-4
lines changed

5 files changed

+195
-4
lines changed

pkg/leaderelection/doc.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
/*
18+
Package leaderelection contains a constructors for a leader election resource lock
19+
*/
20+
21+
package leaderelection

pkg/leaderelection/leader_election.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"fmt"
21+
"os"
22+
23+
"github.com/pborman/uuid"
24+
"k8s.io/client-go/kubernetes"
25+
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection/resourcelock"
27+
"sigs.k8s.io/controller-runtime/pkg/recorder"
28+
)
29+
30+
// Options provides the required configuration to create a new resource lock
31+
type Options struct {
32+
// LeaderElection determines whether or not to use leader election when
33+
// starting the manager.
34+
LeaderElection bool
35+
36+
// LeaderElectionNamespace determines the namespace in which the leader
37+
// election configmap will be created.
38+
LeaderElectionNamespace string
39+
40+
// LeaderElectionID determines the name of the configmap that leader election
41+
// will use for holding the leader lock.
42+
LeaderElectionID string
43+
}
44+
45+
// NewResourceLock creates a new config map resource lock for use in a leader
46+
// election loop
47+
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options Options) (resourcelock.Interface, error) {
48+
if !options.LeaderElection {
49+
return nil, nil
50+
}
51+
52+
if options.LeaderElectionID == "" || options.LeaderElectionNamespace == "" {
53+
return nil, fmt.Errorf("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set")
54+
}
55+
56+
// Leader id, needs to be unique
57+
id, err := os.Hostname()
58+
if err != nil {
59+
return nil, err
60+
}
61+
id = id + "_" + string(uuid.NewUUID())
62+
63+
// Construct client for leader election
64+
client, err := kubernetes.NewForConfig(config)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
return resourcelock.New(resourcelock.ConfigMapsResourceLock,
70+
options.LeaderElectionNamespace,
71+
options.LeaderElectionID,
72+
client.CoreV1(),
73+
resourcelock.ResourceLockConfig{
74+
Identity: id,
75+
EventRecorder: recorderProvider.GetEventRecorderFor(id),
76+
})
77+
}

pkg/manager/internal.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ limitations under the License.
1717
package manager
1818

1919
import (
20+
"fmt"
2021
"sync"
22+
"time"
2123

2224
"k8s.io/apimachinery/pkg/runtime"
2325
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection"
27+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2428
"k8s.io/client-go/tools/record"
2529
"sigs.k8s.io/controller-runtime/pkg/cache"
2630
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -56,6 +60,9 @@ type controllerManager struct {
5660
// (and EventHandlers, Sources and Predicates).
5761
recorderProvider recorder.Provider
5862

63+
// resourceLock
64+
resourceLock resourcelock.Interface
65+
5966
mu sync.Mutex
6067
started bool
6168
errChan chan error
@@ -133,6 +140,49 @@ func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
133140
}
134141

135142
func (cm *controllerManager) Start(stop <-chan struct{}) error {
143+
if cm.resourceLock == nil {
144+
go cm.start(stop)
145+
select {
146+
case <-stop:
147+
// we are done
148+
return nil
149+
case err := <-cm.errChan:
150+
// Error starting a controller
151+
return err
152+
}
153+
}
154+
155+
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
156+
Lock: cm.resourceLock,
157+
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
158+
// TODO(joelspeed): These timings should be configurable
159+
LeaseDuration: 15 * time.Second,
160+
RenewDeadline: 10 * time.Second,
161+
RetryPeriod: 2 * time.Second,
162+
Callbacks: leaderelection.LeaderCallbacks{
163+
OnStartedLeading: cm.start,
164+
OnStoppedLeading: func() {
165+
cm.errChan <- fmt.Errorf("leader election lost")
166+
},
167+
},
168+
})
169+
if err != nil {
170+
return err
171+
}
172+
173+
go l.Run()
174+
175+
select {
176+
case <-stop:
177+
// We are done
178+
return nil
179+
case err := <-cm.errChan:
180+
// Error starting a controller
181+
return err
182+
}
183+
}
184+
185+
func (cm *controllerManager) start(stop <-chan struct{}) {
136186
func() {
137187
cm.mu.Lock()
138188
defer cm.mu.Unlock()
@@ -169,9 +219,6 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
169219
select {
170220
case <-stop:
171221
// We are done
172-
return nil
173-
case err := <-cm.errChan:
174-
// Error starting a controller
175-
return err
222+
return
176223
}
177224
}

pkg/manager/manager.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import (
2525
"k8s.io/apimachinery/pkg/runtime"
2626
"k8s.io/client-go/kubernetes/scheme"
2727
"k8s.io/client-go/rest"
28+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2829
"k8s.io/client-go/tools/record"
2930
"sigs.k8s.io/controller-runtime/pkg/cache"
3031
"sigs.k8s.io/controller-runtime/pkg/client"
3132
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3233
internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
34+
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
3335
"sigs.k8s.io/controller-runtime/pkg/recorder"
3436
)
3537

@@ -83,10 +85,23 @@ type Options struct {
8385
// value only if you know what you are doing. Defaults to 10 hours if unset.
8486
SyncPeriod *time.Duration
8587

88+
// LeaderElection determines whether or not to use leader election when
89+
// starting the manager.
90+
LeaderElection bool
91+
92+
// LeaderElectionNamespace determines the namespace in which the leader
93+
// election configmap will be created.
94+
LeaderElectionNamespace string
95+
96+
// LeaderElectionID determines the name of the configmap that leader election
97+
// will use for holding the leader lock.
98+
LeaderElectionID string
99+
86100
// Dependency injection for testing
87101
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
88102
newClient func(config *rest.Config, options client.Options) (client.Client, error)
89103
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error)
104+
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
90105
}
91106

92107
// Runnable allows a component to be started.
@@ -140,6 +155,16 @@ func New(config *rest.Config, options Options) (Manager, error) {
140155
return nil, err
141156
}
142157

158+
// Create the resource lock to enable leader election)
159+
resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{
160+
LeaderElection: options.LeaderElection,
161+
LeaderElectionID: options.LeaderElectionID,
162+
LeaderElectionNamespace: options.LeaderElectionNamespace,
163+
})
164+
if err != nil {
165+
return nil, err
166+
}
167+
143168
return &controllerManager{
144169
config: config,
145170
scheme: options.Scheme,
@@ -148,6 +173,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
148173
fieldIndexes: cache,
149174
client: client.DelegatingClient{Reader: cache, Writer: writeObj, StatusClient: writeObj},
150175
recorderProvider: recorderProvider,
176+
resourceLock: resourceLock,
151177
}, nil
152178
}
153179

@@ -177,5 +203,10 @@ func setOptionsDefaults(options Options) Options {
177203
options.newRecorderProvider = internalrecorder.NewProvider
178204
}
179205

206+
// Allow newResourceLock to be mocked
207+
if options.newResourceLock == nil {
208+
options.newResourceLock = leaderelection.NewResourceLock
209+
}
210+
180211
return options
181212
}

pkg/manager/manager_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,21 @@ var _ = Describe("manger.Manager", func() {
9696

9797
close(done)
9898
})
99+
Context("with leader election enabled", func() {
100+
It("should return an error if ID not set", func() {
101+
m, err := New(cfg, Options{LeaderElection: true, LeaderElectionNamespace: "default"})
102+
Expect(m).To(BeNil())
103+
Expect(err).To(HaveOccurred())
104+
Expect(err.Error()).To(ContainSubstring("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set"))
105+
})
106+
107+
It("should return an error if namespace not set", func() {
108+
m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"})
109+
Expect(m).To(BeNil())
110+
Expect(err).To(HaveOccurred())
111+
Expect(err.Error()).To(ContainSubstring("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set"))
112+
})
113+
})
99114
})
100115

101116
Describe("Start", func() {

0 commit comments

Comments
 (0)