Skip to content

Commit 773aa12

Browse files
committed
Start http server for serving metrics
1 parent e28c13e commit 773aa12

File tree

2 files changed

+164
-0
lines changed

2 files changed

+164
-0
lines changed

pkg/manager/internal.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@ limitations under the License.
1717
package manager
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"net"
23+
"net/http"
2224
"sync"
2325
"time"
2426

27+
"github.com/prometheus/client_golang/prometheus/promhttp"
2528
"k8s.io/apimachinery/pkg/api/meta"
2629
"k8s.io/apimachinery/pkg/runtime"
2730
"k8s.io/client-go/rest"
@@ -30,6 +33,7 @@ import (
3033
"k8s.io/client-go/tools/record"
3134
"sigs.k8s.io/controller-runtime/pkg/cache"
3235
"sigs.k8s.io/controller-runtime/pkg/client"
36+
"sigs.k8s.io/controller-runtime/pkg/metrics"
3337
"sigs.k8s.io/controller-runtime/pkg/recorder"
3438
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
3539
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
@@ -161,6 +165,32 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
161165
return cm.mapper
162166
}
163167

168+
func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
169+
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
170+
ErrorHandling: promhttp.HTTPErrorOnError,
171+
})
172+
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
173+
mux := http.NewServeMux()
174+
mux.Handle("/metrics", handler)
175+
server := http.Server{
176+
Handler: mux,
177+
}
178+
// Run the server
179+
go func() {
180+
if err := server.Serve(cm.metricsListener); err != nil {
181+
cm.errChan <- err
182+
}
183+
}()
184+
185+
// Shutdown the server when stop is closed
186+
select {
187+
case <-stop:
188+
if err := server.Shutdown(context.Background()); err != nil {
189+
cm.errChan <- err
190+
}
191+
}
192+
}
193+
164194
func (cm *controllerManager) Start(stop <-chan struct{}) error {
165195
if cm.resourceLock != nil {
166196
err := cm.startLeaderElection(stop)
@@ -197,6 +227,11 @@ func (cm *controllerManager) start(stop <-chan struct{}) {
197227
}
198228
}()
199229

230+
// Start the metrics server
231+
if cm.metricsListener != nil {
232+
go cm.serveMetrics(stop)
233+
}
234+
200235
// Wait for the caches to sync.
201236
// TODO(community): Check the return value and write a test
202237
cm.cache.WaitForCacheSync(stop)

pkg/manager/manager_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ package manager
1818

1919
import (
2020
"fmt"
21+
"io/ioutil"
2122
"net"
23+
"net/http"
2224

2325
"github.com/go-logr/logr"
2426
. "github.com/onsi/ginkgo"
2527
. "github.com/onsi/gomega"
28+
"github.com/prometheus/client_golang/prometheus"
2629
"k8s.io/apimachinery/pkg/api/meta"
2730
"k8s.io/apimachinery/pkg/runtime"
2831
"k8s.io/client-go/rest"
@@ -265,6 +268,132 @@ var _ = Describe("manger.Manager", func() {
265268
newResourceLock: fakeleaderelection.NewResourceLock,
266269
})
267270
})
271+
272+
Context("should start serving metrics", func() {
273+
var listener net.Listener
274+
var opts Options
275+
276+
BeforeEach(func() {
277+
listener = nil
278+
opts = Options{
279+
newMetricsListener: func(addr string) (net.Listener, error) {
280+
var err error
281+
listener, err = metrics.NewListener(addr)
282+
return listener, err
283+
},
284+
}
285+
})
286+
287+
AfterEach(func() {
288+
if listener != nil {
289+
listener.Close()
290+
}
291+
})
292+
293+
It("should stop serving metrics when stop is called", func(done Done) {
294+
opts.MetricsBindAddress = ":0"
295+
m, err := New(cfg, opts)
296+
Expect(err).NotTo(HaveOccurred())
297+
298+
s := make(chan struct{})
299+
go func() {
300+
defer GinkgoRecover()
301+
Expect(m.Start(s)).NotTo(HaveOccurred())
302+
close(done)
303+
}()
304+
305+
// Check the metrics started
306+
endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
307+
_, err = http.Get(endpoint)
308+
Expect(err).NotTo(HaveOccurred())
309+
310+
// Shutdown the server
311+
close(s)
312+
313+
// Expect the metrics server to shutdown
314+
Eventually(func() error {
315+
_, err = http.Get(endpoint)
316+
return err
317+
}).ShouldNot(Succeed())
318+
})
319+
320+
It("should serve metrics endpoint", func(done Done) {
321+
opts.MetricsBindAddress = ":0"
322+
m, err := New(cfg, opts)
323+
Expect(err).NotTo(HaveOccurred())
324+
325+
s := make(chan struct{})
326+
defer close(s)
327+
go func() {
328+
defer GinkgoRecover()
329+
Expect(m.Start(s)).NotTo(HaveOccurred())
330+
close(done)
331+
}()
332+
333+
metricsEndpoint := fmt.Sprintf("http://%s/metrics", listener.Addr().String())
334+
resp, err := http.Get(metricsEndpoint)
335+
Expect(err).NotTo(HaveOccurred())
336+
Expect(resp.StatusCode).To(Equal(200))
337+
})
338+
339+
It("should not serve anything other than metrics endpoint", func(done Done) {
340+
opts.MetricsBindAddress = ":0"
341+
m, err := New(cfg, opts)
342+
Expect(err).NotTo(HaveOccurred())
343+
344+
s := make(chan struct{})
345+
defer close(s)
346+
go func() {
347+
defer GinkgoRecover()
348+
Expect(m.Start(s)).NotTo(HaveOccurred())
349+
close(done)
350+
}()
351+
352+
endpoint := fmt.Sprintf("http://%s/should-not-exist", listener.Addr().String())
353+
resp, err := http.Get(endpoint)
354+
Expect(err).NotTo(HaveOccurred())
355+
Expect(resp.StatusCode).To(Equal(404))
356+
})
357+
358+
It("should serve metrics in its registry", func(done Done) {
359+
one := prometheus.NewCounter(prometheus.CounterOpts{
360+
Name: "test_one",
361+
Help: "test metric for testing",
362+
})
363+
one.Set(1)
364+
err := metrics.Registry.Register(one)
365+
Expect(err).NotTo(HaveOccurred())
366+
367+
opts.MetricsBindAddress = ":0"
368+
m, err := New(cfg, opts)
369+
Expect(err).NotTo(HaveOccurred())
370+
371+
s := make(chan struct{})
372+
defer close(s)
373+
go func() {
374+
defer GinkgoRecover()
375+
Expect(m.Start(s)).NotTo(HaveOccurred())
376+
close(done)
377+
}()
378+
379+
metricsEndpoint := fmt.Sprintf("http://%s/metrics", listener.Addr().String())
380+
resp, err := http.Get(metricsEndpoint)
381+
Expect(err).NotTo(HaveOccurred())
382+
Expect(resp.StatusCode).To(Equal(200))
383+
384+
data, err := ioutil.ReadAll(resp.Body)
385+
Expect(err).NotTo(HaveOccurred())
386+
Expect(string(data)).To(ContainSubstring("%s\n%s\n%s\n",
387+
`# HELP test_one test metric for testing`,
388+
`# TYPE test_one counter`,
389+
`test_one 1`,
390+
))
391+
392+
// Unregister will return false if the metric was never registered
393+
ok := metrics.Registry.Unregister(one)
394+
Expect(ok).To(BeTrue())
395+
})
396+
})
268397
})
269398

270399
Describe("Add", func() {

0 commit comments

Comments
 (0)