Skip to content

Commit 1631a4a

Browse files
authored
[ws-manager-mk2] Loadgen fixes, concurrent reconciliation (#16613)
* [ws-manager-mk2] Loadgen fixes, concurrent reconciliation * [ws-manager-mk2] Update tests with content readiness * Simplify content ready check * Fix json tags * Revert poll interval
1 parent b411e14 commit 1631a4a

File tree

13 files changed

+87
-31
lines changed

13 files changed

+87
-31
lines changed

components/ws-daemon/pkg/controller/snapshot_controller.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"k8s.io/client-go/util/retry"
1212
ctrl "sigs.k8s.io/controller-runtime"
1313
"sigs.k8s.io/controller-runtime/pkg/client"
14+
"sigs.k8s.io/controller-runtime/pkg/controller"
1415
"sigs.k8s.io/controller-runtime/pkg/event"
1516
"sigs.k8s.io/controller-runtime/pkg/log"
1617
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -21,22 +22,27 @@ import (
2122
// SnapshotReconciler reconciles a Snapshot object
2223
type SnapshotReconciler struct {
2324
client.Client
24-
nodeName string
25-
operations *WorkspaceOperations
25+
maxConcurrentReconciles int
26+
nodeName string
27+
operations *WorkspaceOperations
2628
}
2729

28-
func NewSnapshotController(c client.Client, nodeName string, wso *WorkspaceOperations) *SnapshotReconciler {
30+
func NewSnapshotController(c client.Client, nodeName string, maxConcurrentReconciles int, wso *WorkspaceOperations) *SnapshotReconciler {
2931
return &SnapshotReconciler{
30-
Client: c,
31-
nodeName: nodeName,
32-
operations: wso,
32+
Client: c,
33+
maxConcurrentReconciles: maxConcurrentReconciles,
34+
nodeName: nodeName,
35+
operations: wso,
3336
}
3437
}
3538

3639
// SetupWithManager sets up the controller with the Manager.
3740
func (r *SnapshotReconciler) SetupWithManager(mgr ctrl.Manager) error {
3841
return ctrl.NewControllerManagedBy(mgr).
3942
Named("snapshot").
43+
WithOptions(controller.Options{
44+
MaxConcurrentReconciles: r.maxConcurrentReconciles,
45+
}).
4046
For(&workspacev1.Snapshot{}).
4147
WithEventFilter(snapshotEventFilter(r.nodeName)).
4248
Complete(r)

components/ws-daemon/pkg/controller/workspace_controller.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"k8s.io/client-go/util/retry"
2727
ctrl "sigs.k8s.io/controller-runtime"
2828
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/controller"
2930
"sigs.k8s.io/controller-runtime/pkg/event"
3031
"sigs.k8s.io/controller-runtime/pkg/log"
3132
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -49,27 +50,32 @@ type WorkspaceControllerOpts struct {
4950

5051
type WorkspaceController struct {
5152
client.Client
52-
NodeName string
53-
operations *WorkspaceOperations
54-
metrics *workspaceMetrics
53+
NodeName string
54+
maxConcurrentReconciles int
55+
operations *WorkspaceOperations
56+
metrics *workspaceMetrics
5557
}
5658

57-
func NewWorkspaceController(c client.Client, nodeName string, ops *WorkspaceOperations, reg prometheus.Registerer) (*WorkspaceController, error) {
59+
func NewWorkspaceController(c client.Client, nodeName string, maxConcurrentReconciles int, ops *WorkspaceOperations, reg prometheus.Registerer) (*WorkspaceController, error) {
5860
metrics := newWorkspaceMetrics()
5961
reg.Register(metrics)
6062

6163
return &WorkspaceController{
62-
Client: c,
63-
NodeName: nodeName,
64-
operations: ops,
65-
metrics: metrics,
64+
Client: c,
65+
NodeName: nodeName,
66+
maxConcurrentReconciles: maxConcurrentReconciles,
67+
operations: ops,
68+
metrics: metrics,
6669
}, nil
6770
}
6871

6972
// SetupWithManager sets up the controller with the Manager.
7073
func (wsc *WorkspaceController) SetupWithManager(mgr ctrl.Manager) error {
7174
return ctrl.NewControllerManagedBy(mgr).
7275
Named("workspace").
76+
WithOptions(controller.Options{
77+
MaxConcurrentReconciles: wsc.maxConcurrentReconciles,
78+
}).
7379
For(&workspacev1.Workspace{}).
7480
WithEventFilter(eventFilter(wsc.NodeName)).
7581
Complete(wsc)

components/ws-daemon/pkg/daemon/config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ type Config struct {
3333
}
3434

3535
type WorkspaceControllerConfig struct {
36-
Enabled bool `json:"enabled"`
37-
WorkingAreaSuffix string `json:"workingAreaSuffix"`
36+
Enabled bool `json:"enabled"`
37+
WorkingAreaSuffix string `json:"workingAreaSuffix"`
38+
MaxConcurrentReconciles int `json:"maxConcurrentReconciles,omitempty"`
3839
}
3940

4041
type RuntimeConfig struct {

components/ws-daemon/pkg/daemon/daemon.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func NewDaemon(config Config) (*Daemon, error) {
207207
return nil, err
208208
}
209209

210-
wsctrl, err := controller.NewWorkspaceController(mgr.GetClient(), nodename, workspaceOps, wrappedReg)
210+
wsctrl, err := controller.NewWorkspaceController(mgr.GetClient(), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps, wrappedReg)
211211
if err != nil {
212212
return nil, err
213213
}
@@ -216,7 +216,7 @@ func NewDaemon(config Config) (*Daemon, error) {
216216
return nil, err
217217
}
218218

219-
ssctrl := controller.NewSnapshotController(mgr.GetClient(), nodename, workspaceOps)
219+
ssctrl := controller.NewSnapshotController(mgr.GetClient(), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps)
220220
err = ssctrl.SetupWithManager(mgr)
221221
if err != nil {
222222
return nil, err

components/ws-manager-api/go/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ type Configuration struct {
125125
WorkspaceClasses map[string]*WorkspaceClass `json:"workspaceClass"`
126126
// DebugWorkspacePod adds extra finalizer to workspace to prevent it from shutting down. Helps to debug.
127127
DebugWorkspacePod bool `json:"debugWorkspacePod,omitempty"`
128+
// WorkspaceMaxConcurrentReconciles configures the max amount of concurrent workspace reconciliations on
129+
// the workspace controller.
130+
WorkspaceMaxConcurrentReconciles int `json:"workspaceMaxConcurrentReconciles,omitempty"`
128131
// TimeoutMaxConcurrentReconciles configures the max amount of concurrent workspace reconciliations on
129132
// the timeout controller.
130133
TimeoutMaxConcurrentReconciles int `json:"timeoutMaxConcurrentReconciles,omitempty"`

components/ws-manager-mk2/controllers/status.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ func isDisposalFinished(ws *workspacev1.Workspace) bool {
191191
return wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupComplete)) ||
192192
wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupFailure)) ||
193193
wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionAborted)) ||
194-
wsk8s.ConditionWithStatusAndReason(ws.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady), false, "InitializationFailure")
194+
// Nothing to dispose if content wasn't ready.
195+
!wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady))
195196
}
196197

197198
// extractFailure returns a pod failure reason and possibly a phase. If phase is nil then

components/ws-manager-mk2/controllers/workspace_controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"k8s.io/apimachinery/pkg/types"
1717
ctrl "sigs.k8s.io/controller-runtime"
1818
"sigs.k8s.io/controller-runtime/pkg/client"
19+
"sigs.k8s.io/controller-runtime/pkg/controller"
1920
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
2021
"sigs.k8s.io/controller-runtime/pkg/log"
2122

@@ -397,6 +398,9 @@ func (r *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
397398

398399
return ctrl.NewControllerManagedBy(mgr).
399400
Named("workspace").
401+
WithOptions(controller.Options{
402+
MaxConcurrentReconciles: r.Config.WorkspaceMaxConcurrentReconciles,
403+
}).
400404
For(&workspacev1.Workspace{}).
401405
Owns(&corev1.Pod{}).
402406
Complete(r)

components/ws-manager-mk2/controllers/workspace_controller_test.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,28 @@ var _ = Describe("WorkspaceController", func() {
122122
})
123123
})
124124

125+
It("should not take a backup if content init did not happen", func() {
126+
ws := newWorkspace(uuid.NewString(), "default")
127+
m := collectMetricCounts(wsMetrics, ws)
128+
pod := createWorkspaceExpectPod(ws)
129+
130+
requestStop(ws)
131+
132+
// No content init, expect cleanup without backup.
133+
expectWorkspaceCleanup(ws, pod)
134+
135+
expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
136+
stops: 1,
137+
})
138+
})
139+
125140
It("should handle backup failure", func() {
126141
ws := newWorkspace(uuid.NewString(), "default")
127142
m := collectMetricCounts(wsMetrics, ws)
128143
pod := createWorkspaceExpectPod(ws)
129144

145+
markContentReady(ws)
146+
130147
// Stop the workspace.
131148
requestStop(ws)
132149

@@ -137,6 +154,7 @@ var _ = Describe("WorkspaceController", func() {
137154
expectWorkspaceCleanup(ws, pod)
138155

139156
expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
157+
restores: 1,
140158
backups: 1,
141159
backupFailures: 1,
142160
stops: 1,
@@ -148,6 +166,8 @@ var _ = Describe("WorkspaceController", func() {
148166
m := collectMetricCounts(wsMetrics, ws)
149167
pod := createWorkspaceExpectPod(ws)
150168

169+
markContentReady(ws)
170+
151171
// Update Pod with failed exit status.
152172
updateObjWithRetries(k8sClient, pod, true, func(pod *corev1.Pod) {
153173
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{
@@ -168,6 +188,7 @@ var _ = Describe("WorkspaceController", func() {
168188
expectWorkspaceCleanup(ws, pod)
169189

170190
expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
191+
restores: 1,
171192
startFailures: 1,
172193
stops: 1,
173194
backups: 1,
@@ -179,6 +200,8 @@ var _ = Describe("WorkspaceController", func() {
179200
m := collectMetricCounts(wsMetrics, ws)
180201
pod := createWorkspaceExpectPod(ws)
181202

203+
markContentReady(ws)
204+
182205
By("adding Timeout condition")
183206
updateObjWithRetries(k8sClient, ws, true, func(ws *workspacev1.Workspace) {
184207
ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{
@@ -193,8 +216,9 @@ var _ = Describe("WorkspaceController", func() {
193216
expectWorkspaceCleanup(ws, pod)
194217

195218
expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
196-
stops: 1,
197-
backups: 1,
219+
restores: 1,
220+
stops: 1,
221+
backups: 1,
198222
})
199223
})
200224

@@ -203,6 +227,8 @@ var _ = Describe("WorkspaceController", func() {
203227
m := collectMetricCounts(wsMetrics, ws)
204228
pod := createWorkspaceExpectPod(ws)
205229

230+
markContentReady(ws)
231+
206232
// Update Pod with stop and abort conditions.
207233
updateObjWithRetries(k8sClient, ws, true, func(ws *workspacev1.Workspace) {
208234
ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{
@@ -221,7 +247,8 @@ var _ = Describe("WorkspaceController", func() {
221247
expectWorkspaceCleanup(ws, pod)
222248

223249
expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
224-
stops: 1,
250+
restores: 1,
251+
stops: 1,
225252
})
226253
})
227254

@@ -232,6 +259,8 @@ var _ = Describe("WorkspaceController", func() {
232259
m := collectMetricCounts(wsMetrics, ws)
233260
pod := createWorkspaceExpectPod(ws)
234261

262+
markContentReady(ws)
263+
235264
Expect(k8sClient.Delete(ctx, ws)).To(Succeed())
236265

237266
expectPhaseEventually(ws, workspacev1.WorkspacePhaseStopping)
@@ -243,8 +272,9 @@ var _ = Describe("WorkspaceController", func() {
243272
expectSecretCleanup(secret)
244273

245274
expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
246-
stops: 1,
247-
backups: 1,
275+
restores: 1,
276+
stops: 1,
277+
backups: 1,
248278
})
249279
})
250280

components/ws-manager-mk2/service/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ func (wsm *WorkspaceManagerServer) StartWorkspace(ctx context.Context, req *wsma
257257
}
258258

259259
var wsr workspacev1.Workspace
260-
err = wait.PollWithContext(ctx, 100*time.Millisecond, 5*time.Second, func(c context.Context) (done bool, err error) {
260+
err = wait.PollWithContext(ctx, 100*time.Millisecond, 15*time.Second, func(c context.Context) (done bool, err error) {
261261
err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: ws.Name}, &wsr)
262262
if err != nil {
263263
return false, nil

dev/loadgen/cmd/benchmark.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var benchmarkCommand = &cobra.Command{
5555
}
5656

5757
var load loadgen.LoadGenerator
58-
load = loadgen.NewFixedLoadGenerator(500*time.Millisecond, 300*time.Millisecond)
58+
load = loadgen.NewFixedLoadGenerator(800*time.Millisecond, 300*time.Millisecond)
5959
load = loadgen.NewWorkspaceCountLimitingGenerator(load, scenario.Workspaces)
6060

6161
template := &api.StartWorkspaceRequest{

dev/loadgen/pkg/loadgen/executor.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,12 @@ func (w *WsmanExecutor) StopAll(ctx context.Context) error {
222222
if err != nil {
223223
log.Warnf("could not get workspaces: %v", err)
224224
} else {
225-
if len(resp.GetStatus()) == 0 {
225+
n := len(resp.GetStatus())
226+
if n == 0 {
226227
break
227228
}
229+
ex := resp.GetStatus()[0]
230+
log.Infof("%d workspaces remaining, e.g. %s", n, ex.Id)
228231
}
229232

230233
select {

install/installer/pkg/components/ws-daemon/configmap.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) {
103103

104104
wscontroller.Enabled = ucfg.Workspace.UseWsmanagerMk2
105105
wscontroller.WorkingAreaSuffix = "-mk2"
106+
wscontroller.MaxConcurrentReconciles = 15
106107

107108
return nil
108109
})

install/installer/pkg/components/ws-manager-mk2/configmap.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,11 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) {
221221
Interrupted: util.Duration(5 * time.Minute),
222222
},
223223
//EventTraceLog: "", // todo(sje): make conditional based on config
224-
ReconnectionInterval: util.Duration(30 * time.Second),
225-
RegistryFacadeHost: fmt.Sprintf("reg.%s:%d", ctx.Config.Domain, common.RegistryFacadeServicePort),
226-
WorkspaceCACertSecret: customCASecret,
227-
TimeoutMaxConcurrentReconciles: 5,
224+
ReconnectionInterval: util.Duration(30 * time.Second),
225+
RegistryFacadeHost: fmt.Sprintf("reg.%s:%d", ctx.Config.Domain, common.RegistryFacadeServicePort),
226+
WorkspaceCACertSecret: customCASecret,
227+
WorkspaceMaxConcurrentReconciles: 15,
228+
TimeoutMaxConcurrentReconciles: 15,
228229
},
229230
Content: struct {
230231
Storage storageconfig.StorageConfig `json:"storage"`

0 commit comments

Comments
 (0)