@@ -297,43 +297,57 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
297
297
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
298
298
defer cancel()
299
299
300
- // Generates registry credential keychain
300
+ // Generate the registry credential keychain
301
301
keychain, err := r.keychain(ctx, obj)
302
302
if err != nil {
303
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
304
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
303
+ e := serror.NewGeneric(
304
+ fmt.Errorf("failed to get credential: %w", err),
305
+ sourcev1.AuthenticationFailedReason,
306
+ )
307
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
305
308
return sreconcile.ResultEmpty, e
306
309
}
307
310
308
- // Generates transport for remote operations
311
+ // Generate the transport for remote operations
309
312
transport, err := r.transport(ctx, obj)
310
313
if err != nil {
311
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
312
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
314
+ e := serror.NewGeneric(
315
+ fmt.Errorf("failed to generate transport for '%s': %w", obj.Spec.URL, err),
316
+ sourcev1.OCIOperationFailedReason,
317
+ )
318
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
313
319
return sreconcile.ResultEmpty, e
314
320
}
315
321
316
322
// Determine which artifact revision to pull
317
323
url, err := r.getArtifactURL(ctxTimeout, obj, keychain, transport)
318
324
if err != nil {
319
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
320
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
325
+ e := serror.NewGeneric(
326
+ fmt.Errorf("failed to determine the artifact address for '%s': %w", obj.Spec.URL, err),
327
+ sourcev1.URLInvalidReason)
328
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
321
329
return sreconcile.ResultEmpty, e
322
330
}
323
331
324
332
// Pull artifact from the remote container registry
325
333
img, err := crane.Pull(url, r.craneOptions(ctxTimeout, keychain, transport)...)
326
334
if err != nil {
327
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
328
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
335
+ e := serror.NewGeneric(
336
+ fmt.Errorf("failed to pull artifact from '%s': %w", obj.Spec.URL, err),
337
+ sourcev1.OCIOperationFailedReason,
338
+ )
339
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
329
340
return sreconcile.ResultEmpty, e
330
341
}
331
342
332
343
// Determine the artifact SHA256 digest
333
344
imgDigest, err := img.Digest()
334
345
if err != nil {
335
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
336
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
346
+ e := serror.NewGeneric(
347
+ fmt.Errorf("failed to determine artifact digest: %w", err),
348
+ sourcev1.OCIOperationFailedReason,
349
+ )
350
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
337
351
return sreconcile.ResultEmpty, e
338
352
}
339
353
@@ -344,7 +358,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
344
358
// Mark observations about the revision on the object
345
359
defer func() {
346
360
if !obj.GetArtifact().HasRevision(revision) {
347
- message := fmt.Sprintf("new upstream revision '%s' for '%s'", revision, url)
361
+ message := fmt.Sprintf("new digest '%s' for '%s'", revision, url)
348
362
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
349
363
conditions.MarkReconciling(obj, "NewRevision", message)
350
364
}
@@ -354,28 +368,39 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
354
368
if !obj.GetArtifact().HasRevision(revision) {
355
369
layers, err := img.Layers()
356
370
if err != nil {
357
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
358
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
371
+ e := serror.NewGeneric(
372
+ fmt.Errorf("failed to parse artifact layers: %w", err),
373
+ sourcev1.OCIOperationFailedReason,
374
+ )
375
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
359
376
return sreconcile.ResultEmpty, e
360
377
}
361
378
362
379
if len(layers) < 1 {
363
- err = fmt.Errorf("no layers found in artifact")
364
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
365
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
380
+ e := serror.NewGeneric(
381
+ fmt.Errorf("no layers found in artifact"),
382
+ sourcev1.OCIOperationFailedReason,
383
+ )
384
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
366
385
return sreconcile.ResultEmpty, e
367
386
}
368
387
369
388
blob, err := layers[0].Compressed()
370
389
if err != nil {
371
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
372
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
390
+ e := serror.NewGeneric(
391
+ fmt.Errorf("failed to extract the first layer from artifact: %w", err),
392
+ sourcev1.OCIOperationFailedReason,
393
+ )
394
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
373
395
return sreconcile.ResultEmpty, e
374
396
}
375
397
376
398
if _, err = untar.Untar(blob, dir); err != nil {
377
- e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
378
- conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
399
+ e := serror.NewGeneric(
400
+ fmt.Errorf("failed to untar the first layer from artifact: %w", err),
401
+ sourcev1.OCIOperationFailedReason,
402
+ )
403
+ conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
379
404
return sreconcile.ResultEmpty, e
380
405
}
381
406
}
@@ -384,7 +409,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
384
409
return sreconcile.ResultSuccess, nil
385
410
}
386
411
387
- // parseRepositoryURL extracts the repository URL.
412
+ // parseRepositoryURL validates and extracts the repository URL.
388
413
func (r *OCIRepositoryReconciler) parseRepositoryURL(obj *sourcev1.OCIRepository) (string, error) {
389
414
if !strings.HasPrefix(obj.Spec.URL, sourcev1.OCIRepositoryPrefix) {
390
415
return "", fmt.Errorf("URL must be in format 'oci://<domain>/<org>/<repo>'")
@@ -393,14 +418,15 @@ func (r *OCIRepositoryReconciler) parseRepositoryURL(obj *sourcev1.OCIRepository
393
418
url := strings.TrimPrefix(obj.Spec.URL, sourcev1.OCIRepositoryPrefix)
394
419
ref, err := name.ParseReference(url)
395
420
if err != nil {
396
- return "", fmt.Errorf("'%s' invalid URL: %w", obj.Spec.URL, err)
421
+ return "", err
397
422
}
398
423
399
424
return ref.Context().Name(), nil
400
425
}
401
426
402
427
// getArtifactURL determines which tag or digest should be used and returns the OCI artifact FQN.
403
- func (r *OCIRepositoryReconciler) getArtifactURL(ctx context.Context, obj *sourcev1.OCIRepository, keychain authn.Keychain, transport http.RoundTripper) (string, error) {
428
+ func (r *OCIRepositoryReconciler) getArtifactURL(ctx context.Context,
429
+ obj *sourcev1.OCIRepository, keychain authn.Keychain, transport http.RoundTripper) (string, error) {
404
430
url, err := r.parseRepositoryURL(obj)
405
431
if err != nil {
406
432
return "", err
@@ -429,7 +455,8 @@ func (r *OCIRepositoryReconciler) getArtifactURL(ctx context.Context, obj *sourc
429
455
430
456
// getTagBySemver call the remote container registry, fetches all the tags from the repository,
431
457
// and returns the latest tag according to the semver expression.
432
- func (r *OCIRepositoryReconciler) getTagBySemver(ctx context.Context, url, exp string, keychain authn.Keychain, transport http.RoundTripper) (string, error) {
458
+ func (r *OCIRepositoryReconciler) getTagBySemver(ctx context.Context,
459
+ url, exp string, keychain authn.Keychain, transport http.RoundTripper) (string, error) {
433
460
tags, err := crane.ListTags(url, r.craneOptions(ctx, keychain, transport)...)
434
461
if err != nil {
435
462
return "", err
@@ -495,7 +522,8 @@ func (r *OCIRepositoryReconciler) keychain(ctx context.Context, obj *sourcev1.OC
495
522
imagePullSecret := corev1.Secret{}
496
523
err := r.Get(ctx, types.NamespacedName{Namespace: obj.Namespace, Name: imagePullSecretName}, &imagePullSecret)
497
524
if err != nil {
498
- r.eventLogf(ctx, obj, events.EventSeverityTrace, "secret %q not found", imagePullSecretName)
525
+ r.eventLogf(ctx, obj, events.EventSeverityTrace, sourcev1.AuthenticationFailedReason,
526
+ "auth secret '%s' not found", imagePullSecretName)
499
527
return nil, err
500
528
}
501
529
imagePullSecrets[i] = imagePullSecret
@@ -504,51 +532,54 @@ func (r *OCIRepositoryReconciler) keychain(ctx context.Context, obj *sourcev1.OC
504
532
return k8schain.NewFromPullSecrets(ctx, imagePullSecrets)
505
533
}
506
534
507
- // transport clones the default transport from remote.
508
- // If certSecretRef is configured in the resource configuration,
509
- // returned transport will iclude client and/or CA certifactes
535
+ // transport clones the default transport from remote and when a certSecretRef is specified,
536
+ // the returned transport will include the TLS client and/or CA certificates.
510
537
func (r *OCIRepositoryReconciler) transport(ctx context.Context, obj *sourcev1.OCIRepository) (http.RoundTripper, error) {
511
- if obj.Spec.CertSecretRef != nil {
512
- var certSecret corev1.Secret
513
- err := r.Get(ctx,
514
- types.NamespacedName{Namespace: obj.Namespace, Name: obj.Spec.CertSecretRef.Name},
515
- &certSecret)
538
+ if obj.Spec.CertSecretRef == nil || obj.Spec.CertSecretRef.Name == "" {
539
+ return nil, nil
540
+ }
516
541
517
- if err != nil {
518
- r.eventLogf(ctx, obj, events.EventSeverityTrace, "secret %q not found", obj.Spec.CertSecretRef.Name)
519
- return nil, err
520
- }
542
+ certSecretName := types.NamespacedName{
543
+ Namespace: obj.Namespace,
544
+ Name: obj.Spec.CertSecretRef.Name,
545
+ }
546
+ var certSecret corev1.Secret
547
+ if err := r.Get(ctx, certSecretName, &certSecret); err != nil {
548
+ return nil, err
549
+ }
521
550
522
- transport := remote.DefaultTransport.Clone()
523
- tlsConfig := transport.TLSClientConfig
524
-
525
- if clientCert, ok := certSecret.Data[ClientCert]; ok {
526
- // parse and set client cert and secret
527
- if clientKey, ok := certSecret.Data[ClientKey]; ok {
528
- cert, err := tls.X509KeyPair(clientCert, clientKey)
529
- if err != nil {
530
- return nil, err
531
- }
532
- tlsConfig.Certificates = append(tlsConfig.Certificates, cert)
533
- } else {
534
- return nil, fmt.Errorf("client certificate found, but no key")
535
- }
536
- }
537
- if caCert, ok := certSecret.Data[CACert]; ok {
538
- syscerts, err := x509.SystemCertPool()
551
+ transport := remote.DefaultTransport.Clone()
552
+ tlsConfig := transport.TLSClientConfig
553
+
554
+ if clientCert, ok := certSecret.Data[ClientCert]; ok {
555
+ // parse and set client cert and secret
556
+ if clientKey, ok := certSecret.Data[ClientKey]; ok {
557
+ cert, err := tls.X509KeyPair(clientCert, clientKey)
539
558
if err != nil {
540
559
return nil, err
541
560
}
542
- syscerts.AppendCertsFromPEM(caCert)
543
- tlsConfig.RootCAs = syscerts
561
+ tlsConfig.Certificates = append(tlsConfig.Certificates, cert)
562
+ } else {
563
+ return nil, fmt.Errorf("'%s' found in secret, but no %s", ClientCert, ClientKey)
544
564
}
545
- return transport, nil
546
565
}
547
- return nil, nil
566
+
567
+ if caCert, ok := certSecret.Data[CACert]; ok {
568
+ syscerts, err := x509.SystemCertPool()
569
+ if err != nil {
570
+ return nil, err
571
+ }
572
+ syscerts.AppendCertsFromPEM(caCert)
573
+ tlsConfig.RootCAs = syscerts
574
+ }
575
+ return transport, nil
576
+
548
577
}
549
578
550
- // craneOptions sets the timeout and user agent for all operations against remote container registries.
551
- func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context, keychain authn.Keychain, transport http.RoundTripper) []crane.Option {
579
+ // craneOptions sets the auth headers, timeout and user agent
580
+ // for all operations against remote container registries.
581
+ func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context,
582
+ keychain authn.Keychain, transport http.RoundTripper) []crane.Option {
552
583
options := []crane.Option{
553
584
crane.WithContext(ctx),
554
585
crane.WithUserAgent("flux/v2"),
@@ -574,7 +605,8 @@ func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context, keychain aut
574
605
// condition is added.
575
606
// The hostname of any URL in the Status of the object are updated, to ensure
576
607
// they match the Storage server hostname of current runtime.
577
- func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.OCIRepository, _ *gcrv1.Hash, _ string) (sreconcile.Result, error) {
608
+ func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context,
609
+ obj *sourcev1.OCIRepository, _ *gcrv1.Hash, _ string) (sreconcile.Result, error) {
578
610
// Garbage collect previous advertised artifact(s) from storage
579
611
_ = r.garbageCollect(ctx, obj)
580
612
@@ -609,7 +641,8 @@ func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sou
609
641
// early.
610
642
// On a successful archive, the Artifact in the Status of the object is set,
611
643
// and the symlink in the Storage is updated to its path.
612
- func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) {
644
+ func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context,
645
+ obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) {
613
646
// Calculate revision
614
647
revision := digest.Hex
615
648
@@ -628,7 +661,7 @@ func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *so
628
661
// The artifact is up-to-date
629
662
if obj.GetArtifact().HasRevision(artifact.Revision) {
630
663
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.ArtifactUpToDateReason,
631
- "artifact up-to-date with remote revision : '%s'", artifact.Revision)
664
+ "artifact up-to-date with remote digest : '%s'", artifact.Revision)
632
665
return sreconcile.ResultSuccess, nil
633
666
}
634
667
@@ -751,7 +784,8 @@ func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
751
784
// This log is different from the debug log in the EventRecorder, in the sense
752
785
// that this is a simple log. While the debug log contains complete details
753
786
// about the event.
754
- func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
787
+ func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context,
788
+ obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
755
789
msg := fmt.Sprintf(messageFmt, args...)
756
790
// Log and emit event.
757
791
if eventType == corev1.EventTypeWarning {
@@ -763,7 +797,8 @@ func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Obj
763
797
}
764
798
765
799
// notify emits notification related to the reconciliation.
766
- func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) {
800
+ func (r *OCIRepositoryReconciler) notify(ctx context.Context,
801
+ oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) {
767
802
// Notify successful reconciliation for new artifact and recovery from any
768
803
// failure.
769
804
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
0 commit comments