-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[node-labeler] Introduce workspace count controller #20509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
583f54e
2806157
e76b62e
cf4e38f
c5fc9bd
77213b7
11d73d5
1942721
bd36bac
b3b8dc9
f043f94
56a4333
06abb06
4120599
9a097d8
24ca89e
f333539
0405fcd
eb3a811
1584a52
4e3401b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
filiptronicek marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ import ( | |
"time" | ||
|
||
"github.com/bombsimon/logrusr/v2" | ||
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1" | ||
"github.com/spf13/cobra" | ||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/api/errors" | ||
|
@@ -31,6 +32,7 @@ import ( | |
"sigs.k8s.io/controller-runtime/pkg/cache" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/controller" | ||
"sigs.k8s.io/controller-runtime/pkg/event" | ||
"sigs.k8s.io/controller-runtime/pkg/healthz" | ||
"sigs.k8s.io/controller-runtime/pkg/metrics" | ||
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" | ||
|
@@ -78,16 +80,16 @@ var runCmd = &cobra.Command{ | |
LeaderElectionID: "node-labeler.gitpod.io", | ||
}) | ||
if err != nil { | ||
log.WithError(err).Fatal("unable to start node-labeber") | ||
log.WithError(err).Fatal("unable to start node-labeler") | ||
} | ||
|
||
client, err := client.New(ctrl.GetConfigOrDie(), client.Options{}) | ||
kClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{}) | ||
if err != nil { | ||
log.WithError(err).Fatal("unable to create client") | ||
} | ||
|
||
r := &PodReconciler{ | ||
client, | ||
kClient, | ||
} | ||
|
||
componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{ | ||
|
@@ -110,6 +112,15 @@ var runCmd = &cobra.Command{ | |
log.WithError(err).Fatal("unable to bind controller watch event handler") | ||
} | ||
|
||
wc, err := NewWorkspaceCountController(mgr.GetClient()) | ||
if err != nil { | ||
log.WithError(err).Fatal("unable to create workspace count controller") | ||
} | ||
err = wc.SetupWithManager(mgr) | ||
if err != nil { | ||
log.WithError(err).Fatal("unable to bind workspace count controller") | ||
} | ||
|
||
metrics.Registry.MustRegister(NodeLabelerCounterVec) | ||
metrics.Registry.MustRegister(NodeLabelerTimeHistVec) | ||
|
||
|
@@ -123,10 +134,10 @@ var runCmd = &cobra.Command{ | |
log.WithError(err).Fatal("unable to set up ready check") | ||
} | ||
|
||
log.Info("starting node-labeber") | ||
log.Info("starting node-labeler") | ||
err = mgr.Start(ctrl.SetupSignalHandler()) | ||
if err != nil { | ||
log.WithError(err).Fatal("problem running node-labeber") | ||
log.WithError(err).Fatal("problem running node-labeler") | ||
} | ||
|
||
log.Info("Received SIGINT - shutting down") | ||
|
@@ -135,6 +146,8 @@ var runCmd = &cobra.Command{ | |
|
||
func init() { | ||
utilruntime.Must(clientgoscheme.AddToScheme(scheme)) | ||
utilruntime.Must(workspacev1.AddToScheme(scheme)) | ||
|
||
rootCmd.AddCommand(runCmd) | ||
} | ||
|
||
|
@@ -249,6 +262,150 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r | |
return reconcile.Result{}, nil | ||
} | ||
|
||
type WorkspaceCountController struct { | ||
client.Client | ||
} | ||
|
||
func NewWorkspaceCountController(client client.Client) (*WorkspaceCountController, error) { | ||
return &WorkspaceCountController{ | ||
Client: client, | ||
}, nil | ||
} | ||
|
||
func (wc *WorkspaceCountController) SetupWithManager(mgr ctrl.Manager) error { | ||
return ctrl.NewControllerManagedBy(mgr). | ||
Named("workspace-count"). | ||
For(&workspacev1.Workspace{}). | ||
WithEventFilter(workspaceFilter()). | ||
Complete(wc) | ||
} | ||
|
||
func workspaceFilter() predicate.Predicate { | ||
return predicate.Funcs{ | ||
CreateFunc: func(e event.CreateEvent) bool { | ||
ws := e.Object.(*workspacev1.Workspace) | ||
if ws.Status.Runtime == nil { | ||
log.WithField("workspace", ws.Name).Info("workspace not ready yet") | ||
return false | ||
} | ||
|
||
return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" | ||
}, | ||
UpdateFunc: func(e event.UpdateEvent) bool { | ||
kylos101 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
wsOld := e.ObjectOld.(*workspacev1.Workspace) | ||
ws := e.ObjectNew.(*workspacev1.Workspace) | ||
if wsOld.Status.Runtime == nil && ws.Status.Runtime != nil { | ||
kylos101 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return true | ||
} | ||
|
||
// if we've seen runtime info before, there's no need to reconcile again | ||
return false | ||
}, | ||
DeleteFunc: func(e event.DeleteEvent) bool { | ||
ws := e.Object.(*workspacev1.Workspace) | ||
return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" | ||
}, | ||
} | ||
} | ||
|
||
func (wc *WorkspaceCountController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { | ||
log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling") | ||
|
||
var ws workspacev1.Workspace | ||
if err := wc.Get(ctx, req.NamespacedName, &ws); err != nil { | ||
if !errors.IsNotFound(err) { | ||
log.WithError(err).WithField("workspace", req.NamespacedName).Error("unable to fetch Workspace") | ||
return ctrl.Result{}, err | ||
} | ||
// If workspace not found, do a full reconciliation | ||
log.WithField("workspace", req.NamespacedName).Info("Workspace not found, reconciling all nodes") | ||
return wc.reconcileAllNodes(ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Blocking: I ask because I would expect this controller to evaluate all workspaces every 2m (the sync period for the controller's manager). It seems potentially unnecessary to me. Can you run a test, where the none of the events trigger (return false), and then check to see if the reconcile function triggers in 2m? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was going off of @iQQBot's comment here, where he mentioned listing nodes and workspaces often shouldn't be an issue. To answer your question: because we reconcile at least every 2m, this ... and it also helps with testing 😄 |
||
} | ||
|
||
if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" { | ||
var workspaceList workspacev1.WorkspaceList | ||
if err := wc.List(ctx, &workspaceList); err != nil { | ||
log.WithError(err).Error("failed to list workspaces") | ||
return ctrl.Result{}, err | ||
} | ||
|
||
count := 0 | ||
nodeName := ws.Status.Runtime.NodeName | ||
for _, ws := range workspaceList.Items { | ||
if ws.Status.Runtime != nil && | ||
ws.Status.Runtime.NodeName == nodeName && | ||
ws.DeletionTimestamp.IsZero() { | ||
kylos101 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
count++ | ||
} | ||
} | ||
|
||
if err := wc.updateNodeAnnotation(ctx, nodeName, count); err != nil { | ||
return ctrl.Result{}, err | ||
filiptronicek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
log.WithField("node", nodeName).WithField("count", count).Info("updated node annotation") | ||
} | ||
|
||
return ctrl.Result{}, nil | ||
} | ||
|
||
func (wc *WorkspaceCountController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) { | ||
var workspaceList workspacev1.WorkspaceList | ||
if err := wc.List(ctx, &workspaceList); err != nil { | ||
log.WithError(err).Error("failed to list workspaces") | ||
return ctrl.Result{}, err | ||
} | ||
|
||
workspaceCounts := make(map[string]int) | ||
for _, ws := range workspaceList.Items { | ||
if ws.Status.Runtime != nil && | ||
ws.Status.Runtime.NodeName != "" && | ||
ws.DeletionTimestamp.IsZero() { | ||
kylos101 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
workspaceCounts[ws.Status.Runtime.NodeName]++ | ||
} | ||
} | ||
|
||
var nodes corev1.NodeList | ||
if err := wc.List(ctx, &nodes); err != nil { | ||
log.WithError(err).Error("failed to list nodes") | ||
return ctrl.Result{}, err | ||
} | ||
|
||
for _, node := range nodes.Items { | ||
count := workspaceCounts[node.Name] | ||
if err := wc.updateNodeAnnotation(ctx, node.Name, count); err != nil { | ||
log.WithError(err).WithField("node", node.Name).Error("failed to update node") | ||
continue | ||
} | ||
log.WithField("node", node.Name).WithField("count", count).Info("updated node annotation") | ||
} | ||
|
||
return ctrl.Result{}, nil | ||
} | ||
|
||
func (wc *WorkspaceCountController) updateNodeAnnotation(ctx context.Context, nodeName string, count int) error { | ||
return retry.RetryOnConflict(retry.DefaultBackoff, func() error { | ||
var node corev1.Node | ||
err := wc.Get(ctx, types.NamespacedName{Name: nodeName}, &node) | ||
if err != nil { | ||
return fmt.Errorf("obtaining node %s: %w", nodeName, err) | ||
} | ||
|
||
if node.Annotations == nil { | ||
node.Annotations = make(map[string]string) | ||
} | ||
|
||
if count > 0 { | ||
node.Annotations["cluster-autoscaler.kubernetes.io/scale-down-disabled"] = "true" | ||
log.WithField("nodeName", nodeName).Info("disabling scale-down for node") | ||
} else { | ||
delete(node.Annotations, "cluster-autoscaler.kubernetes.io/scale-down-disabled") | ||
log.WithField("nodeName", nodeName).Info("enabling scale-down for node") | ||
} | ||
filiptronicek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return wc.Update(ctx, &node) | ||
kylos101 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}) | ||
} | ||
|
||
func updateLabel(label string, add bool, nodeName string, client client.Client) error { | ||
return retry.RetryOnConflict(retry.DefaultBackoff, func() error { | ||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
|
Uh oh!
There was an error while loading. Please reload this page.