Skip to content

Commit 1a0dabd

Browse files
authored
Merge pull request #1285 from k8s-infra-cherrypick-robot/cherry-pick-1278-to-release-1.24
[release-1.24] fix: fix azcopy exec timeout func
2 parents 1ba319e + 2e2ad9c commit 1a0dabd

File tree

5 files changed

+112
-38
lines changed

5 files changed

+112
-38
lines changed

pkg/blob/controllerserver.go

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ const (
6161
MSI = "MSI"
6262
SPN = "SPN"
6363
authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
64-
65-
waitForAzCopyInterval = 2 * time.Second
6664
)
6765

6866
// CreateVolume provisions a volume
@@ -85,7 +83,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
8583
// logging the job status if it's volume cloning
8684
if req.GetVolumeContentSource() != nil {
8785
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
88-
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
86+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
8987
}
9088
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
9189
}
@@ -759,43 +757,41 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
759757
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
760758
}
761759

762-
timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute)
763-
timeTick := time.Tick(waitForAzCopyInterval)
764760
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
765761
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
766762

767763
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
768764
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
769-
if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
765+
switch jobState {
766+
case util.AzcopyJobError, util.AzcopyJobCompleted:
770767
return err
771-
}
772-
klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
773-
for {
774-
select {
775-
case <-timeTick:
776-
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
777-
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
778-
switch jobState {
779-
case util.AzcopyJobError, util.AzcopyJobCompleted:
780-
return err
781-
case util.AzcopyJobNotFound:
782-
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
783-
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
784-
if len(authAzcopyEnv) > 0 {
785-
cmd.Env = append(os.Environ(), authAzcopyEnv...)
786-
}
787-
out, copyErr := cmd.CombinedOutput()
788-
if copyErr != nil {
789-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
790-
} else {
791-
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
792-
}
793-
return copyErr
768+
case util.AzcopyJobRunning:
769+
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
770+
case util.AzcopyJobNotFound:
771+
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
772+
execFunc := func() error {
773+
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
774+
if len(authAzcopyEnv) > 0 {
775+
cmd.Env = append(os.Environ(), authAzcopyEnv...)
776+
}
777+
if out, err := cmd.CombinedOutput(); err != nil {
778+
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
794779
}
795-
case <-timeAfter:
796-
return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName)
780+
return nil
781+
}
782+
timeoutFunc := func() error {
783+
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
784+
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
785+
}
786+
copyErr := util.WaitForExecCompletion(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
787+
if copyErr != nil {
788+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
789+
} else {
790+
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
797791
}
792+
return copyErr
798793
}
794+
return err
799795
}
800796

801797
// copyVolume copies a volume from volume or snapshot, snapshot is not supported now

pkg/blob/controllerserver_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1712,7 +1712,7 @@ func TestCopyVolume(t *testing.T) {
17121712
},
17131713
},
17141714
{
1715-
name: "azcopy job is first in progress and then be completed",
1715+
name: "azcopy job is in progress",
17161716
testFunc: func(t *testing.T) {
17171717
d := NewFakeDriver()
17181718
mp := map[string]string{}
@@ -1739,15 +1739,12 @@ func TestCopyVolume(t *testing.T) {
17391739

17401740
m := util.NewMockEXEC(ctrl)
17411741
listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false"
1742-
listStr2 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false"
1743-
o1 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr1, nil).Times(1)
1742+
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr1, nil).Times(1)
17441743
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstBlobContainer -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil)
1745-
o2 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr2, nil)
1746-
gomock.InOrder(o1, o2)
17471744

17481745
d.azcopy.ExecCmd = m
17491746

1750-
var expectedErr error
1747+
expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%")
17511748
err := d.copyVolume(req, "sastoken", nil, "dstContainer", "core.windows.net")
17521749
if !reflect.DeepEqual(err, expectedErr) {
17531750
t.Errorf("Unexpected error: %v", err)

pkg/blob/volume_lock.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import (
2323
)
2424

2525
const (
26-
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
26+
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
27+
volumeOperationAlreadyExistsWithAzcopyFmt = "An operation using azcopy with the given Volume ID %s already exists. Azcopy job status: %s, copy percent: %s%%, error: %v"
2728
)
2829

2930
// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs

pkg/util/util.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strconv"
2525
"strings"
2626
"sync"
27+
"time"
2728

2829
"github.com/go-ini/ini"
2930
"github.com/pkg/errors"
@@ -387,3 +388,30 @@ func SetVolumeOwnership(path, gid, policy string) error {
387388
}
388389
return volume.SetVolumeOwnership(&VolumeMounter{path: path}, path, &gidInt64, &fsGroupChangePolicy, nil)
389390
}
391+
392+
// ExecFunc is a unit of work run by WaitForExecCompletion. It returns the
// error produced by the execution, or nil on success.
type ExecFunc func() (err error)

// TimeoutFunc is called by WaitForExecCompletion when the ExecFunc has not
// finished before the timeout elapsed. The error it returns is handed back to
// the caller of WaitForExecCompletion.
type TimeoutFunc func() (err error)

// WaitForExecCompletion runs execFunc in its own goroutine and waits for it to
// finish for at most timeout.
//
// If execFunc finishes in time, its error (possibly nil) is returned. If the
// timeout elapses first, the result of timeoutFunc is returned instead.
//
// execFunc is not cancelled on timeout: it keeps running in the background
// until it returns on its own, and its result is then discarded.
func WaitForExecCompletion(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error {
	// The channel is buffered so that the worker goroutine can always deliver
	// its result and exit, even when nobody is receiving any more because the
	// timeout branch already returned. An unbuffered channel would leave the
	// goroutine blocked on the send forever.
	done := make(chan error, 1)

	// Start the exec function in a goroutine and pass its error through the
	// channel rather than through a shared variable.
	go func() {
		done <- execFunc()
	}()

	// Use an explicit timer so it can be released as soon as we return.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// Wait for the function to complete or time out.
	select {
	case err := <-done:
		return err
	case <-timer.C:
		return timeoutFunc()
	}
}

pkg/util/util_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,3 +656,55 @@ func TestSetVolumeOwnership(t *testing.T) {
656656
}
657657
}
658658
}
659+
660+
func TestWaitForExecCompletion(t *testing.T) {
661+
tests := []struct {
662+
desc string
663+
timeout time.Duration
664+
execFunc ExecFunc
665+
timeoutFunc TimeoutFunc
666+
expectedErr error
667+
}{
668+
{
669+
desc: "execFunc returns error",
670+
timeout: 1 * time.Second,
671+
execFunc: func() error {
672+
return fmt.Errorf("execFunc error")
673+
},
674+
timeoutFunc: func() error {
675+
return fmt.Errorf("timeout error")
676+
},
677+
expectedErr: fmt.Errorf("execFunc error"),
678+
},
679+
{
680+
desc: "execFunc timeout",
681+
timeout: 1 * time.Second,
682+
execFunc: func() error {
683+
time.Sleep(2 * time.Second)
684+
return nil
685+
},
686+
timeoutFunc: func() error {
687+
return fmt.Errorf("timeout error")
688+
},
689+
expectedErr: fmt.Errorf("timeout error"),
690+
},
691+
{
692+
desc: "execFunc completed successfully",
693+
timeout: 1 * time.Second,
694+
execFunc: func() error {
695+
return nil
696+
},
697+
timeoutFunc: func() error {
698+
return fmt.Errorf("timeout error")
699+
},
700+
expectedErr: nil,
701+
},
702+
}
703+
704+
for _, test := range tests {
705+
err := WaitForExecCompletion(test.timeout, test.execFunc, test.timeoutFunc)
706+
if err != nil && (err.Error() != test.expectedErr.Error()) {
707+
t.Errorf("unexpected error: %v, expected error: %v", err, test.expectedErr)
708+
}
709+
}
710+
}

0 commit comments

Comments
 (0)