Skip to content

Commit 029e8ac

Browse files
committed
[node-labeler] Refactor node reconciliation and pod health checks
1 parent eec4d3b commit 029e8ac

File tree

1 file changed

+174
-93
lines changed
  • components/node-labeler/cmd

1 file changed

+174
-93
lines changed

components/node-labeler/cmd/run.go

Lines changed: 174 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2727
_ "k8s.io/client-go/plugin/pkg/client/auth"
2828
"k8s.io/client-go/util/retry"
29-
"k8s.io/utils/pointer"
29+
"k8s.io/utils/ptr"
3030
ctrl "sigs.k8s.io/controller-runtime"
3131
"sigs.k8s.io/controller-runtime/pkg/builder"
3232
"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -35,7 +35,6 @@ import (
3535
"sigs.k8s.io/controller-runtime/pkg/event"
3636
"sigs.k8s.io/controller-runtime/pkg/healthz"
3737
"sigs.k8s.io/controller-runtime/pkg/manager"
38-
"sigs.k8s.io/controller-runtime/pkg/metrics"
3938
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
4039
"sigs.k8s.io/controller-runtime/pkg/predicate"
4140
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -68,15 +67,6 @@ var runCmd = &cobra.Command{
6867
Run: func(cmd *cobra.Command, args []string) {
6968
ctrl.SetLogger(logrusr.New(log.Log))
7069

71-
kClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
72-
if err != nil {
73-
log.WithError(err).Fatal("unable to create client")
74-
}
75-
76-
if err := initializeLabels(context.Background(), kClient); err != nil {
77-
log.WithError(err).Fatal("failed to initialize labels")
78-
}
79-
8070
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
8171
Scheme: scheme,
8272
HealthProbeBindAddress: ":8086",
@@ -88,7 +78,7 @@ var runCmd = &cobra.Command{
8878
// default sync period is 10h.
8979
// in case node-labeler is restarted and no change happens, we could waste (at least) 20m in a node
9080
// that will never run workspaces and the additional nodes cluster-autoscaler adds to compensate
91-
SyncPeriod: pointer.Duration(2 * time.Minute),
81+
SyncPeriod: ptr.To(time.Duration(2 * time.Minute)),
9282
},
9383
WebhookServer: webhook.NewServer(webhook.Options{
9484
Port: 9443,
@@ -101,7 +91,7 @@ var runCmd = &cobra.Command{
10191
}
10292

10393
r := &PodReconciler{
104-
kClient,
94+
mgr.GetClient(),
10595
}
10696

10797
componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
@@ -123,6 +113,25 @@ var runCmd = &cobra.Command{
123113
if err != nil {
124114
log.WithError(err).Fatal("unable to bind controller watch event handler")
125115
}
116+
nr := &NodeReconciler{
117+
mgr.GetClient(),
118+
}
119+
120+
err = ctrl.NewControllerManagedBy(mgr).
121+
Named("node-watcher").
122+
For(&corev1.Node{}, builder.WithPredicates(predicate.Or(nr.nodeFilter()))).
123+
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
124+
Complete(nr)
125+
if err != nil {
126+
log.WithError(err).Fatal("unable to bind controller watch event handler")
127+
}
128+
129+
go func() {
130+
<-mgr.Elected()
131+
if err := nr.reconcileAll(context.Background()); err != nil {
132+
log.WithError(err).Fatal("failed to reconcile all nodes")
133+
}
134+
}()
126135

127136
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &workspacev1.Workspace{}, "status.runtime.nodeName", func(o client.Object) []string {
128137
ws := o.(*workspacev1.Workspace)
@@ -135,6 +144,17 @@ var runCmd = &cobra.Command{
135144
return
136145
}
137146

147+
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
148+
pod := o.(*corev1.Pod)
149+
if pod.Spec.NodeName == "" {
150+
return nil
151+
}
152+
return []string{pod.Spec.NodeName}
153+
}); err != nil {
154+
log.WithError(err).Fatal("unable to create pod indexer")
155+
return
156+
}
157+
138158
nsac, err := NewNodeScaledownAnnotationController(mgr.GetClient())
139159
if err != nil {
140160
log.WithError(err).Fatal("unable to create node scaledown annotation controller")
@@ -153,10 +173,6 @@ var runCmd = &cobra.Command{
153173
if err != nil {
154174
log.WithError(err).Fatal("couldn't properly clean up node scaledown annotation controller")
155175
}
156-
157-
metrics.Registry.MustRegister(NodeLabelerCounterVec)
158-
metrics.Registry.MustRegister(NodeLabelerTimeHistVec)
159-
160176
err = mgr.AddHealthzCheck("healthz", healthz.Ping)
161177
if err != nil {
162178
log.WithError(err).Fatal("unable to set up health check")
@@ -208,96 +224,170 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
208224
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
209225
}
210226

227+
var taintKey string
228+
switch {
229+
case strings.HasPrefix(pod.Name, registryFacade):
230+
taintKey = registryFacadeTaintKey
231+
case strings.HasPrefix(pod.Name, wsDaemon):
232+
taintKey = wsDaemonTaintKey
233+
default:
234+
// nothing to do
235+
return reconcile.Result{}, nil
236+
}
237+
238+
healthy, err := checkPodHealth(pod)
239+
if err != nil {
240+
log.WithError(err).Error("checking pod health")
241+
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
242+
}
243+
244+
var node corev1.Node
245+
err = r.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
246+
if err != nil {
247+
return reconcile.Result{}, fmt.Errorf("obtaining node %s: %w", nodeName, err)
248+
}
249+
250+
if isNodeTaintExists(taintKey, node) != healthy {
251+
// nothing to do, the taint already exists and is in the desired state.
252+
return reconcile.Result{}, nil
253+
}
254+
255+
err = updateNodeTaint(taintKey, !healthy, nodeName, r)
256+
if err != nil {
257+
log.WithError(err).Error("updating node taint")
258+
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
259+
}
260+
261+
return reconcile.Result{}, nil
262+
}
263+
264+
func checkPodHealth(pod corev1.Pod) (bool, error) {
211265
var (
212266
ipAddress string
213267
port string
214-
taintKey string
215268
)
216-
217269
switch {
218270
case strings.HasPrefix(pod.Name, registryFacade):
219-
taintKey = registryFacadeTaintKey
220271
ipAddress = pod.Status.HostIP
221272
port = strconv.Itoa(registryFacadePort)
222273
case strings.HasPrefix(pod.Name, wsDaemon):
223-
taintKey = wsDaemonTaintKey
224274
ipAddress = pod.Status.PodIP
225275
port = strconv.Itoa(wsdaemonPort)
226276
default:
227277
// nothing to do
228-
return reconcile.Result{}, nil
278+
return true, nil
229279
}
230280

231281
if !pod.ObjectMeta.DeletionTimestamp.IsZero() {
232282
// the pod is being removed.
233283
// add the taint to the node
234-
time.Sleep(1 * time.Second)
235-
err := updateNodeTaint(taintKey, true, nodeName, r)
236-
if err != nil {
237-
// this is a edge case when cluster-autoscaler removes a node
238-
// (all the running pods will be removed after that)
239-
if errors.IsNotFound(err) {
240-
return reconcile.Result{}, nil
241-
}
242-
243-
log.WithError(err).Error("adding node taint")
244-
return reconcile.Result{RequeueAfter: defaultRequeueTime}, err
245-
}
246-
247-
return reconcile.Result{}, err
284+
return false, nil
248285
}
249286

250287
if !IsPodReady(pod) {
251288
// not ready. Wait until the next update.
252-
return reconcile.Result{}, nil
289+
return false, nil
253290
}
254291

255-
var node corev1.Node
256-
err = r.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
292+
err := checkTCPPortIsReachable(ipAddress, port)
257293
if err != nil {
258-
return reconcile.Result{}, fmt.Errorf("obtaining node %s: %w", nodeName, err)
294+
log.WithField("host", ipAddress).WithField("port", port).WithField("pod", pod.Name).WithError(err).Error("checking if TCP port is open")
295+
return false, nil
259296
}
260297

261-
// Check if taint exists
262-
taintExists := false
263-
for _, taint := range node.Spec.Taints {
264-
if taint.Key == taintKey {
265-
taintExists = true
266-
break
298+
if strings.HasPrefix(pod.Name, registryFacade) {
299+
err = checkRegistryFacade(ipAddress, port)
300+
if err != nil {
301+
log.WithError(err).Error("checking registry-facade")
302+
return false, nil
267303
}
268304
}
269305

270-
if !taintExists {
271-
// nothing to do, the taint doesn't exist.
272-
return reconcile.Result{}, nil
306+
return true, nil
307+
}
308+
309+
type NodeReconciler struct {
310+
client.Client
311+
}
312+
313+
func (r *NodeReconciler) nodeFilter() predicate.Predicate {
314+
return predicate.Funcs{
315+
CreateFunc: func(e event.CreateEvent) bool {
316+
node, ok := e.Object.(*corev1.Node)
317+
if !ok {
318+
return false
319+
}
320+
return isWorkspaceNode(*node)
321+
},
322+
UpdateFunc: func(e event.UpdateEvent) bool {
323+
return false
324+
},
325+
DeleteFunc: func(e event.DeleteEvent) bool {
326+
return false
327+
},
273328
}
329+
}
274330

275-
err = checkTCPPortIsReachable(ipAddress, port)
276-
if err != nil {
277-
log.WithField("host", ipAddress).WithField("port", port).WithField("pod", pod.Name).WithError(err).Error("checking if TCP port is open")
278-
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
331+
func (r *NodeReconciler) reconcileAll(ctx context.Context) error {
332+
log.Info("start reconciling all nodes")
333+
334+
var nodes corev1.NodeList
335+
if err := r.List(ctx, &nodes); err != nil {
336+
return fmt.Errorf("failed to list nodes: %w", err)
279337
}
280338

281-
if strings.HasPrefix(pod.Name, registryFacade) {
282-
err = checkRegistryFacade(ipAddress, port)
283-
if err != nil {
284-
log.WithError(err).Error("checking registry-facade")
285-
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
339+
for _, node := range nodes.Items {
340+
if node.Labels == nil {
341+
continue
342+
}
343+
if !isWorkspaceNode(node) {
344+
continue
286345
}
287346

288-
time.Sleep(1 * time.Second)
347+
err := updateNodeLabel(node.Name, r.Client)
348+
if err != nil {
349+
log.WithError(err).WithField("node", node.Name).Error("failed to initialize labels on node")
350+
}
351+
r.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: node.Name}})
289352
}
290353

291-
err = updateNodeTaint(taintKey, false, nodeName, r)
354+
log.Info("finished reconciling all nodes")
355+
return nil
356+
}
357+
358+
func (r *NodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
359+
var node corev1.Node
360+
err := r.Get(ctx, req.NamespacedName, &node)
292361
if err != nil {
293-
log.WithError(err).Error("removing node taint")
294-
return reconcile.Result{}, fmt.Errorf("trying to remove the taint: %v", err)
362+
if !errors.IsNotFound(err) {
363+
log.WithError(err).Error("unable to fetch node")
364+
}
365+
return ctrl.Result{}, client.IgnoreNotFound(err)
366+
}
367+
var podList corev1.PodList
368+
err = r.List(ctx, &podList, client.MatchingFields{
369+
"spec.nodeName": node.Name,
370+
})
371+
if err != nil {
372+
return reconcile.Result{}, fmt.Errorf("listing pods: %w", err)
373+
}
374+
isWsdaemonTaintExists := isNodeTaintExists(wsDaemonTaintKey, node)
375+
isRegistryFacadeTaintExists := isNodeTaintExists(registryFacadeTaintKey, node)
376+
isWsDaemonReady, isRegistryFacadeReady := false, false
377+
for _, pod := range podList.Items {
378+
if strings.HasPrefix(pod.Name, wsDaemon) {
379+
isWsDaemonReady = IsPodReady(pod)
380+
}
381+
if strings.HasPrefix(pod.Name, registryFacade) {
382+
isRegistryFacadeReady = IsPodReady(pod)
383+
}
384+
}
385+
if isWsDaemonReady == isWsdaemonTaintExists {
386+
updateNodeTaint(wsDaemonTaintKey, isWsDaemonReady, node.Name, r)
387+
}
388+
if isRegistryFacadeReady == isRegistryFacadeTaintExists {
389+
updateNodeTaint(registryFacadeTaintKey, isRegistryFacadeReady, node.Name, r)
295390
}
296-
297-
readyIn := time.Since(pod.Status.StartTime.Time)
298-
NodeLabelerTimeHistVec.WithLabelValues(strings.Split(pod.Name, "-")[0]).Observe(readyIn.Seconds())
299-
NodeLabelerCounterVec.WithLabelValues(strings.Split(pod.Name, "-")[0]).Inc()
300-
301391
return reconcile.Result{}, nil
302392
}
303393

@@ -510,7 +600,10 @@ func updateNodeTaint(taintKey string, add bool, nodeName string, client client.C
510600
var node corev1.Node
511601
err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
512602
if err != nil {
513-
return err
603+
if !errors.IsNotFound(err) {
604+
return err
605+
}
606+
return nil
514607
}
515608

516609
// Create or remove taint
@@ -554,6 +647,15 @@ func updateNodeTaint(taintKey string, add bool, nodeName string, client client.C
554647
})
555648
}
556649

650+
func isNodeTaintExists(taintKey string, node corev1.Node) bool {
651+
for _, taint := range node.Spec.Taints {
652+
if taint.Key == taintKey {
653+
return true
654+
}
655+
}
656+
return false
657+
}
658+
557659
func checkTCPPortIsReachable(host string, port string) error {
558660
conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), 1*time.Second)
559661
if err != nil {
@@ -611,31 +713,10 @@ func newDefaultTransport() *http.Transport {
611713
}
612714
}
613715

614-
func initializeLabels(ctx context.Context, kClient client.Client) error {
615-
log.Info("initializing labels on nodes")
616-
617-
var nodes corev1.NodeList
618-
if err := kClient.List(ctx, &nodes); err != nil {
619-
return fmt.Errorf("failed to list nodes: %w", err)
620-
}
621-
622-
for _, node := range nodes.Items {
623-
if node.Labels == nil {
624-
continue
625-
}
626-
_, isRegularWorkspaceNode := node.Labels[workspacesRegularLabel]
627-
_, isHeadlessWorkspaceNode := node.Labels[workspacesHeadlessLabel]
628-
629-
if isRegularWorkspaceNode || isHeadlessWorkspaceNode {
630-
err := updateNodeLabel(node.Name, kClient)
631-
if err != nil {
632-
log.WithError(err).WithField("node", node.Name).Error("failed to initialize labels on node")
633-
}
634-
}
635-
}
636-
637-
log.Info("finished initializing labels on nodes")
638-
return nil
716+
func isWorkspaceNode(node corev1.Node) bool {
717+
_, isRegularWorkspaceNode := node.Labels[workspacesRegularLabel]
718+
_, isHeadlessWorkspaceNode := node.Labels[workspacesHeadlessLabel]
719+
return isRegularWorkspaceNode || isHeadlessWorkspaceNode
639720
}
640721

641722
func updateNodeLabel(nodeName string, client client.Client) error {

0 commit comments

Comments
 (0)