Skip to content

[node-labeler] Introduce workspace count controller #20509

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions components/node-labeler/BUILD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ packages:
- "**/*.go"
- "go.mod"
- "go.sum"
- "crd/*.yaml"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
- components/ws-manager-mk2:crd
env:
- CGO_ENABLED=0
- GOOS=linux
prep:
- ["mv", "_deps/components-ws-manager-mk2--crd/workspace.gitpod.io_workspaces.yaml", "crd/workspace.gitpod.io_workspaces.yaml"]
config:
packaging: app
buildCommand: ["go", "build", "-trimpath", "-ldflags", "-buildid= -w -s -X 'github.com/gitpod-io/gitpod/node-labeler/cmd.Version=commit-${__git_commit}'"]
Expand All @@ -36,3 +41,4 @@ packages:
- "go.sum"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
2 changes: 1 addition & 1 deletion components/node-labeler/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: ServiceName,
Short: "node-labeler is in charge of maintining the node labels that workspaces require to run in a node",
Short: "node-labeler is in charge of maintaining the node labels that workspaces require to run in a node",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
log.Init(ServiceName, Version, jsonLog, verbose)
},
Expand Down
167 changes: 162 additions & 5 deletions components/node-labeler/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/bombsimon/logrusr/v2"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -31,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
Expand Down Expand Up @@ -78,16 +80,16 @@ var runCmd = &cobra.Command{
LeaderElectionID: "node-labeler.gitpod.io",
})
if err != nil {
log.WithError(err).Fatal("unable to start node-labeber")
log.WithError(err).Fatal("unable to start node-labeler")
}

client, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
kClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
if err != nil {
log.WithError(err).Fatal("unable to create client")
}

r := &PodReconciler{
client,
kClient,
}

componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
Expand All @@ -110,6 +112,15 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to bind controller watch event handler")
}

wc, err := NewWorkspaceCountController(mgr.GetClient())
if err != nil {
log.WithError(err).Fatal("unable to create workspace count controller")
}
err = wc.SetupWithManager(mgr)
if err != nil {
log.WithError(err).Fatal("unable to bind workspace count controller")
}

metrics.Registry.MustRegister(NodeLabelerCounterVec)
metrics.Registry.MustRegister(NodeLabelerTimeHistVec)

Expand All @@ -123,10 +134,10 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to set up ready check")
}

log.Info("starting node-labeber")
log.Info("starting node-labeler")
err = mgr.Start(ctrl.SetupSignalHandler())
if err != nil {
log.WithError(err).Fatal("problem running node-labeber")
log.WithError(err).Fatal("problem running node-labeler")
}

log.Info("Received SIGINT - shutting down")
Expand All @@ -135,6 +146,8 @@ var runCmd = &cobra.Command{

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(workspacev1.AddToScheme(scheme))

rootCmd.AddCommand(runCmd)
}

Expand Down Expand Up @@ -249,6 +262,150 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
return reconcile.Result{}, nil
}

type WorkspaceCountController struct {
client.Client
}

func NewWorkspaceCountController(client client.Client) (*WorkspaceCountController, error) {
return &WorkspaceCountController{
Client: client,
}, nil
}

func (wc *WorkspaceCountController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Named("workspace-count").
For(&workspacev1.Workspace{}).
WithEventFilter(workspaceFilter()).
Complete(wc)
}

func workspaceFilter() predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
ws := e.Object.(*workspacev1.Workspace)
if ws.Status.Runtime == nil {
log.WithField("workspace", ws.Name).Info("workspace not ready yet")
return false
}

return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != ""
},
UpdateFunc: func(e event.UpdateEvent) bool {
wsOld := e.ObjectOld.(*workspacev1.Workspace)
ws := e.ObjectNew.(*workspacev1.Workspace)
if wsOld.Status.Runtime == nil && ws.Status.Runtime != nil {
return true
}

// if we've seen runtime info before, there's no need to reconcile again
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
ws := e.Object.(*workspacev1.Workspace)
return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != ""
},
}
}

func (wc *WorkspaceCountController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")

var ws workspacev1.Workspace
if err := wc.Get(ctx, req.NamespacedName, &ws); err != nil {
if !errors.IsNotFound(err) {
log.WithError(err).WithField("workspace", req.NamespacedName).Error("unable to fetch Workspace")
return ctrl.Result{}, err
}
// If workspace not found, do a full reconciliation
log.WithField("workspace", req.NamespacedName).Info("Workspace not found, reconciling all nodes")
return wc.reconcileAllNodes(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking:
Is return wc.reconcileAllNodes(ctx) necessary?

I ask because I would expect this controller to evaluate all workspaces every 2m (the sync period for the controller's manager).

It seems potentially unnecessary to me. Can you run a test, where the none of the events trigger (return false), and then check to see if the reconcile function triggers in 2m?

Copy link
Member Author

@filiptronicek filiptronicek Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going off of @iQQBot's comment here, where he mentioned listing nodes and workspaces often shouldn't be an issue.

To answer your question: because we reconcile at least every 2m, this reconcileAllNodes call is not strictly necessary, it just makes us faster to respond to changes - i.e. if we roll out this change without it, it will add up to 2 extra minutes of keeping every node around that just lost its last workspace pod.

... and it also helps with testing 😄

}

if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
var workspaceList workspacev1.WorkspaceList
if err := wc.List(ctx, &workspaceList); err != nil {
log.WithError(err).Error("failed to list workspaces")
return ctrl.Result{}, err
}

count := 0
nodeName := ws.Status.Runtime.NodeName
for _, ws := range workspaceList.Items {
if ws.Status.Runtime != nil &&
ws.Status.Runtime.NodeName == nodeName &&
ws.DeletionTimestamp.IsZero() {
count++
}
}

if err := wc.updateNodeAnnotation(ctx, nodeName, count); err != nil {
return ctrl.Result{}, err
}
log.WithField("node", nodeName).WithField("count", count).Info("updated node annotation")
}

return ctrl.Result{}, nil
}

func (wc *WorkspaceCountController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
var workspaceList workspacev1.WorkspaceList
if err := wc.List(ctx, &workspaceList); err != nil {
log.WithError(err).Error("failed to list workspaces")
return ctrl.Result{}, err
}

workspaceCounts := make(map[string]int)
for _, ws := range workspaceList.Items {
if ws.Status.Runtime != nil &&
ws.Status.Runtime.NodeName != "" &&
ws.DeletionTimestamp.IsZero() {
workspaceCounts[ws.Status.Runtime.NodeName]++
}
}

var nodes corev1.NodeList
if err := wc.List(ctx, &nodes); err != nil {
log.WithError(err).Error("failed to list nodes")
return ctrl.Result{}, err
}

for _, node := range nodes.Items {
count := workspaceCounts[node.Name]
if err := wc.updateNodeAnnotation(ctx, node.Name, count); err != nil {
log.WithError(err).WithField("node", node.Name).Error("failed to update node")
continue
}
log.WithField("node", node.Name).WithField("count", count).Info("updated node annotation")
}

return ctrl.Result{}, nil
}

func (wc *WorkspaceCountController) updateNodeAnnotation(ctx context.Context, nodeName string, count int) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var node corev1.Node
err := wc.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
if err != nil {
return fmt.Errorf("obtaining node %s: %w", nodeName, err)
}

if node.Annotations == nil {
node.Annotations = make(map[string]string)
}

if count > 0 {
node.Annotations["cluster-autoscaler.kubernetes.io/scale-down-disabled"] = "true"
log.WithField("nodeName", nodeName).Info("disabling scale-down for node")
} else {
delete(node.Annotations, "cluster-autoscaler.kubernetes.io/scale-down-disabled")
log.WithField("nodeName", nodeName).Info("enabling scale-down for node")
}

return wc.Update(ctx, &node)
})
}

func updateLabel(label string, add bool, nodeName string, client client.Client) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
Loading
Loading