@@ -19,6 +19,8 @@ import (
19
19
corev1 "k8s.io/api/core/v1"
20
20
"k8s.io/apimachinery/pkg/api/errors"
21
21
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22
+ "k8s.io/apimachinery/pkg/fields"
23
+ "k8s.io/apimachinery/pkg/labels"
22
24
"k8s.io/apimachinery/pkg/runtime"
23
25
"k8s.io/apimachinery/pkg/types"
24
26
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -31,6 +33,7 @@ import (
31
33
"sigs.k8s.io/controller-runtime/pkg/cache"
32
34
"sigs.k8s.io/controller-runtime/pkg/client"
33
35
"sigs.k8s.io/controller-runtime/pkg/controller"
36
+ "sigs.k8s.io/controller-runtime/pkg/event"
34
37
"sigs.k8s.io/controller-runtime/pkg/healthz"
35
38
"sigs.k8s.io/controller-runtime/pkg/metrics"
36
39
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -81,13 +84,13 @@ var runCmd = &cobra.Command{
81
84
log .WithError (err ).Fatal ("unable to start node-labeler" )
82
85
}
83
86
84
- client , err := client .New (ctrl .GetConfigOrDie (), client.Options {})
87
+ kClient , err := client .New (ctrl .GetConfigOrDie (), client.Options {})
85
88
if err != nil {
86
89
log .WithError (err ).Fatal ("unable to create client" )
87
90
}
88
91
89
92
r := & PodReconciler {
90
- client ,
93
+ kClient ,
91
94
}
92
95
93
96
componentPredicate , err := predicate .LabelSelectorPredicate (metav1.LabelSelector {
@@ -110,6 +113,27 @@ var runCmd = &cobra.Command{
110
113
log .WithError (err ).Fatal ("unable to bind controller watch event handler" )
111
114
}
112
115
116
+ // the pod count reconciler needs an index on spec.nodeName to be able to list pods by node
117
+ if err := mgr .GetFieldIndexer ().IndexField (
118
+ context .Background (),
119
+ & corev1.Pod {},
120
+ "spec.nodeName" ,
121
+ func (o client.Object ) []string {
122
+ pod := o .(* corev1.Pod )
123
+ return []string {pod .Spec .NodeName }
124
+ }); err != nil {
125
+ log .WithError (err ).Fatal ("unable to create index for pod nodeName" )
126
+ }
127
+
128
+ pc , err := NewPodCountController (mgr .GetClient ())
129
+ if err != nil {
130
+ log .WithError (err ).Fatal ("unable to create pod count controller" )
131
+ }
132
+ err = pc .SetupWithManager (mgr )
133
+ if err != nil {
134
+ log .WithError (err ).Fatal ("unable to bind pod count controller" )
135
+ }
136
+
113
137
metrics .Registry .MustRegister (NodeLabelerCounterVec )
114
138
metrics .Registry .MustRegister (NodeLabelerTimeHistVec )
115
139
@@ -135,6 +159,7 @@ var runCmd = &cobra.Command{
135
159
136
160
func init () {
137
161
utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
162
+
138
163
rootCmd .AddCommand (runCmd )
139
164
}
140
165
@@ -249,6 +274,127 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
249
274
return reconcile.Result {}, nil
250
275
}
251
276
277
+ type PodCountController struct {
278
+ client.Client
279
+ }
280
+
281
+ // NewPodCountController creates a controller that tracks workspace pod counts and updates node annotations
282
+ func NewPodCountController (client client.Client ) (* PodCountController , error ) {
283
+ return & PodCountController {
284
+ Client : client ,
285
+ }, nil
286
+ }
287
+
288
+ func (pc * PodCountController ) SetupWithManager (mgr ctrl.Manager ) error {
289
+ return ctrl .NewControllerManagedBy (mgr ).
290
+ Named ("pod-count" ).
291
+ For (& corev1.Pod {}).
292
+ WithEventFilter (workspacePodFilter ()).
293
+ Complete (pc )
294
+ }
295
+
296
+ func workspacePodFilter () predicate.Predicate {
297
+ return predicate.Funcs {
298
+ CreateFunc : func (e event.CreateEvent ) bool {
299
+ pod := e .Object .(* corev1.Pod )
300
+ return pod .Labels ["component" ] == "workspace"
301
+ },
302
+ UpdateFunc : func (e event.UpdateEvent ) bool {
303
+ return false
304
+ },
305
+ DeleteFunc : func (e event.DeleteEvent ) bool {
306
+ pod := e .Object .(* corev1.Pod )
307
+ return pod .Labels ["component" ] == "workspace"
308
+ },
309
+ }
310
+ }
311
+
312
+ func (pc * PodCountController ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
313
+ log .WithField ("request" , req .NamespacedName .String ()).Info ("PodCountController reconciling" )
314
+
315
+ var pod corev1.Pod
316
+ if err := pc .Get (ctx , req .NamespacedName , & pod ); err != nil {
317
+ if ! errors .IsNotFound (err ) {
318
+ log .WithError (err ).WithField ("pod" , req .NamespacedName ).Error ("unable to fetch Pod" )
319
+ return ctrl.Result {}, err
320
+ }
321
+
322
+ log .WithField ("pod" , req .NamespacedName ).Info ("Pod not found, assuming it was deleted, reconciling all nodes" )
323
+
324
+ // Pod was deleted, reconcile all nodes
325
+ return pc .reconcileAllNodes (ctx )
326
+ }
327
+
328
+ if pod .Spec .NodeName == "" {
329
+ log .WithField ("pod" , req .NamespacedName ).Info ("Pod has no node, requesting reconciliation" )
330
+ return ctrl.Result {RequeueAfter : 5 * time .Second }, nil
331
+ }
332
+
333
+ return pc .reconcileNode (ctx , pod .Spec .NodeName )
334
+ }
335
+
336
+ func (pc * PodCountController ) reconcileAllNodes (ctx context.Context ) (ctrl.Result , error ) {
337
+ var nodes corev1.NodeList
338
+ if err := pc .List (ctx , & nodes ); err != nil {
339
+ log .WithError (err ).Error ("failed to list nodes" )
340
+ return ctrl.Result {}, err
341
+ }
342
+
343
+ for _ , node := range nodes .Items {
344
+ if _ , err := pc .reconcileNode (ctx , node .Name ); err != nil {
345
+ log .WithError (err ).WithField ("node" , node .Name ).Error ("failed to reconcile node" )
346
+ // Continue with other nodes even if one fails
347
+ continue
348
+ }
349
+ log .WithField ("node" , node .Name ).Info ("reconciled node" )
350
+ }
351
+
352
+ return ctrl.Result {}, nil
353
+ }
354
+
355
+ func (pc * PodCountController ) reconcileNode (ctx context.Context , nodeName string ) (ctrl.Result , error ) {
356
+ var podList corev1.PodList
357
+ err := pc .List (ctx , & podList , & client.ListOptions {
358
+ FieldSelector : fields .SelectorFromSet (fields.Set {"spec.nodeName" : nodeName }),
359
+ LabelSelector : labels .SelectorFromSet (labels.Set {"component" : "workspace" }),
360
+ })
361
+ if err != nil {
362
+ log .WithError (err ).WithField ("nodeName" , nodeName ).Error ("failed to list pods" )
363
+ return ctrl.Result {}, fmt .Errorf ("failed to list pods: %w" , err )
364
+ }
365
+
366
+ workspaceCount := len (podList .Items )
367
+ log .WithField ("nodeName" , nodeName ).WithField ("workspaceCount" , workspaceCount ).Info ("reconciling node" )
368
+
369
+ err = retry .RetryOnConflict (retry .DefaultBackoff , func () error {
370
+ var node corev1.Node
371
+ err := pc .Get (ctx , types.NamespacedName {Name : nodeName }, & node )
372
+ if err != nil {
373
+ return fmt .Errorf ("obtaining node %s: %w" , nodeName , err )
374
+ }
375
+
376
+ if node .Annotations == nil {
377
+ node .Annotations = make (map [string ]string )
378
+ }
379
+
380
+ if workspaceCount > 0 {
381
+ node .Annotations ["cluster-autoscaler.kubernetes.io/scale-down-disabled" ] = "true"
382
+ log .WithField ("nodeName" , nodeName ).Info ("disabling scale-down for node" )
383
+ } else {
384
+ delete (node .Annotations , "cluster-autoscaler.kubernetes.io/scale-down-disabled" )
385
+ log .WithField ("nodeName" , nodeName ).Info ("enabling scale-down for node" )
386
+ }
387
+
388
+ return pc .Update (ctx , & node )
389
+ })
390
+ if err != nil {
391
+ log .WithError (err ).WithField ("nodeName" , nodeName ).Error ("failed to update node" )
392
+ return ctrl.Result {}, fmt .Errorf ("failed to update node: %w" , err )
393
+ }
394
+
395
+ return ctrl.Result {}, nil
396
+ }
397
+
252
398
func updateLabel (label string , add bool , nodeName string , client client.Client ) error {
253
399
return retry .RetryOnConflict (retry .DefaultBackoff , func () error {
254
400
ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
0 commit comments