Skip to content

Commit aae9d91

Browse files
committed
Optimise OCI artifacts reconciliation
- Fetch the upstream digest before validation and pulling
- Pull artifact only if the upstream digest is different from the one in storage
- Add the image tag to the revision string `<tag>/<digest-hex>` for a better UX
- Extract the layer processing to a dedicated function

Signed-off-by: Stefan Prodan <[email protected]>
1 parent 4ec51ca commit aae9d91

File tree

2 files changed

+167
-148
lines changed

2 files changed

+167
-148
lines changed

controllers/ocirepository_controller.go

Lines changed: 149 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -369,47 +369,18 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
369369
return sreconcile.ResultEmpty, e
370370
}
371371

372-
// Pull artifact from the remote container registry
373-
img, err := crane.Pull(url, options...)
374-
if err != nil {
375-
e := serror.NewGeneric(
376-
fmt.Errorf("failed to pull artifact from '%s': %w", obj.Spec.URL, err),
377-
sourcev1.OCIPullFailedReason,
378-
)
379-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
380-
return sreconcile.ResultEmpty, e
381-
}
382-
383-
// Determine the artifact SHA256 digest
384-
imgDigest, err := img.Digest()
372+
// Get the upstream revision from the artifact digest
373+
revision, err := r.getRevision(url, options)
385374
if err != nil {
386375
e := serror.NewGeneric(
387376
fmt.Errorf("failed to determine artifact digest: %w", err),
388-
sourcev1.OCILayerOperationFailedReason,
389-
)
390-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
391-
return sreconcile.ResultEmpty, e
392-
}
393-
394-
// Set the internal revision to the remote digest hex
395-
revision := imgDigest.Hex
396-
397-
// Copy the OCI annotations to the internal artifact metadata
398-
manifest, err := img.Manifest()
399-
if err != nil {
400-
e := serror.NewGeneric(
401-
fmt.Errorf("failed to parse artifact manifest: %w", err),
402-
sourcev1.OCILayerOperationFailedReason,
377+
sourcev1.OCIPullFailedReason,
403378
)
404379
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
405380
return sreconcile.ResultEmpty, e
406381
}
407-
408-
m := &sourcev1.Artifact{
409-
Revision: revision,
410-
Metadata: manifest.Annotations,
411-
}
412-
m.DeepCopyInto(metadata)
382+
metaArtifact := &sourcev1.Artifact{Revision: revision}
383+
metaArtifact.DeepCopyInto(metadata)
413384

414385
// Mark observations about the revision on the object
415386
defer func() {
@@ -430,7 +401,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
430401
} else if !obj.GetArtifact().HasRevision(revision) ||
431402
conditions.GetObservedGeneration(obj, sourcev1.SourceVerifiedCondition) != obj.Generation ||
432403
conditions.IsFalse(obj, sourcev1.SourceVerifiedCondition) {
433-
err := r.verifyOCISourceSignature(ctx, obj, url, keychain)
404+
err := r.verifySignature(ctx, obj, url, keychain)
434405
if err != nil {
435406
provider := obj.Spec.Verify.Provider
436407
if obj.Spec.Verify.SecretRef == nil {
@@ -447,121 +418,173 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
447418
conditions.MarkTrue(obj, sourcev1.SourceVerifiedCondition, meta.SucceededReason, "verified signature of digest %s", revision)
448419
}
449420

450-
// Extract the content of the first artifact layer
451-
if !obj.GetArtifact().HasRevision(revision) {
452-
layers, err := img.Layers()
453-
if err != nil {
421+
// Skip pulling if the artifact revision hasn't changed
422+
if obj.GetArtifact().HasRevision(revision) {
423+
conditions.Delete(obj, sourcev1.FetchFailedCondition)
424+
return sreconcile.ResultSuccess, nil
425+
}
426+
427+
// Pull artifact from the remote container registry
428+
img, err := crane.Pull(url, options...)
429+
if err != nil {
430+
e := serror.NewGeneric(
431+
fmt.Errorf("failed to pull artifact from '%s': %w", obj.Spec.URL, err),
432+
sourcev1.OCIPullFailedReason,
433+
)
434+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
435+
return sreconcile.ResultEmpty, e
436+
}
437+
438+
// Copy the OCI annotations to the internal artifact metadata
439+
manifest, err := img.Manifest()
440+
if err != nil {
441+
e := serror.NewGeneric(
442+
fmt.Errorf("failed to parse artifact manifest: %w", err),
443+
sourcev1.OCILayerOperationFailedReason,
444+
)
445+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
446+
return sreconcile.ResultEmpty, e
447+
}
448+
metadata.Metadata = manifest.Annotations
449+
450+
// Extract the compressed content from the selected layer
451+
blob, err := r.getLayerCompressed(obj, img)
452+
if err != nil {
453+
e := serror.NewGeneric(err, sourcev1.OCILayerOperationFailedReason)
454+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
455+
return sreconcile.ResultEmpty, e
456+
}
457+
458+
// Persist layer content to storage using the specified operation
459+
switch obj.GetLayerOperation() {
460+
case sourcev1.OCILayerExtract:
461+
if _, err = untar.Untar(blob, dir); err != nil {
454462
e := serror.NewGeneric(
455-
fmt.Errorf("failed to parse artifact layers: %w", err),
463+
fmt.Errorf("failed to extract layer contents from artifact: %w", err),
456464
sourcev1.OCILayerOperationFailedReason,
457465
)
458466
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
459467
return sreconcile.ResultEmpty, e
460468
}
461-
462-
if len(layers) < 1 {
469+
case sourcev1.OCILayerCopy:
470+
metadata.Path = fmt.Sprintf("%s.tgz", r.digestFromRevision(metadata.Revision))
471+
file, err := os.Create(filepath.Join(dir, metadata.Path))
472+
if err != nil {
463473
e := serror.NewGeneric(
464-
fmt.Errorf("no layers found in artifact"),
474+
fmt.Errorf("failed to create file to copy layer to: %w", err),
465475
sourcev1.OCILayerOperationFailedReason,
466476
)
467477
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
468478
return sreconcile.ResultEmpty, e
469479
}
480+
defer file.Close()
470481

471-
var layer gcrv1.Layer
472-
473-
switch {
474-
case obj.GetLayerMediaType() != "":
475-
var found bool
476-
for i, l := range layers {
477-
md, err := l.MediaType()
478-
if err != nil {
479-
e := serror.NewGeneric(
480-
fmt.Errorf("failed to determine the media type of layer[%v] from artifact: %w", i, err),
481-
sourcev1.OCILayerOperationFailedReason,
482-
)
483-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
484-
return sreconcile.ResultEmpty, e
485-
}
486-
if string(md) == obj.GetLayerMediaType() {
487-
layer = layers[i]
488-
found = true
489-
break
490-
}
491-
}
492-
if !found {
493-
e := serror.NewGeneric(
494-
fmt.Errorf("failed to find layer with media type '%s' in artifact", obj.GetLayerMediaType()),
495-
sourcev1.OCILayerOperationFailedReason,
496-
)
497-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
498-
return sreconcile.ResultEmpty, e
499-
}
500-
default:
501-
layer = layers[0]
502-
}
503-
504-
// Extract the compressed content from the selected layer
505-
blob, err := layer.Compressed()
482+
_, err = io.Copy(file, blob)
506483
if err != nil {
507484
e := serror.NewGeneric(
508-
fmt.Errorf("failed to extract the first layer from artifact: %w", err),
485+
fmt.Errorf("failed to copy layer from artifact: %w", err),
509486
sourcev1.OCILayerOperationFailedReason,
510487
)
511488
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
512489
return sreconcile.ResultEmpty, e
513490
}
491+
default:
492+
e := serror.NewGeneric(
493+
fmt.Errorf("unsupported layer operation: %s", obj.GetLayerOperation()),
494+
sourcev1.OCILayerOperationFailedReason,
495+
)
496+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
497+
return sreconcile.ResultEmpty, e
498+
}
514499

515-
// Persist layer content to storage using the specified operation
516-
switch obj.GetLayerOperation() {
517-
case sourcev1.OCILayerExtract:
518-
if _, err = untar.Untar(blob, dir); err != nil {
519-
e := serror.NewGeneric(
520-
fmt.Errorf("failed to extract layer contents from artifact: %w", err),
521-
sourcev1.OCILayerOperationFailedReason,
522-
)
523-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
524-
return sreconcile.ResultEmpty, e
525-
}
526-
case sourcev1.OCILayerCopy:
527-
metadata.Path = fmt.Sprintf("%s.tgz", metadata.Revision)
528-
file, err := os.Create(filepath.Join(dir, metadata.Path))
529-
if err != nil {
530-
e := serror.NewGeneric(
531-
fmt.Errorf("failed to create file to copy layer to: %w", err),
532-
sourcev1.OCILayerOperationFailedReason,
533-
)
534-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
535-
return sreconcile.ResultEmpty, e
536-
}
537-
defer file.Close()
500+
conditions.Delete(obj, sourcev1.FetchFailedCondition)
501+
return sreconcile.ResultSuccess, nil
502+
}
503+
504+
// getLayerCompressed finds the matching layer and returns its compressed contents
505+
func (r *OCIRepositoryReconciler) getLayerCompressed(obj *sourcev1.OCIRepository, image gcrv1.Image) (io.ReadCloser, error) {
506+
layers, err := image.Layers()
507+
if err != nil {
508+
return nil, fmt.Errorf("failed to parse artifact layers: %w", err)
509+
}
538510

539-
_, err = io.Copy(file, blob)
511+
if len(layers) < 1 {
512+
return nil, fmt.Errorf("no layers found in artifact")
513+
}
514+
515+
var layer gcrv1.Layer
516+
switch {
517+
case obj.GetLayerMediaType() != "":
518+
var found bool
519+
for i, l := range layers {
520+
md, err := l.MediaType()
540521
if err != nil {
541-
e := serror.NewGeneric(
542-
fmt.Errorf("failed to copy layer from artifact: %w", err),
543-
sourcev1.OCILayerOperationFailedReason,
544-
)
545-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
546-
return sreconcile.ResultEmpty, e
522+
return nil, fmt.Errorf("failed to determine the media type of layer[%v] from artifact: %w", i, err)
547523
}
548-
default:
549-
e := serror.NewGeneric(
550-
fmt.Errorf("unsupported layer operation: %s", obj.GetLayerOperation()),
551-
sourcev1.OCILayerOperationFailedReason,
552-
)
553-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
554-
return sreconcile.ResultEmpty, e
524+
if string(md) == obj.GetLayerMediaType() {
525+
layer = layers[i]
526+
found = true
527+
break
528+
}
529+
}
530+
if !found {
531+
return nil, fmt.Errorf("failed to find layer with media type '%s' in artifact", obj.GetLayerMediaType())
555532
}
533+
default:
534+
layer = layers[0]
556535
}
557536

558-
conditions.Delete(obj, sourcev1.FetchFailedCondition)
559-
return sreconcile.ResultSuccess, nil
537+
blob, err := layer.Compressed()
538+
if err != nil {
539+
return nil, fmt.Errorf("failed to extract the first layer from artifact: %w", err)
540+
}
541+
542+
return blob, nil
543+
}
544+
545+
// getRevision fetches the upstream digest and returns the revision in the format `<tag>/<digest-hex>`,
// or just `<digest-hex>` when the URL references the artifact by digest (no tag)
546+
func (r *OCIRepositoryReconciler) getRevision(url string, options []crane.Option) (string, error) {
547+
ref, err := name.ParseReference(url)
548+
if err != nil {
549+
return "", err
550+
}
551+
552+
repoTag := ""
553+
repoName := strings.TrimPrefix(url, ref.Context().RegistryStr())
554+
if s := strings.Split(repoName, ":"); len(s) == 2 && !strings.Contains(repoName, "@") {
555+
repoTag = s[1]
556+
}
557+
558+
if repoTag == "" && !strings.Contains(repoName, "@") {
559+
repoTag = "latest"
560+
}
561+
562+
digest, err := crane.Digest(url, options...)
563+
if err != nil {
564+
return "", err
565+
}
566+
567+
digestHash, err := gcrv1.NewHash(digest)
568+
if err != nil {
569+
return "", err
570+
}
571+
572+
revision := digestHash.Hex
573+
if repoTag != "" {
574+
revision = fmt.Sprintf("%s/%s", repoTag, digestHash.Hex)
575+
}
576+
return revision, nil
577+
}
578+
579+
// digestFromRevision extracts the digest (the last `/`-separated segment) from the revision string
580+
func (r *OCIRepositoryReconciler) digestFromRevision(revision string) string {
581+
parts := strings.Split(revision, "/")
582+
return parts[len(parts)-1]
560583
}
561584

562-
// verifyOCISourceSignature verifies the authenticity of the given image reference url. First, it tries using a key
585+
// verifySignature verifies the authenticity of the given image reference url. First, it tries using a key
563586
// if a secret with a valid public key is provided. If not, it falls back to a keyless approach for verification.
564-
func (r *OCIRepositoryReconciler) verifyOCISourceSignature(ctx context.Context, obj *sourcev1.OCIRepository, url string, keychain authn.Keychain) error {
587+
func (r *OCIRepositoryReconciler) verifySignature(ctx context.Context, obj *sourcev1.OCIRepository, url string, keychain authn.Keychain) error {
565588
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
566589
defer cancel()
567590

@@ -856,8 +879,7 @@ func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context, insecure boo
856879
// condition is added.
857880
// The hostname of any URL in the Status of the object are updated, to ensure
858881
// they match the Storage server hostname of current runtime.
859-
func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context,
860-
obj *sourcev1.OCIRepository, _ *sourcev1.Artifact, _ string) (sreconcile.Result, error) {
882+
func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.OCIRepository, _ *sourcev1.Artifact, _ string) (sreconcile.Result, error) {
861883
// Garbage collect previous advertised artifact(s) from storage
862884
_ = r.garbageCollect(ctx, obj)
863885

@@ -892,13 +914,12 @@ func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context,
892914
// early.
893915
// On a successful archive, the Artifact in the Status of the object is set,
894916
// and the symlink in the Storage is updated to its path.
895-
func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context,
896-
obj *sourcev1.OCIRepository, metadata *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
897-
// Calculate revision
917+
func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.OCIRepository, metadata *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
898918
revision := metadata.Revision
899919

900920
// Create artifact
901-
artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
921+
artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision,
922+
fmt.Sprintf("%s.tar.gz", r.digestFromRevision(revision)))
902923

903924
// Set the ArtifactInStorageCondition if there's no drift.
904925
defer func() {
@@ -1047,8 +1068,7 @@ func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
10471068
// This log is different from the debug log in the EventRecorder, in the sense
10481069
// that this is a simple log. While the debug log contains complete details
10491070
// about the event.
1050-
func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context,
1051-
obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
1071+
func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
10521072
msg := fmt.Sprintf(messageFmt, args...)
10531073
// Log and emit event.
10541074
if eventType == corev1.EventTypeWarning {
@@ -1060,8 +1080,7 @@ func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context,
10601080
}
10611081

10621082
// notify emits notification related to the reconciliation.
1063-
func (r *OCIRepositoryReconciler) notify(ctx context.Context,
1064-
oldObj, newObj *sourcev1.OCIRepository, res sreconcile.Result, resErr error) {
1083+
func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, res sreconcile.Result, resErr error) {
10651084
// Notify successful reconciliation for new artifact and recovery from any
10661085
// failure.
10671086
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {

0 commit comments

Comments
 (0)