Skip to content

OPECO-3066: Synchronize From Upstream Repositories #606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions staging/api/pkg/operators/v1alpha1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,18 @@ const (
// SubscriptionBundleUnpackFailed indicates that the unpack job failed
SubscriptionBundleUnpackFailed SubscriptionConditionType = "BundleUnpackFailed"

// SubscriptionOperatorDeprecated indicates that the Operator currently installed with this Subscription has been deprecated.
SubscriptionOperatorDeprecated SubscriptionConditionType = "OperatorDeprecated"
// SubscriptionDeprecated is a roll-up condition which indicates that the Operator currently installed with this Subscription
//has been deprecated. It will be present when any of the three deprecation types (Package, Channel, Bundle) are present.
SubscriptionDeprecated SubscriptionConditionType = "Deprecated"

// SubscriptionOperatorDeprecated indicates that the Package currently installed with this Subscription has been deprecated.
SubscriptionPackageDeprecated SubscriptionConditionType = "PackageDeprecated"

// SubscriptionOperatorDeprecated indicates that the Channel used with this Subscription has been deprecated.
SubscriptionChannelDeprecated SubscriptionConditionType = "ChannelDeprecated"

// SubscriptionOperatorDeprecated indicates that the Bundle currently installed with this Subscription has been deprecated.
SubscriptionBundleDeprecated SubscriptionConditionType = "BundleDeprecated"
)

const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func (o *Operator) now() metav1.Time {
func (o *Operator) syncSourceState(state grpc.SourceState) {
o.sourcesLastUpdate.Set(o.now().Time)

o.logger.Debugf("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())
o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())
metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State)

switch state.State {
Expand Down Expand Up @@ -868,7 +868,11 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc

out = in.DeepCopy()

logger.Debug("checking catsrc configmap state")
logger = logger.WithFields(logrus.Fields{
"configmap.namespace": in.Namespace,
"configmap.name": in.Spec.ConfigMap,
})
logger.Info("checking catsrc configmap state")

var updateLabel bool
// Get the catalog source's config map
Expand Down Expand Up @@ -902,11 +906,11 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
return
}

logger.Debug("adopted configmap")
logger.Info("adopted configmap")
}

if in.Status.ConfigMapResource == nil || !in.Status.ConfigMapResource.IsAMatch(&configMap.ObjectMeta) {
logger.Debug("updating catsrc configmap state")
logger.Info("updating catsrc configmap state")
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Expand All @@ -926,6 +930,7 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) {
out = in.DeepCopy()

logger.Info("synchronizing registry server")
sourceKey := registry.CatalogKey{Name: in.GetName(), Namespace: in.GetNamespace()}
srcReconciler := o.reconciler.ReconcilerForSource(in)
if srcReconciler == nil {
Expand All @@ -935,31 +940,32 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
return
}

healthy, err := srcReconciler.CheckRegistryServer(in)
healthy, err := srcReconciler.CheckRegistryServer(logger, in)
if err != nil {
syncError = err
out.SetError(v1alpha1.CatalogSourceRegistryServerError, syncError)
return
}

logger.Debugf("check registry server healthy: %t", healthy)
logger.WithField("health", healthy).Infof("checked registry server health")

if healthy && in.Status.RegistryServiceStatus != nil {
logger.Debug("registry state good")
logger.Info("registry state good")
continueSync = true
// return here if catalog does not have polling enabled
if !out.Poll() {
logger.Info("polling not enabled, nothing more to do")
return
}
}

// Registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
logger.Debug("ensuring registry server")
logger.Info("ensuring registry server")

err = srcReconciler.EnsureRegistryServer(out)
err = srcReconciler.EnsureRegistryServer(logger, out)
if err != nil {
if _, ok := err.(reconciler.UpdateNotReadyErr); ok {
logger.Debug("requeueing registry server for catalog update check: update pod not yet ready")
logger.Info("requeueing registry server for catalog update check: update pod not yet ready")
o.catsrcQueueSet.RequeueAfter(out.GetNamespace(), out.GetName(), reconciler.CatalogPollingRequeuePeriod)
return
}
Expand All @@ -968,7 +974,7 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
return
}

logger.Debug("ensured registry server")
logger.Info("ensured registry server")

// requeue the catalog sync based on the polling interval, for accurate syncs of catalogs with polling enabled
if out.Spec.UpdateStrategy != nil && out.Spec.UpdateStrategy.RegistryPoll != nil {
Expand All @@ -980,7 +986,7 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
if out.Spec.UpdateStrategy.RegistryPoll.ParsingError != "" && out.Status.Reason != v1alpha1.CatalogSourceIntervalInvalidError {
out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, fmt.Errorf(out.Spec.UpdateStrategy.RegistryPoll.ParsingError))
}
logger.Debugf("requeuing registry server sync based on polling interval %s", out.Spec.UpdateStrategy.Interval.Duration.String())
logger.Infof("requeuing registry server sync based on polling interval %s", out.Spec.UpdateStrategy.Interval.Duration.String())
resyncPeriod := reconciler.SyncRegistryUpdateInterval(out, time.Now())
o.catsrcQueueSet.RequeueAfter(out.GetNamespace(), out.GetName(), queueinformer.ResyncWithJitter(resyncPeriod, 0.1)())
return
Expand Down Expand Up @@ -1068,16 +1074,17 @@ func (o *Operator) syncConnection(logger *logrus.Entry, in *v1alpha1.CatalogSour
func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
catsrc, ok := obj.(*v1alpha1.CatalogSource)
if !ok {
o.logger.Debugf("wrong type: %#v", obj)
o.logger.Infof("wrong type: %#v", obj)
syncError = nil
return
}

logger := o.logger.WithFields(logrus.Fields{
"source": catsrc.GetName(),
"id": queueinformer.NewLoopID(),
"catalogsource.namespace": catsrc.Namespace,
"catalogsource.name": catsrc.Name,
"id": queueinformer.NewLoopID(),
})
logger.Debug("syncing catsrc")
logger.Info("syncing catalog source")

syncFunc := func(in *v1alpha1.CatalogSource, chain []CatalogSourceSyncFunc) (out *v1alpha1.CatalogSource, syncErr error) {
out = in
Expand Down Expand Up @@ -1133,7 +1140,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
func (o *Operator) syncResolvingNamespace(obj interface{}) error {
ns, ok := obj.(*corev1.Namespace)
if !ok {
o.logger.Debugf("wrong type: %#v", obj)
o.logger.Infof("wrong type: %#v", obj)
return fmt.Errorf("casting Namespace failed")
}
namespace := ns.GetName()
Expand All @@ -1146,9 +1153,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
o.gcInstallPlans(logger, namespace)

// get the set of sources that should be used for resolution and best-effort get their connections working
logger.Debug("resolving sources")
logger.Info("resolving sources")

logger.Debug("checking if subscriptions need update")
logger.Info("checking if subscriptions need update")

subs, err := o.listSubscriptions(namespace)
if err != nil {
Expand All @@ -1158,7 +1165,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {

// If there are no subscriptions, don't attempt to sync the namespace.
if len(subs) == 0 {
logger.Debug(fmt.Sprintf("No subscriptions were found in namespace %v", namespace))
logger.Info(fmt.Sprintf("No subscriptions were found in namespace %v", namespace))
return nil
}

Expand Down Expand Up @@ -1196,23 +1203,23 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
// ensure the installplan reference is correct
sub, changedIP, err := o.ensureSubscriptionInstallPlanState(logger, sub, failForwardEnabled)
if err != nil {
logger.Debugf("error ensuring installplan state: %v", err)
logger.Infof("error ensuring installplan state: %v", err)
return err
}
subscriptionUpdated = subscriptionUpdated || changedIP

// record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet.
sub, changedCSV, err := o.ensureSubscriptionCSVState(logger, sub, failForwardEnabled)
if err != nil {
logger.Debugf("error recording current state of CSV in status: %v", err)
logger.Infof("error recording current state of CSV in status: %v", err)
return err
}

subscriptionUpdated = subscriptionUpdated || changedCSV
subs[i] = sub
}
if subscriptionUpdated {
logger.Debug("subscriptions were updated, wait for a new resolution")
logger.Info("subscriptions were updated, wait for a new resolution")
return nil
}

Expand All @@ -1221,11 +1228,11 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
shouldUpdate = shouldUpdate || !o.nothingToUpdate(logger, sub)
}
if !shouldUpdate {
logger.Debug("all subscriptions up to date")
logger.Info("all subscriptions up to date")
return nil
}

logger.Debug("resolving subscriptions in namespace")
logger.Info("resolving subscriptions in namespace")

// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace)
Expand Down Expand Up @@ -1270,7 +1277,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
// Attempt to unpack bundles before installing
// Note: This should probably use the attenuated client to prevent users from resolving resources they otherwise don't have access to.
if len(bundleLookups) > 0 {
logger.Debug("unpacking bundles")
logger.Info("unpacking bundles")

var unpacked bool
unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout, minUnpackRetryInterval)
Expand Down Expand Up @@ -1335,15 +1342,15 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
return updateErr
}

logger.Debug("unpacking is not complete yet, requeueing")
logger.Info("unpacking is not complete yet, requeueing")
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
return nil
}
}

// create installplan if anything updated
if len(updatedSubs) > 0 {
logger.Debug("resolution caused subscription changes, creating installplan")
logger.Info("resolution caused subscription changes, creating installplan")
// Finish calculating max generation by checking the existing installplans
installPlans, err := o.listInstallPlans(namespace)
if err != nil {
Expand Down Expand Up @@ -1372,7 +1379,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
}
updatedSubs = o.setIPReference(updatedSubs, maxGeneration+1, installPlanReference)
} else {
logger.Debugf("no subscriptions were updated")
logger.Infof("no subscriptions were updated")
}

// Make sure that we no longer indicate unpacking progress
Expand Down Expand Up @@ -1416,7 +1423,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
func (o *Operator) syncSubscriptions(obj interface{}) error {
sub, ok := obj.(*v1alpha1.Subscription)
if !ok {
o.logger.Debugf("wrong type: %#v", obj)
o.logger.Infof("wrong type: %#v", obj)
return fmt.Errorf("casting Subscription failed")
}

Expand All @@ -1430,7 +1437,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
func (o *Operator) syncOperatorGroups(obj interface{}) error {
og, ok := obj.(*operatorsv1.OperatorGroup)
if !ok {
o.logger.Debugf("wrong type: %#v", obj)
o.logger.Infof("wrong type: %#v", obj)
return fmt.Errorf("casting OperatorGroup failed")
}

Expand All @@ -1441,7 +1448,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {

func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscription) bool {
if sub.Status.InstallPlanRef != nil && sub.Status.State == v1alpha1.SubscriptionStateUpgradePending {
logger.Debugf("skipping update: installplan already created")
logger.Infof("skipping update: installplan already created")
return true
}
return false
Expand All @@ -1452,7 +1459,7 @@ func (o *Operator) ensureSubscriptionInstallPlanState(logger *logrus.Entry, sub
return sub, false, nil
}

logger.Debug("checking for existing installplan")
logger.Info("checking for existing installplan")

// check if there's an installplan that created this subscription (only if it doesn't have a reference yet)
// this indicates it was newly resolved by another operator, and we should reference that installplan in the status
Expand Down Expand Up @@ -1808,7 +1815,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.
}

if err := utilerrors.NewAggregate(errs); err != nil {
o.logger.Debugf("failed to unpack bundles: %v", err)
o.logger.Infof("failed to unpack bundles: %v", err)
return false, nil, nil, err
}

Expand Down Expand Up @@ -1889,7 +1896,7 @@ func (o *Operator) gcInstallPlans(log logrus.FieldLogger, namespace string) {
func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
plan, ok := obj.(*v1alpha1.InstallPlan)
if !ok {
o.logger.Debugf("wrong type: %#v", obj)
o.logger.Infof("wrong type: %#v", obj)
return fmt.Errorf("casting InstallPlan failed")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1412,7 +1412,7 @@ func TestSyncResolvingNamespace(t *testing.T) {
o.reconciler = &fakes.FakeRegistryReconcilerFactory{
ReconcilerForSourceStub: func(source *v1alpha1.CatalogSource) reconciler.RegistryReconciler {
return &fakes.FakeRegistryReconciler{
EnsureRegistryServerStub: func(source *v1alpha1.CatalogSource) error {
EnsureRegistryServerStub: func(logger *logrus.Entry, source *v1alpha1.CatalogSource) error {
return nil
},
}
Expand Down Expand Up @@ -1748,7 +1748,7 @@ func TestSyncRegistryServer(t *testing.T) {
op.reconciler = &fakes.FakeRegistryReconcilerFactory{
ReconcilerForSourceStub: func(source *v1alpha1.CatalogSource) reconciler.RegistryReconciler {
return &fakes.FakeRegistryReconciler{
EnsureRegistryServerStub: func(source *v1alpha1.CatalogSource) error {
EnsureRegistryServerStub: func(logger *logrus.Entry, source *v1alpha1.CatalogSource) error {
return nil
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sort"

"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -193,7 +194,7 @@ func (c *catalogHealthReconciler) healthy(catalog *v1alpha1.CatalogSource) (bool
return false, fmt.Errorf("could not get reconciler for catalog: %#v", catalog)
}

return rec.CheckRegistryServer(catalog)
return rec.CheckRegistryServer(logrus.NewEntry(logrus.New()), catalog)
}

// installPlanReconciler reconciles InstallPlan status for Subscriptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -1676,7 +1677,7 @@ func fakeRegistryReconcilerFactory(healthy bool, err error) *olmfakes.FakeRegist
return &olmfakes.FakeRegistryReconcilerFactory{
ReconcilerForSourceStub: func(*v1alpha1.CatalogSource) registryreconciler.RegistryReconciler {
return &olmfakes.FakeRegistryReconciler{
CheckRegistryServerStub: func(*v1alpha1.CatalogSource) (bool, error) {
CheckRegistryServerStub: func(*logrus.Entry, *v1alpha1.CatalogSource) (bool, error) {
return healthy, err
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -1182,7 +1183,7 @@ func TestSyncSubscriptions(t *testing.T) {
o.reconciler = &fakes.FakeRegistryReconcilerFactory{
ReconcilerForSourceStub: func(source *v1alpha1.CatalogSource) reconciler.RegistryReconciler {
return &fakes.FakeRegistryReconciler{
EnsureRegistryServerStub: func(source *v1alpha1.CatalogSource) error {
EnsureRegistryServerStub: func(*logrus.Entry, *v1alpha1.CatalogSource) error {
return nil
},
}
Expand Down Expand Up @@ -1320,7 +1321,7 @@ func BenchmarkSyncResolvingNamespace(b *testing.B) {
reconciler: &fakes.FakeRegistryReconcilerFactory{
ReconcilerForSourceStub: func(*v1alpha1.CatalogSource) reconciler.RegistryReconciler {
return &fakes.FakeRegistryReconciler{
CheckRegistryServerStub: func(*v1alpha1.CatalogSource) (bool, error) {
CheckRegistryServerStub: func(*logrus.Entry, *v1alpha1.CatalogSource) (bool, error) {
return true, nil
},
}
Expand Down
Loading