Skip to content

Commit c64c4f3

Browse files
authored
[ws-manager-mk2] Rely on controller concurrency mechanism for content init and backup (#16823)
* [wsman-mk2] Replace in memory state handling * [ws-daemon] Operate on latest workspace CR * [ws-daemon] Review comments
1 parent 27f243e commit c64c4f3

File tree

9 files changed

+215
-85
lines changed

9 files changed

+215
-85
lines changed

components/ws-daemon/pkg/controller/snapshot_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (ssc *SnapshotReconciler) Reconcile(ctx context.Context, req ctrl.Request)
8989
return ctrl.Result{}, nil
9090
}
9191

92-
snapshotURL, snapshotName, snapshotErr := ssc.operations.SnapshotIDs(snapshot.Spec.WorkspaceID)
92+
snapshotURL, snapshotName, snapshotErr := ssc.operations.SnapshotIDs(ctx, snapshot.Spec.WorkspaceID)
9393
if snapshotErr != nil {
9494
return ctrl.Result{}, snapshotErr
9595
}

components/ws-daemon/pkg/controller/workspace_controller.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"google.golang.org/protobuf/proto"
2525
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/api/errors"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/apimachinery/pkg/util/wait"
@@ -125,34 +126,53 @@ func (wsc *WorkspaceController) Reconcile(ctx context.Context, req ctrl.Request)
125126
glog.WithField("workspaceID", workspace.Name).WithField("phase", workspace.Status.Phase).Debug("Reconcile workspace")
126127

127128
if workspace.Status.Phase == workspacev1.WorkspacePhaseCreating ||
128-
workspace.Status.Phase == workspacev1.WorkspacePhaseInitializing ||
129-
workspace.Status.Phase == workspacev1.WorkspacePhaseRunning {
129+
workspace.Status.Phase == workspacev1.WorkspacePhaseInitializing {
130130

131131
result, err = wsc.handleWorkspaceInit(ctx, &workspace, req)
132132
return result, err
133133
}
134134

135135
if workspace.Status.Phase == workspacev1.WorkspacePhaseStopping {
136+
136137
result, err = wsc.handleWorkspaceStop(ctx, &workspace, req)
137138
return result, err
138139
}
139140

140141
return ctrl.Result{}, nil
141142
}
142143

144+
// latestWorkspace checks if the we have the latest generation of the workspace CR. We do this because
145+
// the cache could be stale and we retrieve a workspace CR that does not have the content init/backup
146+
// conditions even though we have set them previously. This will lead to us performing these operations
147+
// again. To prevent this we wait until we have the latest workspace CR.
148+
func (wsc *WorkspaceController) latestWorkspace(ctx context.Context, ws *workspacev1.Workspace) error {
149+
ws.Status.SetCondition(workspacev1.NewWorkspaceConditionRefresh())
150+
151+
err := wsc.Client.Status().Update(ctx, ws)
152+
if err != nil && !errors.IsConflict(err) {
153+
glog.Warnf("could not refresh workspace: %v", err)
154+
}
155+
156+
return err
157+
}
158+
143159
func (wsc *WorkspaceController) handleWorkspaceInit(ctx context.Context, ws *workspacev1.Workspace, req ctrl.Request) (result ctrl.Result, err error) {
144160
log := log.FromContext(ctx)
145161
span, ctx := opentracing.StartSpanFromContext(ctx, "handleWorkspaceInit")
146162
defer tracing.FinishSpan(span, &err)
147163

148164
if c := wsk8s.GetCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady)); c == nil {
165+
if wsc.latestWorkspace(ctx, ws) != nil {
166+
return ctrl.Result{Requeue: true}, nil
167+
}
168+
149169
init, err := wsc.prepareInitializer(ctx, ws)
150170
if err != nil {
151171
return ctrl.Result{}, err
152172
}
153173

154174
initStart := time.Now()
155-
alreadyInit, failure, initErr := wsc.operations.InitWorkspaceContent(ctx, InitContentOptions{
175+
failure, initErr := wsc.operations.InitWorkspaceContent(ctx, InitContentOptions{
156176
Meta: WorkspaceMeta{
157177
Owner: ws.Spec.Ownership.Owner,
158178
WorkspaceId: ws.Spec.Ownership.WorkspaceID,
@@ -162,10 +182,6 @@ func (wsc *WorkspaceController) handleWorkspaceInit(ctx context.Context, ws *wor
162182
Headless: ws.IsHeadless(),
163183
})
164184

165-
if alreadyInit {
166-
return ctrl.Result{}, nil
167-
}
168-
169185
err = retry.RetryOnConflict(retryParams, func() error {
170186
if err := wsc.Get(ctx, req.NamespacedName, ws); err != nil {
171187
return err
@@ -219,13 +235,17 @@ func (wsc *WorkspaceController) handleWorkspaceStop(ctx context.Context, ws *wor
219235
return ctrl.Result{}, nil
220236
}
221237

238+
if wsc.latestWorkspace(ctx, ws) != nil {
239+
return ctrl.Result{Requeue: true}, nil
240+
}
241+
222242
disposeStart := time.Now()
223243
var snapshotName string
224244
var snapshotUrl string
225245
if ws.Spec.Type == workspacev1.WorkspaceTypeRegular {
226246
snapshotName = storage.DefaultBackup
227247
} else {
228-
snapshotUrl, snapshotName, err = wsc.operations.SnapshotIDs(ws.Name)
248+
snapshotUrl, snapshotName, err = wsc.operations.SnapshotIDs(ctx, ws.Name)
229249
if err != nil {
230250
return ctrl.Result{}, err
231251
}
@@ -249,7 +269,7 @@ func (wsc *WorkspaceController) handleWorkspaceStop(ctx context.Context, ws *wor
249269
}
250270
}
251271

252-
alreadyDisposing, gitStatus, disposeErr := wsc.operations.DisposeWorkspace(ctx, DisposeOptions{
272+
gitStatus, disposeErr := wsc.operations.DisposeWorkspace(ctx, DisposeOptions{
253273
Meta: WorkspaceMeta{
254274
Owner: ws.Spec.Ownership.Owner,
255275
WorkspaceId: ws.Spec.Ownership.WorkspaceID,
@@ -261,10 +281,6 @@ func (wsc *WorkspaceController) handleWorkspaceStop(ctx context.Context, ws *wor
261281
UpdateGitStatus: ws.Spec.Type == workspacev1.WorkspaceTypeRegular,
262282
})
263283

264-
if alreadyDisposing {
265-
return ctrl.Result{}, nil
266-
}
267-
268284
err = retry.RetryOnConflict(retryParams, func() error {
269285
if err := wsc.Get(ctx, req.NamespacedName, ws); err != nil {
270286
return err

components/ws-daemon/pkg/controller/workspace_operations.go

Lines changed: 40 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ const (
4242

4343
type WorkspaceOperations struct {
4444
config content.Config
45-
store *session.Store
45+
provider *WorkspaceProvider
4646
backupWorkspaceLimiter chan struct{}
4747
metrics *content.Metrics
4848
}
@@ -67,15 +67,15 @@ type DisposeOptions struct {
6767
SnapshotName string
6868
}
6969

70-
func NewWorkspaceOperations(config content.Config, store *session.Store, reg prometheus.Registerer) (*WorkspaceOperations, error) {
70+
func NewWorkspaceOperations(config content.Config, provider *WorkspaceProvider, reg prometheus.Registerer) (*WorkspaceOperations, error) {
7171
waitingTimeHist, waitingTimeoutCounter, err := content.RegisterConcurrentBackupMetrics(reg, "_mk2")
7272
if err != nil {
7373
return nil, err
7474
}
7575

7676
return &WorkspaceOperations{
77-
config: config,
78-
store: store,
77+
config: config,
78+
provider: provider,
7979
metrics: &content.Metrics{
8080
BackupWaitingTimeHist: waitingTimeHist,
8181
BackupWaitingTimeoutCounter: waitingTimeoutCounter,
@@ -85,34 +85,26 @@ func NewWorkspaceOperations(config content.Config, store *session.Store, reg pro
8585
}, nil
8686
}
8787

88-
func (wso *WorkspaceOperations) InitWorkspaceContent(ctx context.Context, options InitContentOptions) (bool, string, error) {
89-
res, err := wso.store.NewWorkspace(
90-
ctx, options.Meta.InstanceId, filepath.Join(wso.store.Location, options.Meta.InstanceId),
88+
func (wso *WorkspaceOperations) InitWorkspaceContent(ctx context.Context, options InitContentOptions) (string, error) {
89+
ws, err := wso.provider.Create(ctx, options.Meta.InstanceId, filepath.Join(wso.provider.Location, options.Meta.InstanceId),
9190
wso.creator(options.Meta.Owner, options.Meta.WorkspaceId, options.Meta.InstanceId, options.Initializer, false))
92-
if errors.Is(err, storage.ErrNotFound) {
93-
return false, "", nil
94-
}
95-
96-
if errors.Is(err, session.ErrAlreadyExists) {
97-
return true, "", nil
98-
}
9991

10092
if err != nil {
101-
return false, "bug: cannot add workspace to store", xerrors.Errorf("cannot add workspace to store: %w", err)
93+
return "bug: cannot add workspace to store", xerrors.Errorf("cannot add workspace to store: %w", err)
10294
}
10395

104-
rs, ok := res.NonPersistentAttrs[session.AttrRemoteStorage].(storage.DirectAccess)
96+
rs, ok := ws.NonPersistentAttrs[session.AttrRemoteStorage].(storage.DirectAccess)
10597
if rs == nil || !ok {
106-
return false, "bug: workspace has no remote storage", xerrors.Errorf("workspace has no remote storage")
98+
return "bug: workspace has no remote storage", xerrors.Errorf("workspace has no remote storage")
10799
}
108100
ps, err := storage.NewPresignedAccess(&wso.config.Storage)
109101
if err != nil {
110-
return false, "bug: no presigned storage available", xerrors.Errorf("no presigned storage available: %w", err)
102+
return "bug: no presigned storage available", xerrors.Errorf("no presigned storage available: %w", err)
111103
}
112104

113105
remoteContent, err := content.CollectRemoteContent(ctx, rs, ps, options.Meta.Owner, options.Initializer)
114106
if err != nil {
115-
return false, "remote content error", xerrors.Errorf("remote content error: %w", err)
107+
return "remote content error", xerrors.Errorf("remote content error: %w", err)
116108
}
117109

118110
// Initialize workspace.
@@ -138,13 +130,18 @@ func (wso *WorkspaceOperations) InitWorkspaceContent(ctx context.Context, option
138130
},
139131
}
140132

141-
err = content.RunInitializer(ctx, res.Location, options.Initializer, remoteContent, opts)
133+
err = content.RunInitializer(ctx, ws.Location, options.Initializer, remoteContent, opts)
142134
if err != nil {
143135
glog.Infof("error running initializer %v", err)
144-
return false, err.Error(), err
136+
return err.Error(), err
137+
}
138+
139+
err = ws.Persist()
140+
if err != nil {
141+
return "cannot persist workspace", err
145142
}
146143

147-
return false, "", nil
144+
return "", nil
148145
}
149146

150147
func (wso *WorkspaceOperations) creator(owner, workspaceId, instanceId string, init *csapi.WorkspaceInitializer, storageDisabled bool) session.WorkspaceFactory {
@@ -166,32 +163,22 @@ func (wso *WorkspaceOperations) creator(owner, workspaceId, instanceId string, i
166163
FullWorkspaceBackup: false,
167164
PersistentVolumeClaim: false,
168165
RemoteStorageDisabled: storageDisabled,
166+
IsMk2: true,
169167

170168
ServiceLocDaemon: filepath.Join(wso.config.WorkingArea, serviceDirName),
171169
ServiceLocNode: filepath.Join(wso.config.WorkingAreaNode, serviceDirName),
172170
}, nil
173171
}
174172
}
175173

176-
func (wso *WorkspaceOperations) DisposeWorkspace(ctx context.Context, opts DisposeOptions) (bool, *csapi.GitStatus, error) {
177-
sess := wso.store.Get(opts.Meta.InstanceId)
178-
if sess == nil {
179-
return false, nil, fmt.Errorf("cannot find workspace %s during DisposeWorkspace", opts.Meta.InstanceId)
180-
}
181-
182-
// Maybe there's someone else already trying to dispose the workspace.
183-
// In that case we'll simply wait for that to happen.
184-
done, repo, err := sess.WaitOrMarkForDisposal(ctx)
174+
func (wso *WorkspaceOperations) DisposeWorkspace(ctx context.Context, opts DisposeOptions) (*csapi.GitStatus, error) {
175+
ws, err := wso.provider.Get(ctx, opts.Meta.InstanceId)
185176
if err != nil {
186-
return false, nil, fmt.Errorf("failed to wait for workspace disposal of %s", opts.Meta.InstanceId)
187-
}
188-
189-
if done {
190-
return true, repo, nil
177+
return nil, fmt.Errorf("cannot find workspace %s during DisposeWorkspace: %w", opts.Meta.InstanceId, err)
191178
}
192179

193-
if sess.RemoteStorageDisabled {
194-
return false, nil, xerrors.Errorf("workspace has no remote storage")
180+
if ws.RemoteStorageDisabled {
181+
return nil, fmt.Errorf("workspace has no remote storage")
195182
}
196183

197184
if opts.BackupLogs {
@@ -202,14 +189,15 @@ func (wso *WorkspaceOperations) DisposeWorkspace(ctx context.Context, opts Dispo
202189
}
203190
}
204191

205-
err = wso.uploadWorkspaceContent(ctx, sess, opts.SnapshotName)
192+
err = wso.uploadWorkspaceContent(ctx, ws, opts.SnapshotName)
206193
if err != nil {
207-
return false, nil, xerrors.Errorf("final backup failed for workspace %s", opts.Meta.InstanceId)
194+
return nil, fmt.Errorf("final backup failed for workspace %s", opts.Meta.InstanceId)
208195
}
209196

197+
var repo *csapi.GitStatus
210198
if opts.UpdateGitStatus {
211199
// Update the git status prior to deleting the workspace
212-
repo, err = sess.UpdateGitStatus(ctx, false)
200+
repo, err = ws.UpdateGitStatus(ctx, false)
213201
if err != nil {
214202
// do not fail workspace because we were unable to get git status
215203
// which can happen for various reasons, including user corrupting his .git folder somehow
@@ -219,22 +207,22 @@ func (wso *WorkspaceOperations) DisposeWorkspace(ctx context.Context, opts Dispo
219207
}
220208
}
221209

222-
if err = sess.Dispose(ctx); err != nil {
210+
if err = ws.Dispose(ctx, wso.provider.hooks[session.WorkspaceDisposed]); err != nil {
223211
glog.WithError(err).Error("cannot dispose session")
224212
}
225213

226214
// remove workspace daemon directory in the node
227-
if err := os.RemoveAll(sess.ServiceLocDaemon); err != nil {
215+
if err := os.RemoveAll(ws.ServiceLocDaemon); err != nil {
228216
glog.WithError(err).Error("cannot delete workspace daemon directory")
229217
}
230218

231-
return false, repo, nil
219+
return repo, nil
232220
}
233221

234-
func (wso *WorkspaceOperations) SnapshotIDs(workspaceID string) (snapshotUrl, snapshotName string, err error) {
235-
sess := wso.store.Get(workspaceID)
236-
if sess == nil {
237-
return "", "", fmt.Errorf("cannot find workspace %s during SnapshotName", workspaceID)
222+
func (wso *WorkspaceOperations) SnapshotIDs(ctx context.Context, workspaceID string) (snapshotUrl, snapshotName string, err error) {
223+
sess, err := wso.provider.Get(ctx, workspaceID)
224+
if err != nil {
225+
return "", "", fmt.Errorf("cannot find workspace %s during SnapshotName: %w", workspaceID, err)
238226
}
239227

240228
baseName := fmt.Sprintf("snapshot-%d", time.Now().UnixNano())
@@ -258,16 +246,16 @@ func (wso *WorkspaceOperations) TakeSnapshot(ctx context.Context, workspaceID, s
258246
return fmt.Errorf("workspaceID is required")
259247
}
260248

261-
sess := wso.store.Get(workspaceID)
262-
if sess == nil {
249+
ws, err := wso.provider.Get(ctx, workspaceID)
250+
if err != nil {
263251
return fmt.Errorf("cannot find workspace %s during DisposeWorkspace", workspaceID)
264252
}
265253

266-
if sess.RemoteStorageDisabled {
254+
if ws.RemoteStorageDisabled {
267255
return fmt.Errorf("workspace has no remote storage")
268256
}
269257

270-
err = wso.uploadWorkspaceContent(ctx, sess, snapshotName)
258+
err = wso.uploadWorkspaceContent(ctx, ws, snapshotName)
271259
if err != nil {
272260
return fmt.Errorf("snapshot failed for workspace %s", workspaceID)
273261
}
@@ -457,7 +445,6 @@ func (wsc *WorkspaceController) UpdateGitStatus(ctx context.Context, ws *workspa
457445
return
458446
}
459447

460-
glog.Infof("GIT LOCATION IS %v", loc)
461448
loc = filepath.Join(loc, s.CheckoutLocation)
462449
if !git.IsWorkingCopy(loc) {
463450
glog.WithField("loc", loc).WithField("checkout location", s.CheckoutLocation).WithFields(s.OWI()).Debug("did not find a Git working copy - not updating Git status")

0 commit comments

Comments
 (0)