Skip to content

Commit 22f6b3e

Browse files
committed
Parallel layer upload for s3 cache
Signed-off-by: Bertrand Paquet <[email protected]>
1 parent 11a4a07 commit 22f6b3e

File tree

2 files changed

+129
-102
lines changed

2 files changed

+129
-102
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ Other options are:
578578
* Multiple manifest names can be specified at the same time, separated by `;`. The standard use case is to use the git sha1 as name, and the branch name as duplicate, and load both with 2 `import-cache` commands.
579579
* `ignore-error=<false|true>`: specify if error is ignored in case cache export fails (default: `false`)
580580
* `touch_refresh=24h`: Instead of being uploaded again when not changed, blobs files will be "touched" on s3 every `touch_refresh`, default is 24h. Due to this, an expiration policy can be set on the S3 bucket to cleanup useless files automatically. Manifests files are systematically rewritten, there is no need to touch them.
581+
* `upload_parallelism=4`: This parameter changes the number of layers uploaded to S3 in parallel. Each individual layer is uploaded with 5 threads, using the Upload manager provided by the AWS SDK.
581582

582583
`--import-cache` options:
583584
* `type=s3`

cache/remotecache/s3/s3.go

Lines changed: 128 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -30,37 +30,40 @@ import (
3030
digest "github.com/opencontainers/go-digest"
3131
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
3232
"github.com/pkg/errors"
33+
"golang.org/x/sync/errgroup"
3334
)
3435

3536
const (
36-
attrBucket = "bucket"
37-
attrRegion = "region"
38-
attrPrefix = "prefix"
39-
attrManifestsPrefix = "manifests_prefix"
40-
attrBlobsPrefix = "blobs_prefix"
41-
attrName = "name"
42-
attrTouchRefresh = "touch_refresh"
43-
attrEndpointURL = "endpoint_url"
44-
attrAccessKeyID = "access_key_id"
45-
attrSecretAccessKey = "secret_access_key"
46-
attrSessionToken = "session_token"
47-
attrUsePathStyle = "use_path_style"
48-
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
37+
attrBucket = "bucket"
38+
attrRegion = "region"
39+
attrPrefix = "prefix"
40+
attrManifestsPrefix = "manifests_prefix"
41+
attrBlobsPrefix = "blobs_prefix"
42+
attrName = "name"
43+
attrTouchRefresh = "touch_refresh"
44+
attrEndpointURL = "endpoint_url"
45+
attrAccessKeyID = "access_key_id"
46+
attrSecretAccessKey = "secret_access_key"
47+
attrSessionToken = "session_token"
48+
attrUsePathStyle = "use_path_style"
49+
attrUploadParallelism = "upload_parallelism"
50+
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
4951
)
5052

5153
type Config struct {
52-
Bucket string
53-
Region string
54-
Prefix string
55-
ManifestsPrefix string
56-
BlobsPrefix string
57-
Names []string
58-
TouchRefresh time.Duration
59-
EndpointURL string
60-
AccessKeyID string
61-
SecretAccessKey string
62-
SessionToken string
63-
UsePathStyle bool
54+
Bucket string
55+
Region string
56+
Prefix string
57+
ManifestsPrefix string
58+
BlobsPrefix string
59+
Names []string
60+
TouchRefresh time.Duration
61+
EndpointURL string
62+
AccessKeyID string
63+
SecretAccessKey string
64+
SessionToken string
65+
UsePathStyle bool
66+
UploadParallelism int
6467
}
6568

6669
func getConfig(attrs map[string]string) (Config, error) {
@@ -125,19 +128,33 @@ func getConfig(attrs map[string]string) (Config, error) {
125128
}
126129
}
127130

131+
uploadParallelism := 4
132+
uploadParallelismStr, ok := attrs[attrUploadParallelism]
133+
if ok {
134+
uploadParallelismInt, err := strconv.Atoi(uploadParallelismStr)
135+
if err != nil {
136+
return Config{}, errors.Errorf("upload_parallelism must be a positive integer")
137+
}
138+
if uploadParallelismInt <= 0 {
139+
return Config{}, errors.Errorf("upload_parallelism must be a positive integer")
140+
}
141+
uploadParallelism = uploadParallelismInt
142+
}
143+
128144
return Config{
129-
Bucket: bucket,
130-
Region: region,
131-
Prefix: prefix,
132-
ManifestsPrefix: manifestsPrefix,
133-
BlobsPrefix: blobsPrefix,
134-
Names: names,
135-
TouchRefresh: touchRefresh,
136-
EndpointURL: endpointURL,
137-
AccessKeyID: accessKeyID,
138-
SecretAccessKey: secretAccessKey,
139-
SessionToken: sessionToken,
140-
UsePathStyle: usePathStyle,
145+
Bucket: bucket,
146+
Region: region,
147+
Prefix: prefix,
148+
ManifestsPrefix: manifestsPrefix,
149+
BlobsPrefix: blobsPrefix,
150+
Names: names,
151+
TouchRefresh: touchRefresh,
152+
EndpointURL: endpointURL,
153+
AccessKeyID: accessKeyID,
154+
SecretAccessKey: secretAccessKey,
155+
SessionToken: sessionToken,
156+
UsePathStyle: usePathStyle,
157+
UploadParallelism: uploadParallelism,
141158
}, nil
142159
}
143160

@@ -187,64 +204,84 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
187204
return nil, err
188205
}
189206

190-
for i, l := range cacheConfig.Layers {
191-
dgstPair, ok := descs[l.Blob]
192-
if !ok {
193-
return nil, errors.Errorf("missing blob %s", l.Blob)
194-
}
195-
if dgstPair.Descriptor.Annotations == nil {
196-
return nil, errors.Errorf("invalid descriptor without annotations")
197-
}
198-
v, ok := dgstPair.Descriptor.Annotations[labels.LabelUncompressed]
199-
if !ok {
200-
return nil, errors.Errorf("invalid descriptor without uncompressed annotation")
201-
}
202-
diffID, err := digest.Parse(v)
203-
if err != nil {
204-
return nil, errors.Wrapf(err, "failed to parse uncompressed annotation")
205-
}
207+
eg, groupCtx := errgroup.WithContext(ctx)
208+
tasks := make(chan int, e.config.UploadParallelism)
206209

207-
key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
208-
exists, size, err := e.s3Client.exists(ctx, key)
209-
if err != nil {
210-
return nil, errors.Wrapf(err, "failed to check file presence in cache")
210+
go func() {
211+
for i := range cacheConfig.Layers {
212+
tasks <- i
211213
}
212-
if exists != nil {
213-
if time.Since(*exists) > e.config.TouchRefresh {
214-
err = e.s3Client.touch(ctx, key, size)
214+
close(tasks)
215+
}()
216+
217+
for k := 0; k < e.config.UploadParallelism; k++ {
218+
eg.Go(func() error {
219+
for index := range tasks {
220+
blob := cacheConfig.Layers[index].Blob
221+
dgstPair, ok := descs[blob]
222+
if !ok {
223+
return errors.Errorf("missing blob %s", blob)
224+
}
225+
if dgstPair.Descriptor.Annotations == nil {
226+
return errors.Errorf("invalid descriptor without annotations")
227+
}
228+
v, ok := dgstPair.Descriptor.Annotations[labels.LabelUncompressed]
229+
if !ok {
230+
return errors.Errorf("invalid descriptor without uncompressed annotation")
231+
}
232+
diffID, err := digest.Parse(v)
215233
if err != nil {
216-
return nil, errors.Wrapf(err, "failed to touch file")
234+
return errors.Wrapf(err, "failed to parse uncompressed annotation")
217235
}
218-
}
219-
} else {
220-
layerDone := progress.OneOff(ctx, fmt.Sprintf("writing layer %s", l.Blob))
221-
// TODO: once buildkit uses v2, start using
222-
// https://github.com/containerd/containerd/pull/9657
223-
// currently inline data should never happen.
224-
ra, err := dgstPair.Provider.ReaderAt(ctx, dgstPair.Descriptor)
225-
if err != nil {
226-
return nil, layerDone(errors.Wrap(err, "error reading layer blob from provider"))
227-
}
228-
defer ra.Close()
229-
if err := e.s3Client.saveMutableAt(ctx, key, &nopCloserSectionReader{io.NewSectionReader(ra, 0, ra.Size())}); err != nil {
230-
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
231-
}
232-
layerDone(nil)
233-
}
234236

235-
la := &v1.LayerAnnotations{
236-
DiffID: diffID,
237-
Size: dgstPair.Descriptor.Size,
238-
MediaType: dgstPair.Descriptor.MediaType,
239-
}
240-
if v, ok := dgstPair.Descriptor.Annotations["buildkit/createdat"]; ok {
241-
var t time.Time
242-
if err := (&t).UnmarshalText([]byte(v)); err != nil {
243-
return nil, err
237+
key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
238+
exists, size, err := e.s3Client.exists(groupCtx, key)
239+
if err != nil {
240+
return errors.Wrapf(err, "failed to check file presence in cache")
241+
}
242+
if exists != nil {
243+
if time.Since(*exists) > e.config.TouchRefresh {
244+
err = e.s3Client.touch(groupCtx, key, size)
245+
if err != nil {
246+
return errors.Wrapf(err, "failed to touch file")
247+
}
248+
}
249+
} else {
250+
layerDone := progress.OneOff(groupCtx, fmt.Sprintf("writing layer %s", blob))
251+
// TODO: once buildkit uses v2, start using
252+
// https://github.com/containerd/containerd/pull/9657
253+
// currently inline data should never happen.
254+
ra, err := dgstPair.Provider.ReaderAt(groupCtx, dgstPair.Descriptor)
255+
if err != nil {
256+
return layerDone(errors.Wrap(err, "error reading layer blob from provider"))
257+
}
258+
defer ra.Close()
259+
if err := e.s3Client.saveMutableAt(groupCtx, key, &nopCloserSectionReader{io.NewSectionReader(ra, 0, ra.Size())}); err != nil {
260+
return layerDone(errors.Wrap(err, "error writing layer blob"))
261+
}
262+
layerDone(nil)
263+
}
264+
265+
la := &v1.LayerAnnotations{
266+
DiffID: diffID,
267+
Size: dgstPair.Descriptor.Size,
268+
MediaType: dgstPair.Descriptor.MediaType,
269+
}
270+
if v, ok := dgstPair.Descriptor.Annotations["buildkit/createdat"]; ok {
271+
var t time.Time
272+
if err := (&t).UnmarshalText([]byte(v)); err != nil {
273+
return err
274+
}
275+
la.CreatedAt = t.UTC()
276+
}
277+
cacheConfig.Layers[index].Annotations = la
244278
}
245-
la.CreatedAt = t.UTC()
246-
}
247-
cacheConfig.Layers[i].Annotations = la
279+
return nil
280+
})
281+
}
282+
283+
if err := eg.Wait(); err != nil {
284+
return nil, err
248285
}
249286

250287
dt, err := json.Marshal(cacheConfig)
@@ -253,7 +290,7 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
253290
}
254291

255292
for _, name := range e.config.Names {
256-
if err := e.s3Client.saveMutable(ctx, e.s3Client.manifestKey(name), dt); err != nil {
293+
if err := e.s3Client.saveMutableAt(ctx, e.s3Client.manifestKey(name), bytes.NewReader(dt)); err != nil {
257294
return nil, errors.Wrapf(err, "error writing manifest: %s", name)
258295
}
259296
}
@@ -430,18 +467,7 @@ func (s3Client *s3Client) getReader(ctx context.Context, key string) (io.ReadClo
430467
return output.Body, nil
431468
}
432469

433-
func (s3Client *s3Client) saveMutable(ctx context.Context, key string, value []byte) error {
434-
input := &s3.PutObjectInput{
435-
Bucket: &s3Client.bucket,
436-
Key: &key,
437-
438-
Body: bytes.NewReader(value),
439-
}
440-
_, err := s3Client.Upload(ctx, input)
441-
return err
442-
}
443-
444-
func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io.ReadSeekCloser) error {
470+
func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io.Reader) error {
445471
input := &s3.PutObjectInput{
446472
Bucket: &s3Client.bucket,
447473
Key: &key,

0 commit comments

Comments
 (0)