Skip to content

Replace watch with builder.ControllerManagedBy #17028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 41 additions & 73 deletions components/node-labeler/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ import (
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/util/retry"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/gitpod-io/gitpod/common-go/log"
)
Expand All @@ -46,6 +46,8 @@ const (
wsDaemon = "ws-daemon"
)

var defaultRequeueTime = time.Second * 10

// serveCmd represents the serve command
var runCmd = &cobra.Command{
Use: "run",
Expand All @@ -60,6 +62,10 @@ var runCmd = &cobra.Command{
LeaderElection: true,
LeaderElectionID: "node-labeler.gitpod.io",
Namespace: namespace,
// default sync period is 10h.
// in case node-labeler is restarted and not change happens, we could waste (at least) 20m in a node
// that never will run workspaces and the additional nodes cluster-autoscaler adds to compensate
SyncPeriod: pointer.Duration(2 * time.Minute),
})
if err != nil {
log.WithError(err).Fatal("unable to start node-labeber")
Expand All @@ -74,35 +80,29 @@ var runCmd = &cobra.Command{
client,
}

c, err := controller.New("pod-watcher", mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: 20,
componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{{
Key: "component",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"ws-daemon", "registry-facade"},
}},
})
if err != nil {
log.WithError(err).Fatal("unable to create predicate")
}

err = ctrl.NewControllerManagedBy(mgr).
Named("pod-watcher").
For(&corev1.Pod{}, builder.WithPredicates(predicate.Or(componentPredicate))).
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we set RequeueAfter with MaxConcurrentReconciles > 1, the retry does not respect the wait time.
Now that we don't sleep in the reconcile loop, setting MaxConcurrentReconciles is not a problem

Complete(r)
if err != nil {
log.WithError(err).Fatal("unable to bind controller watch event handler")
}

metrics.Registry.MustRegister(NodeLabelerCounterVec)
metrics.Registry.MustRegister(NodeLabelerTimeHistVec)

err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(ce event.CreateEvent) bool {
return processPodEvent(ce.Object)
},
UpdateFunc: func(ue event.UpdateEvent) bool {
return processPodEvent(ue.ObjectNew)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return processPodEvent(deleteEvent.Object)
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
})
if err != nil {
log.WithError(err).Fatal("unable to create controller")
}

err = mgr.AddHealthzCheck("healthz", healthz.Ping)
if err != nil {
log.WithError(err).Fatal("unable to set up health check")
Expand Down Expand Up @@ -132,14 +132,6 @@ var (
scheme = runtime.NewScheme()
)

func processPodEvent(pod client.Object) bool {
if strings.HasPrefix(pod.GetName(), registryFacade) || strings.HasPrefix(pod.GetName(), wsDaemon) {
return true
}

return false
}

type PodReconciler struct {
client.Client
}
Expand All @@ -157,16 +149,14 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r

nodeName := pod.Spec.NodeName
if nodeName == "" {
return reconcile.Result{RequeueAfter: time.Second * 10}, err
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
}

var (
ipAddress string
port string
component string
labelToUpdate string

waitTimeout time.Duration = 5 * time.Second
)

switch {
Expand All @@ -181,7 +171,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
ipAddress = pod.Status.PodIP
port = strconv.Itoa(wsdaemonPort)
default:
log.WithField("pod", pod.Name).Info("Invalid pod. Skipping...")
// nothing to do
return reconcile.Result{}, nil
}

Expand All @@ -198,7 +188,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
}

log.WithError(err).Error("removing node label")
return reconcile.Result{RequeueAfter: time.Second * 10}, err
return reconcile.Result{RequeueAfter: defaultRequeueTime}, err
}

return reconcile.Result{}, err
Expand All @@ -215,28 +205,28 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
return reconcile.Result{}, fmt.Errorf("obtaining node %s: %w", nodeName, err)
}

if node.Labels[labelToUpdate] == "true" {
// Label already exists.
if labelValue, exists := node.Labels[labelToUpdate]; exists && labelValue == "true" {
// nothing to do, the label already exists.
return reconcile.Result{}, nil
}

err = waitForTCPPortToBeReachable(ipAddress, port, 30*time.Second)
err = checkTCPPortIsReachable(ipAddress, port)
if err != nil {
return reconcile.Result{}, fmt.Errorf("waiting for TCP port: %v", err)
log.WithField("host", ipAddress).WithField("port", port).WithField("pod", pod.Name).WithError(err).Error("checking if TCP port is open")
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
}

if component == registryFacade {
err = checkRegistryFacade(ipAddress, port)
if err != nil {
log.WithError(err).Error("checking registry-facade")
return reconcile.Result{RequeueAfter: time.Second * 10}, nil
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
}
}

time.Sleep(waitTimeout)

err = updateLabel(labelToUpdate, true, nodeName, r)
if err != nil {
log.WithError(err).Error("updating node label")
return reconcile.Result{}, fmt.Errorf("trying to add the label: %v", err)
}

Expand All @@ -258,11 +248,6 @@ func updateLabel(label string, add bool, nodeName string, client client.Client)
return err
}

_, hasLabel := node.Labels[label]
Copy link
Member Author

@aledbf aledbf Mar 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not checking the value of the label and the check was done before calling this function

if add == hasLabel {
return nil
}

if add {
node.Labels[label] = "true"
log.WithField("label", label).WithField("node", nodeName).Info("adding label to node")
Expand All @@ -280,31 +265,14 @@ func updateLabel(label string, add bool, nodeName string, client client.Client)
})
}

func waitForTCPPortToBeReachable(host string, port string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return fmt.Errorf("port %v on host %v never reachable", port, host)
case <-ticker.C:
conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), 500*time.Millisecond)
if err != nil {
continue
}

if conn != nil {
conn.Close()
return nil
}

continue
}
func checkTCPPortIsReachable(host string, port string) error {
conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), 1*time.Second)
if err != nil {
return err
}
defer conn.Close()

return nil
}

func checkRegistryFacade(host, port string) error {
Expand Down