@@ -6,14 +6,11 @@ package main
6
6
7
7
import (
8
8
"bytes"
9
- "context"
10
9
"encoding/json"
11
10
"flag"
12
11
"fmt"
13
12
"net"
14
13
"os"
15
- "sync/atomic"
16
- "time"
17
14
18
15
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
19
16
// to ensure that exec-entrypoint and run can make use of them.
61
58
62
59
scheme = runtime .NewScheme ()
63
60
setupLog = ctrl .Log .WithName ("setup" )
64
-
65
- LeaderInstance atomic.Bool
66
61
)
67
62
68
63
func init () {
@@ -142,11 +137,6 @@ func main() {
142
137
143
138
mgrCtx := ctrl .SetupSignalHandler ()
144
139
145
- go func () {
146
- <- mgr .Elected ()
147
- LeaderInstance .Store (true )
148
- }()
149
-
150
140
maintenanceReconciler , err := controllers .NewMaintenanceReconciler (mgr .GetClient ())
151
141
if err != nil {
152
142
setupLog .Error (err , "unable to create maintenance controller" , "controller" , "Maintenance" )
@@ -160,19 +150,6 @@ func main() {
160
150
os .Exit (1 )
161
151
}
162
152
163
- go func () {
164
- for {
165
- select {
166
- case <- mgrCtx .Done ():
167
- return
168
- case <- mgr .Elected ():
169
- now := time .Now ()
170
- setupLog .Info ("updating activity started time" , "now" , now )
171
- return
172
- }
173
- }
174
- }()
175
-
176
153
timeoutReconciler , err := controllers .NewTimeoutReconciler (mgr .GetClient (), mgr .GetEventRecorderFor ("workspace" ), cfg .Manager , maintenanceReconciler )
177
154
if err != nil {
178
155
setupLog .Error (err , "unable to create timeout controller" , "controller" , "Timeout" )
@@ -185,7 +162,18 @@ func main() {
185
162
os .Exit (1 )
186
163
}
187
164
188
- workspaceReconciler .OnReconcile = wsmanService .OnWorkspaceReconcile
165
+ subscriberReconciler , err := controllers .NewSubscriberReconciler (mgr .GetClient (), mgr .GetScheme (), mgr .GetEventRecorderFor ("subscribers" ), & cfg .Manager )
166
+ if err != nil {
167
+ setupLog .Error (err , "unable to create timeout controller" , "controller" , "Timeout" )
168
+ os .Exit (1 )
169
+ }
170
+
171
+ subscriberReconciler .OnReconcile = wsmanService .OnWorkspaceReconcile
172
+
173
+ if err = subscriberReconciler .SetupWithManager (mgrCtx , mgr ); err != nil {
174
+ setupLog .Error (err , "unable to setup workspace controller with manager" , "controller" , "Subscribers" )
175
+ os .Exit (1 )
176
+ }
189
177
190
178
if err = workspaceReconciler .SetupWithManager (mgr ); err != nil {
191
179
setupLog .Error (err , "unable to setup workspace controller with manager" , "controller" , "Workspace" )
@@ -238,27 +226,8 @@ func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client, maint
238
226
metrics .Registry .MustRegister (grpcMetrics )
239
227
240
228
grpcOpts := common_grpc .ServerOptionsWithInterceptors (
241
- []grpc.StreamServerInterceptor {
242
- grpcMetrics .StreamServerInterceptor (),
243
- func (srv interface {}, ss grpc.ServerStream , info * grpc.StreamServerInfo , handler grpc.StreamHandler ) error {
244
- if LeaderInstance .Load () {
245
- return handler (srv , ss )
246
- }
247
-
248
- return fmt .Errorf ("Rejecting connection due leader election" )
249
- },
250
- },
251
- []grpc.UnaryServerInterceptor {
252
- grpcMetrics .UnaryServerInterceptor (),
253
- ratelimits .UnaryInterceptor (),
254
- func (ctx context.Context , req interface {}, info * grpc.UnaryServerInfo , handler grpc.UnaryHandler ) (resp interface {}, err error ) {
255
- if LeaderInstance .Load () {
256
- return handler (ctx , req )
257
- }
258
-
259
- return nil , fmt .Errorf ("Rejecting connection due leader election" )
260
- },
261
- },
229
+ []grpc.StreamServerInterceptor {grpcMetrics .StreamServerInterceptor ()},
230
+ []grpc.UnaryServerInterceptor {grpcMetrics .UnaryServerInterceptor (), ratelimits .UnaryInterceptor ()},
262
231
)
263
232
if cfg .RPCServer .TLS .CA != "" && cfg .RPCServer .TLS .Certificate != "" && cfg .RPCServer .TLS .PrivateKey != "" {
264
233
tlsConfig , err := common_grpc .ClientAuthTLSConfig (
0 commit comments