@@ -18,22 +18,22 @@ import (
18
18
"github.com/spf13/cobra"
19
19
corev1 "k8s.io/api/core/v1"
20
20
"k8s.io/apimachinery/pkg/api/errors"
21
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21
22
"k8s.io/apimachinery/pkg/runtime"
22
23
"k8s.io/apimachinery/pkg/types"
23
24
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
24
25
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
25
26
_ "k8s.io/client-go/plugin/pkg/client/auth"
26
27
"k8s.io/client-go/util/retry"
28
+ "k8s.io/utils/pointer"
27
29
ctrl "sigs.k8s.io/controller-runtime"
30
+ "sigs.k8s.io/controller-runtime/pkg/builder"
28
31
"sigs.k8s.io/controller-runtime/pkg/client"
29
32
"sigs.k8s.io/controller-runtime/pkg/controller"
30
- "sigs.k8s.io/controller-runtime/pkg/event"
31
- "sigs.k8s.io/controller-runtime/pkg/handler"
32
33
"sigs.k8s.io/controller-runtime/pkg/healthz"
33
34
"sigs.k8s.io/controller-runtime/pkg/metrics"
34
35
"sigs.k8s.io/controller-runtime/pkg/predicate"
35
36
"sigs.k8s.io/controller-runtime/pkg/reconcile"
36
- "sigs.k8s.io/controller-runtime/pkg/source"
37
37
38
38
"github.com/gitpod-io/gitpod/common-go/log"
39
39
)
@@ -46,6 +46,8 @@ const (
46
46
wsDaemon = "ws-daemon"
47
47
)
48
48
49
// defaultRequeueTime is the delay the reconciler passes as RequeueAfter when it
// asks for a pod to be reconciled again later (10 seconds).
+ var defaultRequeueTime = time .Second * 10
50
+
49
51
// runCmd represents the run command
50
52
var runCmd = & cobra.Command {
51
53
Use : "run" ,
@@ -60,6 +62,10 @@ var runCmd = &cobra.Command{
60
62
LeaderElection : true ,
61
63
LeaderElectionID : "node-labeler.gitpod.io" ,
62
64
Namespace : namespace ,
65
+ // default sync period is 10h.
66
+ // in case node-labeler is restarted and no change happens, we could waste (at least) 20m in a node
67
+ // that will never run workspaces, plus the additional nodes cluster-autoscaler adds to compensate
68
+ SyncPeriod : pointer .Duration (2 * time .Minute ),
63
69
})
64
70
if err != nil {
65
71
log .WithError (err ).Fatal ("unable to start node-labeber" )
@@ -74,35 +80,38 @@ var runCmd = &cobra.Command{
74
80
client ,
75
81
}
76
82
77
- c , err := controller .New ("pod-watcher" , mgr , controller.Options {
78
- Reconciler : r ,
79
- MaxConcurrentReconciles : 20 ,
83
+ rfPredicate , err := predicate .LabelSelectorPredicate (metav1.LabelSelector {
84
+ MatchLabels : map [string ]string {
85
+ "app" : "gitpod" ,
86
+ "component" : "registry-facade" ,
87
+ },
80
88
})
81
89
if err != nil {
82
- log .WithError (err ).Fatal ("unable to bind controller watch event handler " )
90
+ log .WithError (err ).Fatal ("unable to create predicate " )
83
91
}
84
92
85
- metrics .Registry .MustRegister (NodeLabelerCounterVec )
86
- metrics .Registry .MustRegister (NodeLabelerTimeHistVec )
87
-
88
- err = c .Watch (& source.Kind {Type : & corev1.Pod {}}, & handler.EnqueueRequestForObject {}, predicate.Funcs {
89
- CreateFunc : func (ce event.CreateEvent ) bool {
90
- return processPodEvent (ce .Object )
91
- },
92
- UpdateFunc : func (ue event.UpdateEvent ) bool {
93
- return processPodEvent (ue .ObjectNew )
94
- },
95
- DeleteFunc : func (deleteEvent event.DeleteEvent ) bool {
96
- return processPodEvent (deleteEvent .Object )
97
- },
98
- GenericFunc : func (genericEvent event.GenericEvent ) bool {
99
- return false
93
+ wsPredicate , err := predicate .LabelSelectorPredicate (metav1.LabelSelector {
94
+ MatchLabels : map [string ]string {
95
+ "app" : "gitpod" ,
96
+ "component" : "ws-daemon" ,
100
97
},
101
98
})
102
99
if err != nil {
103
- log .WithError (err ).Fatal ("unable to create controller" )
100
+ log .WithError (err ).Fatal ("unable to create predicate" )
101
+ }
102
+
103
+ err = ctrl .NewControllerManagedBy (mgr ).
104
+ Named ("pod-watcher" ).
105
+ For (& corev1.Pod {}, builder .WithPredicates (predicate .Or (rfPredicate , wsPredicate ))).
106
+ WithOptions (controller.Options {MaxConcurrentReconciles : 1 }).
107
+ Complete (r )
108
+ if err != nil {
109
+ log .WithError (err ).Fatal ("unable to bind controller watch event handler" )
104
110
}
105
111
112
+ metrics .Registry .MustRegister (NodeLabelerCounterVec )
113
+ metrics .Registry .MustRegister (NodeLabelerTimeHistVec )
114
+
106
115
err = mgr .AddHealthzCheck ("healthz" , healthz .Ping )
107
116
if err != nil {
108
117
log .WithError (err ).Fatal ("unable to set up health check" )
@@ -132,19 +141,13 @@ var (
132
141
scheme = runtime .NewScheme ()
133
142
)
134
143
135
- func processPodEvent (pod client.Object ) bool {
136
- if strings .HasPrefix (pod .GetName (), registryFacade ) || strings .HasPrefix (pod .GetName (), wsDaemon ) {
137
- return true
138
- }
139
-
140
- return false
141
- }
142
-
143
144
// PodReconciler reconciles Pod objects through its Reconcile method. It embeds
// client.Client, so client methods such as Get can be called directly on the
// reconciler.
type PodReconciler struct {
144
145
client.Client
145
146
}
146
147
147
148
func (r * PodReconciler ) Reconcile (ctx context.Context , req reconcile.Request ) (reconcile.Result , error ) {
149
+ log .WithField ("pod" , req .Name ).Info ("reconciling" )
150
+
148
151
var pod corev1.Pod
149
152
err := r .Get (ctx , req .NamespacedName , & pod )
150
153
if err != nil {
@@ -157,16 +160,14 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
157
160
158
161
nodeName := pod .Spec .NodeName
159
162
if nodeName == "" {
160
- return reconcile.Result {RequeueAfter : time . Second * 10 }, err
163
+ return reconcile.Result {RequeueAfter : defaultRequeueTime }, nil
161
164
}
162
165
163
166
var (
164
167
ipAddress string
165
168
port string
166
169
component string
167
170
labelToUpdate string
168
-
169
- waitTimeout time.Duration = 5 * time .Second
170
171
)
171
172
172
173
switch {
@@ -181,7 +182,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
181
182
ipAddress = pod .Status .PodIP
182
183
port = strconv .Itoa (wsdaemonPort )
183
184
default :
184
- log . WithField ( "pod" , pod . Name ). Info ( "Invalid pod. Skipping..." )
185
+ // nothing to do
185
186
return reconcile.Result {}, nil
186
187
}
187
188
@@ -198,14 +199,13 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
198
199
}
199
200
200
201
log .WithError (err ).Error ("removing node label" )
201
- return reconcile.Result {RequeueAfter : time . Second * 10 }, err
202
+ return reconcile.Result {RequeueAfter : defaultRequeueTime }, err
202
203
}
203
204
204
205
return reconcile.Result {}, err
205
206
}
206
207
207
208
if ! IsPodReady (& pod ) {
208
- // not ready. Wait until the next update.
209
209
return reconcile.Result {}, nil
210
210
}
211
211
@@ -215,28 +215,28 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
215
215
return reconcile.Result {}, fmt .Errorf ("obtaining node %s: %w" , nodeName , err )
216
216
}
217
217
218
- if node .Labels [labelToUpdate ] == "true" {
219
- // Label already exists.
218
+ if labelValue , exists := node .Labels [labelToUpdate ]; exists && labelValue == "true" {
219
+ // nothing to do, the label already exists.
220
220
return reconcile.Result {}, nil
221
221
}
222
222
223
- err = waitForTCPPortToBeReachable (ipAddress , port , 30 * time . Second )
223
+ err = checkTCPPortIsReachable (ipAddress , port )
224
224
if err != nil {
225
- return reconcile.Result {}, fmt .Errorf ("waiting for TCP port: %v" , err )
225
+ log .WithField ("host" , ipAddress ).WithField ("port" , port ).WithField ("pod" , pod .Name ).WithError (err ).Error ("checking if TCP port is open" )
226
+ return reconcile.Result {RequeueAfter : defaultRequeueTime }, nil
226
227
}
227
228
228
229
if component == registryFacade {
229
230
err = checkRegistryFacade (ipAddress , port )
230
231
if err != nil {
231
232
log .WithError (err ).Error ("checking registry-facade" )
232
- return reconcile.Result {RequeueAfter : time . Second * 10 }, nil
233
+ return reconcile.Result {RequeueAfter : defaultRequeueTime }, nil
233
234
}
234
235
}
235
236
236
- time .Sleep (waitTimeout )
237
-
238
237
err = updateLabel (labelToUpdate , true , nodeName , r )
239
238
if err != nil {
239
+ log .WithError (err ).Error ("updating node label" )
240
240
return reconcile.Result {}, fmt .Errorf ("trying to add the label: %v" , err )
241
241
}
242
242
@@ -258,11 +258,6 @@ func updateLabel(label string, add bool, nodeName string, client client.Client)
258
258
return err
259
259
}
260
260
261
- _ , hasLabel := node .Labels [label ]
262
- if add == hasLabel {
263
- return nil
264
- }
265
-
266
261
if add {
267
262
node .Labels [label ] = "true"
268
263
log .WithField ("label" , label ).WithField ("node" , nodeName ).Info ("adding label to node" )
@@ -280,31 +275,14 @@ func updateLabel(label string, add bool, nodeName string, client client.Client)
280
275
})
281
276
}
282
277
283
- func waitForTCPPortToBeReachable (host string , port string , timeout time.Duration ) error {
284
- ctx , cancel := context .WithTimeout (context .Background (), timeout )
285
- defer cancel ()
286
-
287
- ticker := time .NewTicker (1 * time .Second )
288
- defer ticker .Stop ()
289
-
290
- for {
291
- select {
292
- case <- ctx .Done ():
293
- return fmt .Errorf ("port %v on host %v never reachable" , port , host )
294
- case <- ticker .C :
295
- conn , err := net .DialTimeout ("tcp" , net .JoinHostPort (host , port ), 500 * time .Millisecond )
296
- if err != nil {
297
- continue
298
- }
299
-
300
- if conn != nil {
301
- conn .Close ()
302
- return nil
303
- }
304
-
305
- continue
306
- }
278
// checkTCPPortIsReachable makes a single attempt to open a TCP connection to
// host:port, giving up after one second. It returns the dial error when the
// connection cannot be established; otherwise it closes the connection and
// returns nil. It does not retry.
+ func checkTCPPortIsReachable (host string , port string ) error {
279
+ conn , err := net .DialTimeout ("tcp" , net .JoinHostPort (host , port ), 1 * time .Second )
280
+ if err != nil {
281
+ return err
307
282
}
283
+ defer conn .Close ()
284
+
285
+ return nil
308
286
}
309
287
310
288
func checkRegistryFacade (host , port string ) error {
0 commit comments