Skip to content

Commit 1ba1006

Browse files
committed
Start the grpc server after leader election
1 parent 7f968fa commit 1ba1006

File tree

1 file changed

+31
-2
lines changed

1 file changed

+31
-2
lines changed

components/ws-manager-mk2/main.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ package main
66

77
import (
88
"bytes"
9+
"context"
910
"encoding/json"
1011
"flag"
1112
"fmt"
1213
"net"
1314
"os"
15+
"sync/atomic"
1416
"time"
1517

1618
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
@@ -59,6 +61,8 @@ var (
5961

6062
scheme = runtime.NewScheme()
6163
setupLog = ctrl.Log.WithName("setup")
64+
65+
LeaderInstance atomic.Bool
6266
)
6367

6468
func init() {
@@ -138,6 +142,11 @@ func main() {
138142

139143
mgrCtx := ctrl.SetupSignalHandler()
140144

145+
go func() {
146+
<-mgr.Elected()
147+
LeaderInstance.Store(true)
148+
}()
149+
141150
maintenanceReconciler, err := controllers.NewMaintenanceReconciler(mgr.GetClient())
142151
if err != nil {
143152
setupLog.Error(err, "unable to create maintenance controller", "controller", "Maintenance")
@@ -177,6 +186,7 @@ func main() {
177186
}
178187

179188
workspaceReconciler.OnReconcile = wsmanService.OnWorkspaceReconcile
189+
180190
if err = workspaceReconciler.SetupWithManager(mgr); err != nil {
181191
setupLog.Error(err, "unable to setup workspace controller with manager", "controller", "Workspace")
182192
os.Exit(1)
@@ -228,8 +238,27 @@ func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client, maint
228238
metrics.Registry.MustRegister(grpcMetrics)
229239

230240
grpcOpts := common_grpc.ServerOptionsWithInterceptors(
231-
[]grpc.StreamServerInterceptor{grpcMetrics.StreamServerInterceptor()},
232-
[]grpc.UnaryServerInterceptor{grpcMetrics.UnaryServerInterceptor(), ratelimits.UnaryInterceptor()},
241+
[]grpc.StreamServerInterceptor{
242+
grpcMetrics.StreamServerInterceptor(),
243+
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
244+
if LeaderInstance.Load() {
245+
return handler(srv, ss)
246+
}
247+
248+
return fmt.Errorf("Rejecting connection due leader election")
249+
},
250+
},
251+
[]grpc.UnaryServerInterceptor{
252+
grpcMetrics.UnaryServerInterceptor(),
253+
ratelimits.UnaryInterceptor(),
254+
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
255+
if LeaderInstance.Load() {
256+
return handler(ctx, req)
257+
}
258+
259+
return nil, fmt.Errorf("Rejecting connection due leader election")
260+
},
261+
},
233262
)
234263
if cfg.RPCServer.TLS.CA != "" && cfg.RPCServer.TLS.Certificate != "" && cfg.RPCServer.TLS.PrivateKey != "" {
235264
tlsConfig, err := common_grpc.ClientAuthTLSConfig(

0 commit comments

Comments
 (0)