Skip to content

Commit c73dc1d

Browse files
committed
Bount the source of subscribers to an informer
1 parent 1ba1006 commit c73dc1d

File tree

3 files changed

+108
-54
lines changed

3 files changed

+108
-54
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
5+
package controllers
6+
7+
import (
8+
"context"
9+
"os"
10+
11+
"k8s.io/apimachinery/pkg/api/errors"
12+
"k8s.io/apimachinery/pkg/runtime"
13+
"k8s.io/client-go/tools/record"
14+
ctrl "sigs.k8s.io/controller-runtime"
15+
"sigs.k8s.io/controller-runtime/pkg/client"
16+
"sigs.k8s.io/controller-runtime/pkg/controller"
17+
"sigs.k8s.io/controller-runtime/pkg/handler"
18+
"sigs.k8s.io/controller-runtime/pkg/log"
19+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
20+
"sigs.k8s.io/controller-runtime/pkg/source"
21+
22+
config "github.com/gitpod-io/gitpod/ws-manager/api/config"
23+
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
24+
)
25+
26+
func NewSubscriberReconciler(c client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, cfg *config.Configuration) (*SubscriberReconciler, error) {
27+
reconciler := &SubscriberReconciler{
28+
Client: c,
29+
Scheme: scheme,
30+
Config: cfg,
31+
Recorder: recorder,
32+
}
33+
34+
return reconciler, nil
35+
}
36+
37+
type SubscriberReconciler struct {
38+
client.Client
39+
Scheme *runtime.Scheme
40+
41+
Config *config.Configuration
42+
Recorder record.EventRecorder
43+
OnReconcile func(ctx context.Context, ws *workspacev1.Workspace)
44+
}
45+
46+
// Reconcile is part of the main kubernetes reconciliation loop which aims to
47+
// move the current state of the cluster closer to the desired state.
48+
// Modify the Reconcile function to compare the state specified by
49+
// the Workspace object against the actual cluster state, and then
50+
// perform operations to make the cluster state reflect the state specified by
51+
// the user.
52+
//
53+
// For more details, check Reconcile and its Result here:
54+
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
55+
func (r *SubscriberReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
56+
log := log.FromContext(ctx)
57+
58+
var workspace workspacev1.Workspace
59+
if err := r.Get(ctx, req.NamespacedName, &workspace); err != nil {
60+
if !errors.IsNotFound(err) {
61+
log.Error(err, "unable to fetch workspace")
62+
}
63+
64+
return ctrl.Result{}, client.IgnoreNotFound(err)
65+
}
66+
67+
if r.OnReconcile != nil {
68+
// Publish to subscribers in a goroutine, to prevent blocking the main reconcile loop.
69+
ws := workspace.DeepCopy()
70+
go func() {
71+
r.OnReconcile(ctx, ws)
72+
}()
73+
}
74+
75+
return reconcile.Result{}, nil
76+
}
77+
78+
// SetupWithManager sets up the controller with the Manager.
79+
func (r *SubscriberReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
80+
c, err := controller.NewUnmanaged("maintenance-controller", mgr, controller.Options{Reconciler: r})
81+
if err != nil {
82+
return err
83+
}
84+
85+
go func() {
86+
err = c.Start(ctx)
87+
if err != nil {
88+
log.FromContext(ctx).Error(err, "cannot start maintenance reconciler")
89+
os.Exit(1)
90+
}
91+
}()
92+
93+
return c.Watch(source.Kind(mgr.GetCache(), &workspacev1.Workspace{}), &handler.EnqueueRequestForObject{})
94+
}

components/ws-manager-mk2/controllers/workspace_controller.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ type WorkspaceReconciler struct {
7171
metrics *controllerMetrics
7272
maintenance maintenance.Maintenance
7373
Recorder record.EventRecorder
74-
OnReconcile func(ctx context.Context, ws *workspacev1.Workspace)
7574
}
7675

7776
//+kubebuilder:rbac:groups=workspace.gitpod.io,resources=workspaces,verbs=get;list;watch;create;update;patch;delete
@@ -107,14 +106,6 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
107106
workspace.Status.Conditions = []metav1.Condition{}
108107
}
109108

110-
if r.OnReconcile != nil {
111-
// Publish to subscribers in a goroutine, to prevent blocking the main reconcile loop.
112-
ws := workspace.DeepCopy()
113-
go func() {
114-
r.OnReconcile(ctx, ws)
115-
}()
116-
}
117-
118109
log.Info("reconciling workspace", "workspace", req.NamespacedName, "phase", workspace.Status.Phase)
119110

120111
var workspacePods corev1.PodList

components/ws-manager-mk2/main.go

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,11 @@ package main
66

77
import (
88
"bytes"
9-
"context"
109
"encoding/json"
1110
"flag"
1211
"fmt"
1312
"net"
1413
"os"
15-
"sync/atomic"
16-
"time"
1714

1815
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
1916
// to ensure that exec-entrypoint and run can make use of them.
@@ -61,8 +58,6 @@ var (
6158

6259
scheme = runtime.NewScheme()
6360
setupLog = ctrl.Log.WithName("setup")
64-
65-
LeaderInstance atomic.Bool
6661
)
6762

6863
func init() {
@@ -142,11 +137,6 @@ func main() {
142137

143138
mgrCtx := ctrl.SetupSignalHandler()
144139

145-
go func() {
146-
<-mgr.Elected()
147-
LeaderInstance.Store(true)
148-
}()
149-
150140
maintenanceReconciler, err := controllers.NewMaintenanceReconciler(mgr.GetClient())
151141
if err != nil {
152142
setupLog.Error(err, "unable to create maintenance controller", "controller", "Maintenance")
@@ -160,19 +150,6 @@ func main() {
160150
os.Exit(1)
161151
}
162152

163-
go func() {
164-
for {
165-
select {
166-
case <-mgrCtx.Done():
167-
return
168-
case <-mgr.Elected():
169-
now := time.Now()
170-
setupLog.Info("updating activity started time", "now", now)
171-
return
172-
}
173-
}
174-
}()
175-
176153
timeoutReconciler, err := controllers.NewTimeoutReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("workspace"), cfg.Manager, maintenanceReconciler)
177154
if err != nil {
178155
setupLog.Error(err, "unable to create timeout controller", "controller", "Timeout")
@@ -185,7 +162,18 @@ func main() {
185162
os.Exit(1)
186163
}
187164

188-
workspaceReconciler.OnReconcile = wsmanService.OnWorkspaceReconcile
165+
subscriberReconciler, err := controllers.NewSubscriberReconciler(mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("subscribers"), &cfg.Manager)
166+
if err != nil {
167+
setupLog.Error(err, "unable to create timeout controller", "controller", "Timeout")
168+
os.Exit(1)
169+
}
170+
171+
subscriberReconciler.OnReconcile = wsmanService.OnWorkspaceReconcile
172+
173+
if err = subscriberReconciler.SetupWithManager(mgrCtx, mgr); err != nil {
174+
setupLog.Error(err, "unable to setup workspace controller with manager", "controller", "Subscribers")
175+
os.Exit(1)
176+
}
189177

190178
if err = workspaceReconciler.SetupWithManager(mgr); err != nil {
191179
setupLog.Error(err, "unable to setup workspace controller with manager", "controller", "Workspace")
@@ -238,27 +226,8 @@ func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client, maint
238226
metrics.Registry.MustRegister(grpcMetrics)
239227

240228
grpcOpts := common_grpc.ServerOptionsWithInterceptors(
241-
[]grpc.StreamServerInterceptor{
242-
grpcMetrics.StreamServerInterceptor(),
243-
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
244-
if LeaderInstance.Load() {
245-
return handler(srv, ss)
246-
}
247-
248-
return fmt.Errorf("Rejecting connection due leader election")
249-
},
250-
},
251-
[]grpc.UnaryServerInterceptor{
252-
grpcMetrics.UnaryServerInterceptor(),
253-
ratelimits.UnaryInterceptor(),
254-
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
255-
if LeaderInstance.Load() {
256-
return handler(ctx, req)
257-
}
258-
259-
return nil, fmt.Errorf("Rejecting connection due leader election")
260-
},
261-
},
229+
[]grpc.StreamServerInterceptor{grpcMetrics.StreamServerInterceptor()},
230+
[]grpc.UnaryServerInterceptor{grpcMetrics.UnaryServerInterceptor(), ratelimits.UnaryInterceptor()},
262231
)
263232
if cfg.RPCServer.TLS.CA != "" && cfg.RPCServer.TLS.Certificate != "" && cfg.RPCServer.TLS.PrivateKey != "" {
264233
tlsConfig, err := common_grpc.ClientAuthTLSConfig(

0 commit comments

Comments
 (0)