Skip to content

Commit 0597abd

Browse files
added health probes
1 parent faa41bc commit 0597abd

File tree

3 files changed

+278
-18
lines changed

3 files changed

+278
-18
lines changed

pkg/manager/internal.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,19 @@ type controllerManager struct {
7979
// metricsListener is used to serve prometheus metrics
8080
metricsListener net.Listener
8181

82+
// healthProbeListener is used to serve the readiness and liveness probes
83+
healthProbeListener net.Listener
84+
85+
// Readiness probe endpoint name
86+
readinessEndpointName string
87+
88+
// Liveness probe endpoint name
89+
livenessEndpointName string
90+
8291
mu sync.Mutex
8392
started bool
93+
isReady bool
94+
isAlive bool
8495
errChan chan error
8596

8697
// internalStop is the stop channel *actually* used by everything involved
@@ -224,6 +235,57 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
224235
}
225236
}
226237

238+
func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
239+
mux := http.NewServeMux()
240+
241+
if cm.readinessEndpointName != "" {
242+
var readinessHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
243+
if cm.isReady {
244+
w.WriteHeader(200)
245+
_, err := w.Write([]byte("ok\n"))
246+
log.Error(err, "Failed to send readiness probe response")
247+
} else {
248+
w.WriteHeader(503)
249+
_, err := w.Write([]byte("error\n"))
250+
log.Error(err, "Failed to send readiness probe response")
251+
}
252+
}
253+
mux.Handle(cm.readinessEndpointName, readinessHandler)
254+
}
255+
if cm.livenessEndpointName != "" {
256+
var livenessHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
257+
if cm.isAlive {
258+
w.WriteHeader(200)
259+
_, err := w.Write([]byte("ok\n"))
260+
log.Error(err, "Failed to send liveness probe response")
261+
} else {
262+
w.WriteHeader(500)
263+
_, err := w.Write([]byte("error\n"))
264+
log.Error(err, "Failed to send liveness probe response")
265+
}
266+
}
267+
mux.Handle(cm.livenessEndpointName, livenessHandler)
268+
}
269+
270+
server := http.Server{
271+
Handler: mux,
272+
}
273+
// Run server
274+
go func() {
275+
if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
276+
cm.errChan <- err
277+
}
278+
}()
279+
280+
// Shutdown the server when stop is closed
281+
select {
282+
case <-stop:
283+
if err := server.Shutdown(context.Background()); err != nil {
284+
cm.errChan <- err
285+
}
286+
}
287+
}
288+
227289
func (cm *controllerManager) Start(stop <-chan struct{}) error {
228290
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
229291
defer close(cm.internalStopper)
@@ -235,6 +297,11 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
235297
go cm.serveMetrics(cm.internalStop)
236298
}
237299

300+
// Serve health probes
301+
if cm.healthProbeListener != nil {
302+
go cm.serveHealthProbes(cm.internalStop)
303+
}
304+
238305
if cm.resourceLock != nil {
239306
err := cm.startLeaderElection()
240307
if err != nil {
@@ -313,3 +380,11 @@ func (cm *controllerManager) startLeaderElection() (err error) {
313380
go l.Run(context.Background())
314381
return nil
315382
}
383+
384+
func (cm *controllerManager) SetReady(ready bool) {
385+
cm.isReady = ready
386+
}
387+
388+
func (cm *controllerManager) SetAlive(alive bool) {
389+
cm.isAlive = alive
390+
}

pkg/manager/manager.go

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ type Manager interface {
5151
// interface - e.g. inject.Client.
5252
SetFields(interface{}) error
5353

54+
// SetReady will set ready status for readiness probe
55+
SetReady(bool)
56+
57+
// SetAlive will set alive status for liveness probe
58+
SetAlive(bool)
59+
5460
// Start starts all registered Controllers and blocks until the Stop channel is closed.
5561
// Returns an error if there is an error starting any controller.
5662
Start(<-chan struct{}) error
@@ -125,6 +131,16 @@ type Options struct {
125131
// for serving prometheus metrics
126132
MetricsBindAddress string
127133

134+
// HealthProbeBindAddress is the TCP address that the controller should bind to
135+
// for serving health probes
136+
HealthProbeBindAddress string
137+
138+
// Readiness probe endpoint name
139+
ReadinessEndpointName string
140+
141+
// Liveness probe endpoint name
142+
LivenessEndpointName string
143+
128144
// Port is the port that the webhook server serves at.
129145
// It is used to set webhook.Server.Port.
130146
Port int
@@ -144,9 +160,10 @@ type Options struct {
144160
NewClient NewClientFunc
145161

146162
// Dependency injection for testing
147-
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error)
148-
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
149-
newMetricsListener func(addr string) (net.Listener, error)
163+
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error)
164+
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
165+
newMetricsListener func(addr string) (net.Listener, error)
166+
newHealthProbeListener func(addr string) (net.Listener, error)
150167
}
151168

152169
// NewClientFunc allows a user to define how to create a client
@@ -229,24 +246,34 @@ func New(config *rest.Config, options Options) (Manager, error) {
229246
return nil, err
230247
}
231248

249+
// Create health probes listener. This will throw an error if the bind
250+
// address is invalid or already in use.
251+
healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
252+
if err != nil {
253+
return nil, err
254+
}
255+
232256
stop := make(chan struct{})
233257

234258
return &controllerManager{
235-
config: config,
236-
scheme: options.Scheme,
237-
errChan: make(chan error),
238-
cache: cache,
239-
fieldIndexes: cache,
240-
client: writeObj,
241-
apiReader: apiReader,
242-
recorderProvider: recorderProvider,
243-
resourceLock: resourceLock,
244-
mapper: mapper,
245-
metricsListener: metricsListener,
246-
internalStop: stop,
247-
internalStopper: stop,
248-
port: options.Port,
249-
host: options.Host,
259+
config: config,
260+
scheme: options.Scheme,
261+
errChan: make(chan error),
262+
cache: cache,
263+
fieldIndexes: cache,
264+
client: writeObj,
265+
apiReader: apiReader,
266+
recorderProvider: recorderProvider,
267+
resourceLock: resourceLock,
268+
mapper: mapper,
269+
metricsListener: metricsListener,
270+
healthProbeListener: healthProbeListener,
271+
readinessEndpointName: options.ReadinessEndpointName,
272+
livenessEndpointName: options.LivenessEndpointName,
273+
internalStop: stop,
274+
internalStopper: stop,
275+
port: options.Port,
276+
host: options.Host,
250277
}, nil
251278
}
252279

@@ -268,6 +295,19 @@ func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Opt
268295
}, nil
269296
}
270297

298+
// defaultHealthProbeListener opens the TCP listener used for the health
// probes on addr.
//
// An addr of "" or "0" disables the listener: both the returned listener and
// the returned error are nil. Any failure to listen is returned with the
// address included in the message.
func defaultHealthProbeListener(addr string) (net.Listener, error) {
	switch addr {
	case "", "0":
		// Health probes are disabled.
		return nil, nil
	}

	listener, listenErr := net.Listen("tcp", addr)
	if listenErr == nil {
		return listener, nil
	}
	return nil, fmt.Errorf("error listening on %s: %v", addr, listenErr)
}
310+
271311
// setOptionsDefaults set default values for Options fields
272312
func setOptionsDefaults(options Options) Options {
273313
// Use the Kubernetes client-go scheme if none is specified
@@ -303,5 +343,9 @@ func setOptionsDefaults(options Options) Options {
303343
options.newMetricsListener = metrics.NewListener
304344
}
305345

346+
if options.newHealthProbeListener == nil {
347+
options.newHealthProbeListener = defaultHealthProbeListener
348+
}
349+
306350
return options
307351
}

pkg/manager/manager_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,42 @@ var _ = Describe("manger.Manager", func() {
197197

198198
Expect(ln.Close()).ToNot(HaveOccurred())
199199
})
200+
201+
It("should create a listener for the health probes if a valid address is provided", func() {
	// Record the listener New creates so it can be inspected and closed.
	var captured net.Listener
	captureListener := func(addr string) (net.Listener, error) {
		ln, listenErr := defaultHealthProbeListener(addr)
		captured = ln
		return ln, listenErr
	}

	mgr, err := New(cfg, Options{
		HealthProbeBindAddress: ":0",
		newHealthProbeListener: captureListener,
	})
	Expect(mgr).ToNot(BeNil())
	Expect(err).ToNot(HaveOccurred())
	Expect(captured).ToNot(BeNil())
	Expect(captured.Close()).ToNot(HaveOccurred())
})
216+
217+
It("should return an error if the health probes bind address is already in use", func() {
	// Occupy a port first so the manager's own listen attempt must fail.
	occupied, err := defaultHealthProbeListener(":0")
	Expect(err).ShouldNot(HaveOccurred())

	// Record whatever listener New manages to create (none is expected).
	var captured net.Listener
	captureListener := func(addr string) (net.Listener, error) {
		ln, listenErr := defaultHealthProbeListener(addr)
		captured = ln
		return ln, listenErr
	}

	mgr, err := New(cfg, Options{
		HealthProbeBindAddress: occupied.Addr().String(),
		newHealthProbeListener: captureListener,
	})
	Expect(mgr).To(BeNil())
	Expect(err).To(HaveOccurred())
	Expect(captured).To(BeNil())

	Expect(occupied.Close()).ToNot(HaveOccurred())
})
200236
})
201237

202238
Describe("Start", func() {
@@ -426,6 +462,111 @@ var _ = Describe("manger.Manager", func() {
426462
})
427463
})
428464

465+
// Exercises the health probe server end to end: it must stop with the manager
// and report the readiness / liveness status set through the Manager.
Context("should start serving health probes", func() {
	var listener net.Listener
	var opts Options

	BeforeEach(func() {
		listener = nil
		// Capture the listener created by New so each test can discover the
		// address the probes are served on.
		opts = Options{
			newHealthProbeListener: func(addr string) (net.Listener, error) {
				var err error
				listener, err = defaultHealthProbeListener(addr)
				return listener, err
			},
		}
	})

	AfterEach(func() {
		if listener != nil {
			// Best effort: the listener may already have been closed when the
			// probe server shut down, so the error is deliberately ignored.
			_ = listener.Close()
		}
	})

	It("should stop serving health probes when stop is called", func(done Done) {
		opts.HealthProbeBindAddress = ":0"
		m, err := New(cfg, opts)
		Expect(err).NotTo(HaveOccurred())

		s := make(chan struct{})
		go func() {
			defer GinkgoRecover()
			Expect(m.Start(s)).NotTo(HaveOccurred())
			close(done)
		}()

		// Check the health probes started
		endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
		resp, err := http.Get(endpoint)
		Expect(err).NotTo(HaveOccurred())
		Expect(resp.Body.Close()).NotTo(HaveOccurred())

		// Shutdown the server
		close(s)

		// Expect the health probes server to shutdown
		Eventually(func() error {
			resp, err := http.Get(endpoint)
			if err != nil {
				return err
			}
			// The server is still answering; release the response and retry.
			_ = resp.Body.Close()
			return nil
		}).ShouldNot(Succeed())
	})

	It("should serve readiness endpoint", func(done Done) {
		opts.HealthProbeBindAddress = ":0"
		opts.ReadinessEndpointName = "/ready"
		m, err := New(cfg, opts)
		Expect(err).NotTo(HaveOccurred())

		s := make(chan struct{})
		defer close(s)
		go func() {
			defer GinkgoRecover()
			Expect(m.Start(s)).NotTo(HaveOccurred())
			close(done)
		}()

		readinessEndpoint := fmt.Sprint("http://", listener.Addr().String(), opts.ReadinessEndpointName)

		// Controller is not ready
		resp, err := http.Get(readinessEndpoint)
		Expect(err).NotTo(HaveOccurred())
		Expect(resp.Body.Close()).NotTo(HaveOccurred())
		Expect(resp.StatusCode).To(Equal(503))

		// Controller is ready
		m.SetReady(true)
		resp, err = http.Get(readinessEndpoint)
		Expect(err).NotTo(HaveOccurred())
		Expect(resp.Body.Close()).NotTo(HaveOccurred())
		Expect(resp.StatusCode).To(Equal(200))
	})

	It("should serve liveness endpoint", func(done Done) {
		opts.HealthProbeBindAddress = ":0"
		opts.LivenessEndpointName = "/health"
		m, err := New(cfg, opts)
		Expect(err).NotTo(HaveOccurred())

		s := make(chan struct{})
		defer close(s)
		go func() {
			defer GinkgoRecover()
			Expect(m.Start(s)).NotTo(HaveOccurred())
			close(done)
		}()

		livenessEndpoint := fmt.Sprint("http://", listener.Addr().String(), opts.LivenessEndpointName)

		// Controller is not alive
		resp, err := http.Get(livenessEndpoint)
		Expect(err).NotTo(HaveOccurred())
		Expect(resp.Body.Close()).NotTo(HaveOccurred())
		Expect(resp.StatusCode).To(Equal(500))

		// Controller is alive
		m.SetAlive(true)
		resp, err = http.Get(livenessEndpoint)
		Expect(err).NotTo(HaveOccurred())
		Expect(resp.Body.Close()).NotTo(HaveOccurred())
		Expect(resp.StatusCode).To(Equal(200))
	})
})
569+
429570
Describe("Add", func() {
430571
It("should immediately start the Component if the Manager has already Started another Component",
431572
func(done Done) {

0 commit comments

Comments
 (0)