Skip to content

Commit 3371869

Browse files
committed
Reject GRPC connections on stand-by replicas
1 parent 5c7fa0b commit 3371869

File tree

1 file changed

+36
-13
lines changed

1 file changed

+36
-13
lines changed

components/ws-manager-mk2/main.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ package main
66

77
import (
88
"bytes"
9+
"context"
910
"encoding/json"
1011
"flag"
1112
"fmt"
1213
"net"
1314
"os"
15+
"sync/atomic"
1416
"time"
1517

1618
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
@@ -59,6 +61,8 @@ var (
5961

6062
scheme = runtime.NewScheme()
6163
setupLog = ctrl.Log.WithName("setup")
64+
65+
LeaderInstance atomic.Bool
6266
)
6367

6468
func init() {
@@ -138,6 +142,11 @@ func main() {
138142

139143
mgrCtx := ctrl.SetupSignalHandler()
140144

145+
go func() {
146+
<-mgr.Elected()
147+
LeaderInstance.Store(true)
148+
}()
149+
141150
maintenanceReconciler, err := controllers.NewMaintenanceReconciler(mgr.GetClient())
142151
if err != nil {
143152
setupLog.Error(err, "unable to create maintenance controller", "controller", "Maintenance")
@@ -170,18 +179,13 @@ func main() {
170179
os.Exit(1)
171180
}
172181

173-
// Wait for leader election to start the GRPC server
174-
go func() {
175-
<-mgr.Elected()
176-
177-
wsmanService, err := setupGRPCService(cfg, mgr.GetClient(), maintenanceReconciler)
178-
if err != nil {
179-
setupLog.Error(err, "unable to start manager service")
180-
os.Exit(1)
181-
}
182+
wsmanService, err := setupGRPCService(cfg, mgr.GetClient(), maintenanceReconciler)
183+
if err != nil {
184+
setupLog.Error(err, "unable to start manager service")
185+
os.Exit(1)
186+
}
182187

183-
workspaceReconciler.OnReconcile = wsmanService.OnWorkspaceReconcile
184-
}()
188+
workspaceReconciler.OnReconcile = wsmanService.OnWorkspaceReconcile
185189

186190
if err = workspaceReconciler.SetupWithManager(mgr); err != nil {
187191
setupLog.Error(err, "unable to setup workspace controller with manager", "controller", "Workspace")
@@ -234,8 +238,27 @@ func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client, maint
234238
metrics.Registry.MustRegister(grpcMetrics)
235239

236240
grpcOpts := common_grpc.ServerOptionsWithInterceptors(
237-
[]grpc.StreamServerInterceptor{grpcMetrics.StreamServerInterceptor()},
238-
[]grpc.UnaryServerInterceptor{grpcMetrics.UnaryServerInterceptor(), ratelimits.UnaryInterceptor()},
241+
[]grpc.StreamServerInterceptor{
242+
grpcMetrics.StreamServerInterceptor(),
243+
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
244+
if LeaderInstance.Load() {
245+
return handler(srv, ss)
246+
}
247+
248+
return fmt.Errorf("Rejecting connection due leader election")
249+
},
250+
},
251+
[]grpc.UnaryServerInterceptor{
252+
grpcMetrics.UnaryServerInterceptor(),
253+
ratelimits.UnaryInterceptor(),
254+
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
255+
if LeaderInstance.Load() {
256+
return handler(ctx, req)
257+
}
258+
259+
return nil, fmt.Errorf("Rejecting connection due leader election")
260+
},
261+
},
239262
)
240263
if cfg.RPCServer.TLS.CA != "" && cfg.RPCServer.TLS.Certificate != "" && cfg.RPCServer.TLS.PrivateKey != "" {
241264
tlsConfig, err := common_grpc.ClientAuthTLSConfig(

0 commit comments

Comments
 (0)