
[ws-manager-mk2] Loadgen fixes, concurrent reconciliation #16613


Merged (5 commits, Mar 6, 2023)
18 changes: 12 additions & 6 deletions components/ws-daemon/pkg/controller/snapshot_controller.go

@@ -11,6 +11,7 @@ import (
 	"k8s.io/client-go/util/retry"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -21,22 +22,27 @@
 // SnapshotReconciler reconciles a Snapshot object
 type SnapshotReconciler struct {
 	client.Client
-	nodeName   string
-	operations *WorkspaceOperations
+	maxConcurrentReconciles int
+	nodeName                string
+	operations              *WorkspaceOperations
 }

-func NewSnapshotController(c client.Client, nodeName string, wso *WorkspaceOperations) *SnapshotReconciler {
+func NewSnapshotController(c client.Client, nodeName string, maxConcurrentReconciles int, wso *WorkspaceOperations) *SnapshotReconciler {
 	return &SnapshotReconciler{
-		Client:     c,
-		nodeName:   nodeName,
-		operations: wso,
+		Client:                  c,
+		maxConcurrentReconciles: maxConcurrentReconciles,
+		nodeName:                nodeName,
+		operations:              wso,
 	}
 }

 // SetupWithManager sets up the controller with the Manager.
 func (r *SnapshotReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		Named("snapshot").
+		WithOptions(controller.Options{
+			MaxConcurrentReconciles: r.maxConcurrentReconciles,
+		}).
 		For(&workspacev1.Snapshot{}).
 		WithEventFilter(snapshotEventFilter(r.nodeName)).
 		Complete(r)
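For context: MaxConcurrentReconciles sets how many worker goroutines controller-runtime runs against the controller's workqueue. The queue deduplicates keys, so the same object is never reconciled by two workers at once, and a value of 0 falls back to the default of 1. A minimal sketch of the wiring (demoReconciler and the Pod type are illustrative, not part of this PR):

package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
)

// demoReconciler stands in for SnapshotReconciler above.
type demoReconciler struct {
	client.Client
}

func (r *demoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// With N workers, N distinct object keys can be reconciled in parallel;
	// the workqueue still deduplicates, so one key never runs twice at once.
	return ctrl.Result{}, nil
}

func setupDemo(mgr ctrl.Manager, workers int) error {
	return ctrl.NewControllerManagedBy(mgr).
		Named("demo").
		WithOptions(controller.Options{
			// 0 falls back to controller-runtime's default of 1 worker.
			MaxConcurrentReconciles: workers,
		}).
		For(&corev1.Pod{}).
		Complete(&demoReconciler{Client: mgr.GetClient()})
}

The workspace and timeout controllers below are wired the same way.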
22 changes: 14 additions & 8 deletions components/ws-daemon/pkg/controller/workspace_controller.go

@@ -26,6 +26,7 @@ import (
 	"k8s.io/client-go/util/retry"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -49,27 +50,32 @@ type WorkspaceControllerOpts struct {

 type WorkspaceController struct {
 	client.Client
-	NodeName   string
-	operations *WorkspaceOperations
-	metrics    *workspaceMetrics
+	NodeName                string
+	maxConcurrentReconciles int
+	operations              *WorkspaceOperations
+	metrics                 *workspaceMetrics
 }

-func NewWorkspaceController(c client.Client, nodeName string, ops *WorkspaceOperations, reg prometheus.Registerer) (*WorkspaceController, error) {
+func NewWorkspaceController(c client.Client, nodeName string, maxConcurrentReconciles int, ops *WorkspaceOperations, reg prometheus.Registerer) (*WorkspaceController, error) {
 	metrics := newWorkspaceMetrics()
 	reg.Register(metrics)

 	return &WorkspaceController{
-		Client:     c,
-		NodeName:   nodeName,
-		operations: ops,
-		metrics:    metrics,
+		Client:                  c,
+		NodeName:                nodeName,
+		maxConcurrentReconciles: maxConcurrentReconciles,
+		operations:              ops,
+		metrics:                 metrics,
 	}, nil
 }

 // SetupWithManager sets up the controller with the Manager.
 func (wsc *WorkspaceController) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		Named("workspace").
+		WithOptions(controller.Options{
+			MaxConcurrentReconciles: wsc.maxConcurrentReconciles,
+		}).
 		For(&workspacev1.Workspace{}).
 		WithEventFilter(eventFilter(wsc.NodeName)).
 		Complete(wsc)
5 changes: 3 additions & 2 deletions components/ws-daemon/pkg/daemon/config.go

@@ -33,8 +33,9 @@ type Config struct {
 }

 type WorkspaceControllerConfig struct {
-	Enabled           bool   `json:"enabled"`
-	WorkingAreaSuffix string `json:"workingAreaSuffix"`
+	Enabled                 bool   `json:"enabled"`
+	WorkingAreaSuffix       string `json:"workingAreaSuffix"`
+	MaxConcurrentReconciles int    `json:"maxConcurrentReconciles,omitempty"`
 }

 type RuntimeConfig struct {
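A quick illustration of how the new field deserializes (the JSON values are examples, mirroring the installer default added further below):

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the ws-daemon struct above; field names and tags match the PR.
type WorkspaceControllerConfig struct {
	Enabled                 bool   `json:"enabled"`
	WorkingAreaSuffix       string `json:"workingAreaSuffix"`
	MaxConcurrentReconciles int    `json:"maxConcurrentReconciles,omitempty"`
}

func main() {
	// Older configs that omit the new field still parse; the value
	// defaults to 0, which controller-runtime treats as 1 worker.
	old := []byte(`{"enabled": true, "workingAreaSuffix": "-mk2"}`)
	var cfg WorkspaceControllerConfig
	_ = json.Unmarshal(old, &cfg)
	fmt.Println(cfg.MaxConcurrentReconciles) // 0

	// The installer (see configmap.go below) sets it to 15.
	newer := []byte(`{"enabled": true, "workingAreaSuffix": "-mk2", "maxConcurrentReconciles": 15}`)
	_ = json.Unmarshal(newer, &cfg)
	fmt.Println(cfg.MaxConcurrentReconciles) // 15
}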
4 changes: 2 additions & 2 deletions components/ws-daemon/pkg/daemon/daemon.go

@@ -207,7 +207,7 @@ func NewDaemon(config Config) (*Daemon, error) {
 		return nil, err
 	}

-	wsctrl, err := controller.NewWorkspaceController(mgr.GetClient(), nodename, workspaceOps, wrappedReg)
+	wsctrl, err := controller.NewWorkspaceController(mgr.GetClient(), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps, wrappedReg)
 	if err != nil {
 		return nil, err
 	}
@@ -216,7 +216,7 @@
 		return nil, err
 	}

-	ssctrl := controller.NewSnapshotController(mgr.GetClient(), nodename, workspaceOps)
+	ssctrl := controller.NewSnapshotController(mgr.GetClient(), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps)
 	err = ssctrl.SetupWithManager(mgr)
 	if err != nil {
 		return nil, err
3 changes: 3 additions & 0 deletions components/ws-manager-api/go/config/config.go

@@ -125,6 +125,9 @@ type Configuration struct {
 	WorkspaceClasses map[string]*WorkspaceClass `json:"workspaceClass"`
 	// DebugWorkspacePod adds extra finalizer to workspace to prevent it from shutting down. Helps to debug.
 	DebugWorkspacePod bool `json:"debugWorkspacePod,omitempty"`
+	// WorkspaceMaxConcurrentReconciles configures the max amount of concurrent workspace reconciliations on
+	// the workspace controller.
+	WorkspaceMaxConcurrentReconciles int `json:"workspaceMaxConcurrentReconciles,omitempty"`
 	// TimeoutMaxConcurrentReconciles configures the max amount of concurrent workspace reconciliations on
 	// the timeout controller.
 	TimeoutMaxConcurrentReconciles int `json:"timeoutMaxConcurrentReconciles,omitempty"`
3 changes: 2 additions & 1 deletion components/ws-manager-mk2/controllers/status.go

@@ -191,7 +191,8 @@ func isDisposalFinished(ws *workspacev1.Workspace) bool {
 	return wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupComplete)) ||
 		wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupFailure)) ||
 		wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionAborted)) ||
-		wsk8s.ConditionWithStatusAndReason(ws.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady), false, "InitializationFailure")
+		// Nothing to dispose if content wasn't ready.
+		!wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady))
 }

 // extractFailure returns a pod failure reason and possibly a phase. If phase is nil then

Member Author (comment on lines +194 to +195): Also finish disposal if the ContentReady condition isn't present. This fixes workspaces stuck in Stopping when the condition isn't added due to e.g. a workspace startup failure.
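A hypothetical regression test for this behavior, not part of the PR; it assumes this package's existing workspacev1 and metav1 imports and the isDisposalFinished function shown above:

// Hypothetical test; assumes the package's existing imports.
func TestIsDisposalFinishedWithoutContentReady(t *testing.T) {
	// A workspace with no conditions at all, e.g. because startup failed
	// before content init: disposal should now count as finished.
	ws := &workspacev1.Workspace{}
	if !isDisposalFinished(ws) {
		t.Error("expected disposal to finish when ContentReady was never set")
	}

	// With ContentReady=true and no backup outcome yet, disposal must wait.
	ws.Status.Conditions = []metav1.Condition{{
		Type:   string(workspacev1.WorkspaceConditionContentReady),
		Status: metav1.ConditionTrue,
	}}
	if isDisposalFinished(ws) {
		t.Error("expected disposal to wait for a backup result")
	}
}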
@@ -15,6 +15,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/log"

@@ -360,6 +361,9 @@ func (r *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error {

 	return ctrl.NewControllerManagedBy(mgr).
 		Named("workspace").
+		WithOptions(controller.Options{
+			MaxConcurrentReconciles: r.Config.WorkspaceMaxConcurrentReconciles,
+		}).
 		For(&workspacev1.Workspace{}).
 		Owns(&corev1.Pod{}).
 		Complete(r)
@@ -119,11 +119,28 @@ var _ = Describe("WorkspaceController", func() {
 			})
 		})

+		It("should not take a backup if content init did not happen", func() {
+			ws := newWorkspace(uuid.NewString(), "default")
+			m := collectMetricCounts(wsMetrics, ws)
+			pod := createWorkspaceExpectPod(ws)
+
+			requestStop(ws)
+
+			// No content init, expect cleanup without backup.
+			expectWorkspaceCleanup(ws, pod)
+
+			expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
+				stops: 1,
+			})
+		})
+
 		It("should handle backup failure", func() {
 			ws := newWorkspace(uuid.NewString(), "default")
 			m := collectMetricCounts(wsMetrics, ws)
 			pod := createWorkspaceExpectPod(ws)

+			markContentReady(ws)
+
 			// Stop the workspace.
 			requestStop(ws)

@@ -134,6 +151,7 @@
 			expectWorkspaceCleanup(ws, pod)

 			expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
+				restores:       1,
 				backups:        1,
 				backupFailures: 1,
 				stops:          1,
@@ -145,6 +163,8 @@
 			m := collectMetricCounts(wsMetrics, ws)
 			pod := createWorkspaceExpectPod(ws)

+			markContentReady(ws)
+
 			// Update Pod with failed exit status.
 			updateObjWithRetries(k8sClient, pod, true, func(pod *corev1.Pod) {
 				pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{
@@ -165,6 +185,7 @@
 			expectWorkspaceCleanup(ws, pod)

 			expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
+				restores:      1,
 				startFailures: 1,
 				stops:         1,
 				backups:       1,
@@ -176,6 +197,8 @@
 			m := collectMetricCounts(wsMetrics, ws)
 			pod := createWorkspaceExpectPod(ws)

+			markContentReady(ws)
+
 			By("adding Timeout condition")
 			updateObjWithRetries(k8sClient, ws, true, func(ws *workspacev1.Workspace) {
 				ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{
@@ -190,8 +213,9 @@
 			expectWorkspaceCleanup(ws, pod)

 			expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
-				stops:   1,
-				backups: 1,
+				restores: 1,
+				stops:    1,
+				backups:  1,
 			})
 		})

@@ -200,6 +224,8 @@
 			m := collectMetricCounts(wsMetrics, ws)
 			pod := createWorkspaceExpectPod(ws)

+			markContentReady(ws)
+
 			// Update Pod with stop and abort conditions.
 			updateObjWithRetries(k8sClient, ws, true, func(ws *workspacev1.Workspace) {
 				ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{
@@ -218,7 +244,8 @@
 			expectWorkspaceCleanup(ws, pod)

 			expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
-				stops: 1,
+				restores: 1,
+				stops:    1,
 			})
 		})

@@ -227,6 +254,8 @@
 			m := collectMetricCounts(wsMetrics, ws)
 			pod := createWorkspaceExpectPod(ws)

+			markContentReady(ws)
+
 			Expect(k8sClient.Delete(ctx, ws)).To(Succeed())

 			expectPhaseEventually(ws, workspacev1.WorkspacePhaseStopping)
@@ -236,8 +265,9 @@
 			expectWorkspaceCleanup(ws, pod)

 			expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{
-				stops:   1,
-				backups: 1,
+				restores: 1,
+				stops:    1,
+				backups:  1,
 			})
 		})
 	})
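The markContentReady calls added above are what introduce the restores: 1 deltas: once content is marked ready, the controller records a content restore. The helper itself is not shown in this diff; a plausible shape, reusing the helpers visible in the tests above (the Reason string is an assumption):

// Hypothetical sketch of the test helper; the real one lives in the suite.
func markContentReady(ws *workspacev1.Workspace) {
	updateObjWithRetries(k8sClient, ws, true, func(ws *workspacev1.Workspace) {
		ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{
			Type:               string(workspacev1.WorkspaceConditionContentReady),
			Status:             metav1.ConditionTrue,
			LastTransitionTime: metav1.Now(),
			Reason:             "InitializationSuccess", // assumed; the real reason may differ
		})
	})
}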
2 changes: 1 addition & 1 deletion components/ws-manager-mk2/service/manager.go

@@ -249,7 +249,7 @@ func (wsm *WorkspaceManagerServer) StartWorkspace(ctx context.Context, req *wsma
 	}

 	var wsr workspacev1.Workspace
-	err = wait.PollWithContext(ctx, 100*time.Millisecond, 5*time.Second, func(c context.Context) (done bool, err error) {
+	err = wait.PollWithContext(ctx, 100*time.Millisecond, 15*time.Second, func(c context.Context) (done bool, err error) {
 		err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: ws.Name}, &wsr)
 		if err != nil {
 			return false, nil
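wait.PollWithContext retries the condition every interval until it returns true, errors, or the timeout elapses; raising the budget from 5s to 15s presumably gives the non-blocking mk2 start path more time to materialize the workspace object under load. A self-contained sketch of the polling pattern (the condition body is a placeholder):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx := context.Background()
	start := time.Now()

	// Poll every 100ms for up to 15s, mirroring the timeouts above.
	err := wait.PollWithContext(ctx, 100*time.Millisecond, 15*time.Second, func(c context.Context) (bool, error) {
		// Placeholder condition: the PR does a client.Get for the
		// workspace object here. (false, nil) means "not yet, retry".
		return time.Since(start) > 300*time.Millisecond, nil
	})
	fmt.Println("poll finished:", err) // nil once the condition returned true
}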
2 changes: 1 addition & 1 deletion dev/loadgen/cmd/benchmark.go

@@ -55,7 +55,7 @@ var benchmarkCommand = &cobra.Command{
 	}

 	var load loadgen.LoadGenerator
-	load = loadgen.NewFixedLoadGenerator(500*time.Millisecond, 300*time.Millisecond)
+	load = loadgen.NewFixedLoadGenerator(800*time.Millisecond, 300*time.Millisecond)
 	load = loadgen.NewWorkspaceCountLimitingGenerator(load, scenario.Workspaces)

 	template := &api.StartWorkspaceRequest{

Member Author: Slightly decreased the rate; this was creating workspaces too quickly for mk2, as mk2's StartWorkspace request doesn't block and was keeping up with the 2/second rate. For mk1 load tests, the StartWorkspace request takes seconds (to minutes) to complete and never reached a rate of 2 starts/second anyway.

Contributor: @WVerlaek were you hitting a rate limit of ws-manager-mk2, where it wasn't allowing additional gRPC connections? I assume yes, just curious.

Member Author: No, some workspaces were failing to start: too many were starting at once and pulling an image, causing some pulls to fail. Increased the delay a bit to slow down workspace creation; at this rate it's still faster than what mk1 could handle.

Contributor: Cool, I see, so it's just a natural breaking limit. Good to know! What was failing on pull? registry-facade, containerd? Something else? Just curious.

Member Author: The errors were failures to pull the image from registry-facade due to an IO timeout.
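For scale: if NewFixedLoadGenerator's first argument is the base delay between starts (an assumption from the call site), 500ms permits up to 2 starts/second while 800ms caps the rate at 1.25/second. A rough, self-contained sketch of such a generator under that assumption; the real loadgen implementation may differ:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// fixedLoad emits one "start a workspace" signal per iteration, waiting a
// base delay plus random jitter in between. With base=800ms the steady-state
// rate is at most 1/0.8s = 1.25 starts per second (2/s at the old 500ms).
func fixedLoad(base, jitter time.Duration, n int) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			time.Sleep(base + time.Duration(rand.Int63n(int64(jitter))))
			ch <- struct{}{}
		}
	}()
	return ch
}

func main() {
	start := time.Now()
	for range fixedLoad(800*time.Millisecond, 300*time.Millisecond, 5) {
		fmt.Println("start workspace at", time.Since(start).Round(time.Millisecond))
	}
}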
5 changes: 4 additions & 1 deletion dev/loadgen/pkg/loadgen/executor.go

@@ -222,9 +222,12 @@ func (w *WsmanExecutor) StopAll(ctx context.Context) error {
 		if err != nil {
 			log.Warnf("could not get workspaces: %v", err)
 		} else {
-			if len(resp.GetStatus()) == 0 {
+			n := len(resp.GetStatus())
+			if n == 0 {
 				break
 			}
+			ex := resp.GetStatus()[0]
+			log.Infof("%d workspaces remaining, e.g. %s", n, ex.Id)
 		}

 		select {

Member Author (comment on lines +225 to +230): Some extra logging while stopping workspaces after a load test, to view progress. Also includes the workspace ID of one stopping workspace, to make it easy to inspect a workspace stuck in Stopping.
1 change: 1 addition & 0 deletions install/installer/pkg/components/ws-daemon/configmap.go

@@ -103,6 +103,7 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) {

 		wscontroller.Enabled = ucfg.Workspace.UseWsmanagerMk2
 		wscontroller.WorkingAreaSuffix = "-mk2"
+		wscontroller.MaxConcurrentReconciles = 15

 		return nil
 	})
@@ -221,10 +221,11 @@
 				Interrupted: util.Duration(5 * time.Minute),
 			},
 			//EventTraceLog: "", // todo(sje): make conditional based on config
-			ReconnectionInterval:           util.Duration(30 * time.Second),
-			RegistryFacadeHost:             fmt.Sprintf("reg.%s:%d", ctx.Config.Domain, common.RegistryFacadeServicePort),
-			WorkspaceCACertSecret:          customCASecret,
-			TimeoutMaxConcurrentReconciles: 5,
+			ReconnectionInterval:             util.Duration(30 * time.Second),
+			RegistryFacadeHost:               fmt.Sprintf("reg.%s:%d", ctx.Config.Domain, common.RegistryFacadeServicePort),
+			WorkspaceCACertSecret:            customCASecret,
+			WorkspaceMaxConcurrentReconciles: 15,
+			TimeoutMaxConcurrentReconciles:   15,
 		},
 		Content: struct {
 			Storage storageconfig.StorageConfig `json:"storage"`

Member Author (comment on lines +227 to +228): Set both the timeout and workspace controller's max reconciles to 15. This number is slightly arbitrary but should be sufficient for us to look at the metrics during loadgen. It's in config, so we can easily change it anyway.