Skip to content

[ws-manager-mk2] Rely on controller concurrency mechanism for content init and backup #16823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (ssc *SnapshotReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

snapshotURL, snapshotName, snapshotErr := ssc.operations.SnapshotIDs(snapshot.Spec.WorkspaceID)
snapshotURL, snapshotName, snapshotErr := ssc.operations.SnapshotIDs(ctx, snapshot.Spec.WorkspaceID)
if snapshotErr != nil {
return ctrl.Result{}, snapshotErr
}
Expand Down
42 changes: 29 additions & 13 deletions components/ws-daemon/pkg/controller/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"google.golang.org/protobuf/proto"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -125,34 +126,53 @@ func (wsc *WorkspaceController) Reconcile(ctx context.Context, req ctrl.Request)
glog.WithField("workspaceID", workspace.Name).WithField("phase", workspace.Status.Phase).Debug("Reconcile workspace")

if workspace.Status.Phase == workspacev1.WorkspacePhaseCreating ||
workspace.Status.Phase == workspacev1.WorkspacePhaseInitializing ||
workspace.Status.Phase == workspacev1.WorkspacePhaseRunning {
workspace.Status.Phase == workspacev1.WorkspacePhaseInitializing {

result, err = wsc.handleWorkspaceInit(ctx, &workspace, req)
return result, err
}

if workspace.Status.Phase == workspacev1.WorkspacePhaseStopping {

result, err = wsc.handleWorkspaceStop(ctx, &workspace, req)
return result, err
}

return ctrl.Result{}, nil
}

// latestWorkspace guards against acting on a stale workspace CR. The cache may
// hand us an older generation that lacks the content init/backup conditions we
// set previously, which would make us perform those operations a second time.
// To prevent that, a refresh condition is set on ws and its status is written
// back through the client. The error of that status update is returned
// unchanged; every error other than a conflict is additionally logged as a
// warning.
func (wsc *WorkspaceController) latestWorkspace(ctx context.Context, ws *workspacev1.Workspace) error {
	refresh := workspacev1.NewWorkspaceConditionRefresh()
	ws.Status.SetCondition(refresh)

	updateErr := wsc.Client.Status().Update(ctx, ws)
	switch {
	case updateErr == nil:
		return nil
	case errors.IsConflict(updateErr):
		// Conflicts are handed back to the caller without a log entry.
		return updateErr
	default:
		glog.Warnf("could not refresh workspace: %v", updateErr)
		return updateErr
	}
}

func (wsc *WorkspaceController) handleWorkspaceInit(ctx context.Context, ws *workspacev1.Workspace, req ctrl.Request) (result ctrl.Result, err error) {
log := log.FromContext(ctx)
span, ctx := opentracing.StartSpanFromContext(ctx, "handleWorkspaceInit")
defer tracing.FinishSpan(span, &err)

if c := wsk8s.GetCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady)); c == nil {
if wsc.latestWorkspace(ctx, ws) != nil {
return ctrl.Result{Requeue: true}, nil
}

init, err := wsc.prepareInitializer(ctx, ws)
if err != nil {
return ctrl.Result{}, err
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e. move the latestWorkspace check here, just before we attempt to start the content init

initStart := time.Now()
alreadyInit, failure, initErr := wsc.operations.InitWorkspaceContent(ctx, InitContentOptions{
failure, initErr := wsc.operations.InitWorkspaceContent(ctx, InitContentOptions{
Meta: WorkspaceMeta{
Owner: ws.Spec.Ownership.Owner,
WorkspaceId: ws.Spec.Ownership.WorkspaceID,
Expand All @@ -162,10 +182,6 @@ func (wsc *WorkspaceController) handleWorkspaceInit(ctx context.Context, ws *wor
Headless: ws.IsHeadless(),
})

if alreadyInit {
return ctrl.Result{}, nil
}

err = retry.RetryOnConflict(retryParams, func() error {
if err := wsc.Get(ctx, req.NamespacedName, ws); err != nil {
return err
Expand Down Expand Up @@ -219,13 +235,17 @@ func (wsc *WorkspaceController) handleWorkspaceStop(ctx context.Context, ws *wor
return ctrl.Result{}, nil
}

if wsc.latestWorkspace(ctx, ws) != nil {
return ctrl.Result{Requeue: true}, nil
}

disposeStart := time.Now()
var snapshotName string
var snapshotUrl string
if ws.Spec.Type == workspacev1.WorkspaceTypeRegular {
snapshotName = storage.DefaultBackup
} else {
snapshotUrl, snapshotName, err = wsc.operations.SnapshotIDs(ws.Name)
snapshotUrl, snapshotName, err = wsc.operations.SnapshotIDs(ctx, ws.Name)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -249,7 +269,7 @@ func (wsc *WorkspaceController) handleWorkspaceStop(ctx context.Context, ws *wor
}
}

alreadyDisposing, gitStatus, disposeErr := wsc.operations.DisposeWorkspace(ctx, DisposeOptions{
gitStatus, disposeErr := wsc.operations.DisposeWorkspace(ctx, DisposeOptions{
Meta: WorkspaceMeta{
Owner: ws.Spec.Ownership.Owner,
WorkspaceId: ws.Spec.Ownership.WorkspaceID,
Expand All @@ -261,10 +281,6 @@ func (wsc *WorkspaceController) handleWorkspaceStop(ctx context.Context, ws *wor
UpdateGitStatus: ws.Spec.Type == workspacev1.WorkspaceTypeRegular,
})

if alreadyDisposing {
return ctrl.Result{}, nil
}

err = retry.RetryOnConflict(retryParams, func() error {
if err := wsc.Get(ctx, req.NamespacedName, ws); err != nil {
return err
Expand Down
93 changes: 40 additions & 53 deletions components/ws-daemon/pkg/controller/workspace_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (

type WorkspaceOperations struct {
config content.Config
store *session.Store
provider *WorkspaceProvider
backupWorkspaceLimiter chan struct{}
metrics *content.Metrics
}
Expand All @@ -67,15 +67,15 @@ type DisposeOptions struct {
SnapshotName string
}

func NewWorkspaceOperations(config content.Config, store *session.Store, reg prometheus.Registerer) (*WorkspaceOperations, error) {
func NewWorkspaceOperations(config content.Config, provider *WorkspaceProvider, reg prometheus.Registerer) (*WorkspaceOperations, error) {
waitingTimeHist, waitingTimeoutCounter, err := content.RegisterConcurrentBackupMetrics(reg, "_mk2")
if err != nil {
return nil, err
}

return &WorkspaceOperations{
config: config,
store: store,
config: config,
provider: provider,
metrics: &content.Metrics{
BackupWaitingTimeHist: waitingTimeHist,
BackupWaitingTimeoutCounter: waitingTimeoutCounter,
Expand All @@ -85,34 +85,26 @@ func NewWorkspaceOperations(config content.Config, store *session.Store, reg pro
}, nil
}

func (wso *WorkspaceOperations) InitWorkspaceContent(ctx context.Context, options InitContentOptions) (bool, string, error) {
res, err := wso.store.NewWorkspace(
ctx, options.Meta.InstanceId, filepath.Join(wso.store.Location, options.Meta.InstanceId),
func (wso *WorkspaceOperations) InitWorkspaceContent(ctx context.Context, options InitContentOptions) (string, error) {
ws, err := wso.provider.Create(ctx, options.Meta.InstanceId, filepath.Join(wso.provider.Location, options.Meta.InstanceId),
wso.creator(options.Meta.Owner, options.Meta.WorkspaceId, options.Meta.InstanceId, options.Initializer, false))
if errors.Is(err, storage.ErrNotFound) {
return false, "", nil
}

if errors.Is(err, session.ErrAlreadyExists) {
return true, "", nil
}

if err != nil {
return false, "bug: cannot add workspace to store", xerrors.Errorf("cannot add workspace to store: %w", err)
return "bug: cannot add workspace to store", xerrors.Errorf("cannot add workspace to store: %w", err)
}

rs, ok := res.NonPersistentAttrs[session.AttrRemoteStorage].(storage.DirectAccess)
rs, ok := ws.NonPersistentAttrs[session.AttrRemoteStorage].(storage.DirectAccess)
if rs == nil || !ok {
return false, "bug: workspace has no remote storage", xerrors.Errorf("workspace has no remote storage")
return "bug: workspace has no remote storage", xerrors.Errorf("workspace has no remote storage")
}
ps, err := storage.NewPresignedAccess(&wso.config.Storage)
if err != nil {
return false, "bug: no presigned storage available", xerrors.Errorf("no presigned storage available: %w", err)
return "bug: no presigned storage available", xerrors.Errorf("no presigned storage available: %w", err)
}

remoteContent, err := content.CollectRemoteContent(ctx, rs, ps, options.Meta.Owner, options.Initializer)
if err != nil {
return false, "remote content error", xerrors.Errorf("remote content error: %w", err)
return "remote content error", xerrors.Errorf("remote content error: %w", err)
}

// Initialize workspace.
Expand All @@ -138,13 +130,18 @@ func (wso *WorkspaceOperations) InitWorkspaceContent(ctx context.Context, option
},
}

err = content.RunInitializer(ctx, res.Location, options.Initializer, remoteContent, opts)
err = content.RunInitializer(ctx, ws.Location, options.Initializer, remoteContent, opts)
if err != nil {
glog.Infof("error running initializer %v", err)
return false, err.Error(), err
return err.Error(), err
}

err = ws.Persist()
if err != nil {
return "cannot persist workspace", err
}

return false, "", nil
return "", nil
}

func (wso *WorkspaceOperations) creator(owner, workspaceId, instanceId string, init *csapi.WorkspaceInitializer, storageDisabled bool) session.WorkspaceFactory {
Expand All @@ -166,32 +163,22 @@ func (wso *WorkspaceOperations) creator(owner, workspaceId, instanceId string, i
FullWorkspaceBackup: false,
PersistentVolumeClaim: false,
RemoteStorageDisabled: storageDisabled,
IsMk2: true,

ServiceLocDaemon: filepath.Join(wso.config.WorkingArea, serviceDirName),
ServiceLocNode: filepath.Join(wso.config.WorkingAreaNode, serviceDirName),
}, nil
}
}

func (wso *WorkspaceOperations) DisposeWorkspace(ctx context.Context, opts DisposeOptions) (bool, *csapi.GitStatus, error) {
sess := wso.store.Get(opts.Meta.InstanceId)
if sess == nil {
return false, nil, fmt.Errorf("cannot find workspace %s during DisposeWorkspace", opts.Meta.InstanceId)
}

// Maybe there's someone else already trying to dispose the workspace.
// In that case we'll simply wait for that to happen.
done, repo, err := sess.WaitOrMarkForDisposal(ctx)
func (wso *WorkspaceOperations) DisposeWorkspace(ctx context.Context, opts DisposeOptions) (*csapi.GitStatus, error) {
ws, err := wso.provider.Get(ctx, opts.Meta.InstanceId)
if err != nil {
return false, nil, fmt.Errorf("failed to wait for workspace disposal of %s", opts.Meta.InstanceId)
}

if done {
return true, repo, nil
return nil, fmt.Errorf("cannot find workspace %s during DisposeWorkspace: %w", opts.Meta.InstanceId, err)
}

if sess.RemoteStorageDisabled {
return false, nil, xerrors.Errorf("workspace has no remote storage")
if ws.RemoteStorageDisabled {
return nil, fmt.Errorf("workspace has no remote storage")
}

if opts.BackupLogs {
Expand All @@ -202,14 +189,15 @@ func (wso *WorkspaceOperations) DisposeWorkspace(ctx context.Context, opts Dispo
}
}

err = wso.uploadWorkspaceContent(ctx, sess, opts.SnapshotName)
err = wso.uploadWorkspaceContent(ctx, ws, opts.SnapshotName)
if err != nil {
return false, nil, xerrors.Errorf("final backup failed for workspace %s", opts.Meta.InstanceId)
return nil, fmt.Errorf("final backup failed for workspace %s", opts.Meta.InstanceId)
}

var repo *csapi.GitStatus
if opts.UpdateGitStatus {
// Update the git status prior to deleting the workspace
repo, err = sess.UpdateGitStatus(ctx, false)
repo, err = ws.UpdateGitStatus(ctx, false)
if err != nil {
// do not fail workspace because we were unable to get git status
// which can happen for various reasons, including user corrupting his .git folder somehow
Expand All @@ -219,22 +207,22 @@ func (wso *WorkspaceOperations) DisposeWorkspace(ctx context.Context, opts Dispo
}
}

if err = sess.Dispose(ctx); err != nil {
if err = ws.Dispose(ctx, wso.provider.hooks[session.WorkspaceDisposed]); err != nil {
glog.WithError(err).Error("cannot dispose session")
}

// remove workspace daemon directory in the node
if err := os.RemoveAll(sess.ServiceLocDaemon); err != nil {
if err := os.RemoveAll(ws.ServiceLocDaemon); err != nil {
glog.WithError(err).Error("cannot delete workspace daemon directory")
}

return false, repo, nil
return repo, nil
}

func (wso *WorkspaceOperations) SnapshotIDs(workspaceID string) (snapshotUrl, snapshotName string, err error) {
sess := wso.store.Get(workspaceID)
if sess == nil {
return "", "", fmt.Errorf("cannot find workspace %s during SnapshotName", workspaceID)
func (wso *WorkspaceOperations) SnapshotIDs(ctx context.Context, workspaceID string) (snapshotUrl, snapshotName string, err error) {
sess, err := wso.provider.Get(ctx, workspaceID)
if err != nil {
return "", "", fmt.Errorf("cannot find workspace %s during SnapshotName: %w", workspaceID, err)
}

baseName := fmt.Sprintf("snapshot-%d", time.Now().UnixNano())
Expand All @@ -258,16 +246,16 @@ func (wso *WorkspaceOperations) TakeSnapshot(ctx context.Context, workspaceID, s
return fmt.Errorf("workspaceID is required")
}

sess := wso.store.Get(workspaceID)
if sess == nil {
ws, err := wso.provider.Get(ctx, workspaceID)
if err != nil {
return fmt.Errorf("cannot find workspace %s during DisposeWorkspace", workspaceID)
}

if sess.RemoteStorageDisabled {
if ws.RemoteStorageDisabled {
return fmt.Errorf("workspace has no remote storage")
}

err = wso.uploadWorkspaceContent(ctx, sess, snapshotName)
err = wso.uploadWorkspaceContent(ctx, ws, snapshotName)
if err != nil {
return fmt.Errorf("snapshot failed for workspace %s", workspaceID)
}
Expand Down Expand Up @@ -457,7 +445,6 @@ func (wsc *WorkspaceController) UpdateGitStatus(ctx context.Context, ws *workspa
return
}

glog.Infof("GIT LOCATION IS %v", loc)
loc = filepath.Join(loc, s.CheckoutLocation)
if !git.IsWorkingCopy(loc) {
glog.WithField("loc", loc).WithField("checkout location", s.CheckoutLocation).WithFields(s.OWI()).Debug("did not find a Git working copy - not updating Git status")
Expand Down
Loading