Skip to content

Commit 583f54e

Browse files
committed
[ws-daemon] Introduce pod count controller
1 parent d54bd04 commit 583f54e

File tree

4 files changed

+103
-4
lines changed

4 files changed

+103
-4
lines changed

components/node-labeler/cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ var (
3232
// rootCmd represents the base command when called without any subcommands
3333
var rootCmd = &cobra.Command{
3434
Use: ServiceName,
35-
Short: "node-labeler is in charge of maintining the node labels that workspaces require to run in a node",
35+
Short: "node-labeler is in charge of maintaining the node labels that workspaces require to run in a node",
3636
PersistentPreRun: func(cmd *cobra.Command, args []string) {
3737
log.Init(ServiceName, Version, jsonLog, verbose)
3838
},

components/node-labeler/cmd/run.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const (
4747

4848
registryFacade = "registry-facade"
4949
wsDaemon = "ws-daemon"
50+
workspace = "workspace"
5051
)
5152

5253
var defaultRequeueTime = time.Second * 10
@@ -78,7 +79,7 @@ var runCmd = &cobra.Command{
7879
LeaderElectionID: "node-labeler.gitpod.io",
7980
})
8081
if err != nil {
81-
log.WithError(err).Fatal("unable to start node-labeber")
82+
log.WithError(err).Fatal("unable to start node-labeler")
8283
}
8384

8485
client, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
@@ -123,10 +124,10 @@ var runCmd = &cobra.Command{
123124
log.WithError(err).Fatal("unable to set up ready check")
124125
}
125126

126-
log.Info("starting node-labeber")
127+
log.Info("starting node-labeler")
127128
err = mgr.Start(ctrl.SetupSignalHandler())
128129
if err != nil {
129-
log.WithError(err).Fatal("problem running node-labeber")
130+
log.WithError(err).Fatal("problem running node-labeler")
130131
}
131132

132133
log.Info("Received SIGINT - shutting down")

components/ws-daemon/pkg/controller/workspace_controller.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
corev1 "k8s.io/api/core/v1"
2727
"k8s.io/apimachinery/pkg/api/errors"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/fields"
30+
"k8s.io/apimachinery/pkg/labels"
2931
"k8s.io/apimachinery/pkg/types"
3032
"k8s.io/apimachinery/pkg/util/wait"
3133
"k8s.io/client-go/tools/record"
@@ -81,6 +83,41 @@ func NewWorkspaceController(c client.Client, recorder record.EventRecorder, node
8183
}, nil
8284
}
8385

86+
type PodCountController struct {
87+
client.Client
88+
NodeName string
89+
}
90+
91+
// NewPodCountController creates a controller that tracks workspace pod counts and updates node annotations
92+
func NewPodCountController(client client.Client, nodeName string) (*PodCountController, error) {
93+
return &PodCountController{
94+
Client: client,
95+
NodeName: nodeName,
96+
}, nil
97+
}
98+
99+
func (pc *PodCountController) SetupWithManager(mgr ctrl.Manager) error {
100+
return ctrl.NewControllerManagedBy(mgr).
101+
Named("pod-count").
102+
For(&workspacev1.Workspace{}).
103+
WithEventFilter(podEventFilter(pc.NodeName)).
104+
Complete(pc)
105+
}
106+
107+
func podEventFilter(nodeName string) predicate.Predicate {
108+
return predicate.Funcs{
109+
CreateFunc: func(e event.CreateEvent) bool {
110+
return workspaceFilter(e.Object, nodeName)
111+
},
112+
UpdateFunc: func(e event.UpdateEvent) bool {
113+
return workspaceFilter(e.ObjectNew, nodeName)
114+
},
115+
DeleteFunc: func(e event.DeleteEvent) bool {
116+
return workspaceFilter(e.Object, nodeName)
117+
},
118+
}
119+
}
120+
84121
// SetupWithManager sets up the controller with the Manager.
85122
func (wsc *WorkspaceController) SetupWithManager(mgr ctrl.Manager) error {
86123
return ctrl.NewControllerManagedBy(mgr).
@@ -146,6 +183,45 @@ func (wsc *WorkspaceController) Reconcile(ctx context.Context, req ctrl.Request)
146183
return ctrl.Result{}, nil
147184
}
148185

186+
func (pc *PodCountController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
187+
var podList corev1.PodList
188+
err := pc.List(ctx, &podList, &client.ListOptions{
189+
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": pc.NodeName}),
190+
LabelSelector: labels.SelectorFromSet(labels.Set{"component": "workspace"}),
191+
})
192+
if err != nil {
193+
glog.WithError(err).WithField("nodeName", pc.NodeName).Error("failed to list pods")
194+
return ctrl.Result{}, err
195+
}
196+
workspaceCount := len(podList.Items)
197+
198+
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
199+
var node corev1.Node
200+
err := pc.Get(ctx, types.NamespacedName{Name: pc.NodeName}, &node)
201+
if err != nil {
202+
return fmt.Errorf("obtaining node %s: %w", pc.NodeName, err)
203+
}
204+
205+
if node.Annotations == nil {
206+
node.Annotations = make(map[string]string)
207+
}
208+
209+
if workspaceCount > 0 {
210+
node.Annotations["cluster-autoscaler.kubernetes.io/scale-down-disabled"] = "true"
211+
} else {
212+
delete(node.Annotations, "cluster-autoscaler.kubernetes.io/scale-down-disabled")
213+
}
214+
215+
return pc.Update(ctx, &node)
216+
})
217+
if err != nil {
218+
glog.WithError(err).WithField("nodeName", pc.NodeName).Error("[failed to update node")
219+
return ctrl.Result{}, err
220+
}
221+
222+
return ctrl.Result{}, nil
223+
}
224+
149225
// latestWorkspace checks if we have the latest generation of the workspace CR. We do this because
150226
// the cache could be stale and we retrieve a workspace CR that does not have the content init/backup
151227
// conditions even though we have set them previously. This will lead to us performing these operations

components/ws-daemon/pkg/daemon/daemon.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/client_golang/prometheus"
1414
"github.com/prometheus/client_golang/prometheus/collectors"
1515
"golang.org/x/xerrors"
16+
corev1 "k8s.io/api/core/v1"
1617
"k8s.io/apimachinery/pkg/runtime"
1718
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1819
"k8s.io/client-go/kubernetes"
@@ -21,6 +22,7 @@ import (
2122
"k8s.io/client-go/tools/clientcmd"
2223
ctrl "sigs.k8s.io/controller-runtime"
2324
"sigs.k8s.io/controller-runtime/pkg/cache"
25+
"sigs.k8s.io/controller-runtime/pkg/client"
2426
"sigs.k8s.io/controller-runtime/pkg/manager"
2527
"sigs.k8s.io/controller-runtime/pkg/metrics"
2628
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -228,6 +230,26 @@ func NewDaemon(config Config) (*Daemon, error) {
228230
return nil, err
229231
}
230232

233+
// the pod count reconciler needs an index on spec.nodeName to be able to list pods by node
234+
if err := mgr.GetFieldIndexer().IndexField(
235+
context.Background(),
236+
&corev1.Pod{},
237+
"spec.nodeName",
238+
func(o client.Object) []string {
239+
pod := o.(*corev1.Pod)
240+
return []string{pod.Spec.NodeName}
241+
}); err != nil {
242+
return nil, err
243+
}
244+
245+
pcc, err := controller.NewPodCountController(mgr.GetClient(), nodename)
246+
if err != nil {
247+
return nil, err
248+
}
249+
if err := pcc.SetupWithManager(mgr); err != nil {
250+
return nil, err
251+
}
252+
231253
ssctrl := controller.NewSnapshotController(
232254
mgr.GetClient(), mgr.GetEventRecorderFor("snapshot"), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps)
233255
err = ssctrl.SetupWithManager(mgr)

0 commit comments

Comments
 (0)