Skip to content

Address Invalid Address in GRPC Catalogs #2499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ func (o *Operator) syncConnection(logger *logrus.Entry, in *v1alpha1.CatalogSour
// Set connection status and return.
out.Status.GRPCConnectionState.LastConnectTime = now
out.Status.GRPCConnectionState.LastObservedState = source.ConnectionState.String()
out.Status.GRPCConnectionState.Address = source.Address
}

return
Expand Down
89 changes: 80 additions & 9 deletions pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned/fake"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
olmerrors "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/errors"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
Expand Down Expand Up @@ -759,6 +760,12 @@ func TestExecutePlanDynamicResources(t *testing.T) {
}
}

func withStatus(catalogSource v1alpha1.CatalogSource, status v1alpha1.CatalogSourceStatus) *v1alpha1.CatalogSource {
copy := catalogSource.DeepCopy()
copy.Status = status
return copy
}

func TestSyncCatalogSources(t *testing.T) {
clockFake := utilclock.NewFakeClock(time.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC))
now := metav1.NewTime(clockFake.Now())
Expand Down Expand Up @@ -787,14 +794,15 @@ func TestSyncCatalogSources(t *testing.T) {
},
}
tests := []struct {
testName string
namespace string
catalogSource *v1alpha1.CatalogSource
k8sObjs []runtime.Object
configMap *corev1.ConfigMap
expectedStatus *v1alpha1.CatalogSourceStatus
expectedObjs []runtime.Object
expectedError error
testName string
namespace string
catalogSource *v1alpha1.CatalogSource
k8sObjs []runtime.Object
configMap *corev1.ConfigMap
expectedStatus *v1alpha1.CatalogSourceStatus
expectedObjs []runtime.Object
expectedError error
existingSources []sourceAddress
}{
{
testName: "CatalogSourceWithInvalidSourceType",
Expand Down Expand Up @@ -1014,6 +1022,47 @@ func TestSyncCatalogSources(t *testing.T) {
},
expectedError: nil,
},
{
testName: "GRPCConnectionStateAddressIsUpdated",
namespace: "cool-namespace",
catalogSource: withStatus(*grpcCatalog, v1alpha1.CatalogSourceStatus{
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
Protocol: "grpc",
ServiceName: "cool-catalog",
ServiceNamespace: "cool-namespace",
Port: "50051",
CreatedAt: now,
},
GRPCConnectionState: &v1alpha1.GRPCConnectionState{
Address: "..svc:", // Needs to be updated to cool-catalog.cool-namespace.svc:50051
},
}),
k8sObjs: []runtime.Object{
pod(*grpcCatalog),
service(grpcCatalog.GetName(), grpcCatalog.GetNamespace()),
},
existingSources: []sourceAddress{
{
sourceKey: registry.CatalogKey{Name: "cool-catalog", Namespace: "cool-namespace"},
address: "cool-catalog.cool-namespace.svc:50051",
},
},
expectedStatus: &v1alpha1.CatalogSourceStatus{
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
Protocol: "grpc",
ServiceName: "cool-catalog",
ServiceNamespace: "cool-namespace",
Port: "50051",
CreatedAt: now,
},
GRPCConnectionState: &v1alpha1.GRPCConnectionState{
Address: "cool-catalog.cool-namespace.svc:50051",
LastObservedState: "",
LastConnectTime: now,
},
},
expectedError: nil,
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
Expand All @@ -1024,7 +1073,7 @@ func TestSyncCatalogSources(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

op, err := NewFakeOperator(ctx, tt.namespace, []string{tt.namespace}, withClock(clockFake), withClientObjs(clientObjs...), withK8sObjs(tt.k8sObjs...))
op, err := NewFakeOperator(ctx, tt.namespace, []string{tt.namespace}, withClock(clockFake), withClientObjs(clientObjs...), withK8sObjs(tt.k8sObjs...), withSources(tt.existingSources...))
require.NoError(t, err)

// Run sync
Expand All @@ -1041,6 +1090,13 @@ func TestSyncCatalogSources(t *testing.T) {
require.NotEmpty(t, updated)

if tt.expectedStatus != nil {
if tt.expectedStatus.GRPCConnectionState != nil {
updated.Status.GRPCConnectionState.LastConnectTime = now
// Ignore LastObservedState difference if an expected LastObservedState is no provided
if tt.expectedStatus.GRPCConnectionState.LastObservedState == "" {
updated.Status.GRPCConnectionState.LastObservedState = ""
}
}
require.NotEmpty(t, updated.Status)
require.Equal(t, *tt.expectedStatus, updated.Status)

Expand Down Expand Up @@ -1385,6 +1441,7 @@ type fakeOperatorConfig struct {
resolver resolver.StepResolver
recorder record.EventRecorder
reconciler reconciler.RegistryReconcilerFactory
sources []sourceAddress
}

// fakeOperatorOption applies an option to the given fake operator configuration.
Expand All @@ -1396,6 +1453,12 @@ func withResolver(res resolver.StepResolver) fakeOperatorOption {
}
}

func withSources(sources ...sourceAddress) fakeOperatorOption {
return func(config *fakeOperatorConfig) {
config.sources = sources
}
}

func withReconciler(rec reconciler.RegistryReconcilerFactory) fakeOperatorOption {
return func(config *fakeOperatorConfig) {
config.reconciler = rec
Expand Down Expand Up @@ -1432,6 +1495,11 @@ func withFakeClientOptions(options ...clientfake.Option) fakeOperatorOption {
}
}

type sourceAddress struct {
address string
sourceKey registry.CatalogKey
}

// NewFakeOperator creates a new operator using fake clients.
func NewFakeOperator(ctx context.Context, namespace string, namespaces []string, fakeOptions ...fakeOperatorOption) (*Operator, error) {
// Apply options to default config
Expand Down Expand Up @@ -1549,6 +1617,9 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,

op.RunInformers(ctx)
op.sources.Start(ctx)
for _, source := range config.sources {
op.sources.Add(source.sourceKey, source.address)
}

if ok := cache.WaitForCacheSync(ctx.Done(), op.HasSynced); !ok {
return nil, fmt.Errorf("failed to wait for caches to sync")
Expand Down
22 changes: 19 additions & 3 deletions pkg/controller/registry/reconciler/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca
source := grpcCatalogSourceDecorator{catalogSource}

// if service status is nil, we force create every object to ensure they're created the first time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we update this comment?

Suggested change
// if service status is nil, we force create every object to ensure they're created the first time
// if service status is nil, or it is invalid, we force create every object to ensure they're created the first time

overwrite := source.Status.RegistryServiceStatus == nil
overwrite := source.Status.RegistryServiceStatus == nil || !isRegistryServiceStatusValid(&source)

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
sa, err := c.ensureSA(source)
Expand All @@ -217,17 +217,33 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca

if overwritePod {
now := c.now()
service := source.Service()
catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{
CreatedAt: now,
Protocol: "grpc",
ServiceName: source.Service().GetName(),
ServiceName: service.GetName(),
ServiceNamespace: source.GetNamespace(),
Port: fmt.Sprintf("%d", source.Service().Spec.Ports[0].Port),
Port: getPort(service),
}
}
return nil
}

func getPort(service *corev1.Service) string {
return fmt.Sprintf("%d", service.Spec.Ports[0].Port)
}

func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) bool {
service := source.Service()
if source.Status.RegistryServiceStatus.ServiceName != service.GetName() ||
source.Status.RegistryServiceStatus.ServiceNamespace != service.GetNamespace() ||
source.Status.RegistryServiceStatus.Port != getPort(service) ||
source.Status.RegistryServiceStatus.Protocol != "grpc" {
return false
}
return true
}

func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, saName string, overwrite bool) error {
// currentLivePods refers to the currently live instances of the catalog source
currentLivePods := c.currentPods(source)
Expand Down
28 changes: 28 additions & 0 deletions pkg/controller/registry/reconciler/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func grpcCatalogSourceWithSecret(secretNames []string) *v1alpha1.CatalogSource {
},
}
}
func grpcCatalogSourceWithStatus(status v1alpha1.CatalogSourceStatus) *v1alpha1.CatalogSource {
catsrc := validGrpcCatalogSource("image", "")
catsrc.Status = status
return catsrc
}

func grpcCatalogSourceWithAnnotations(annotations map[string]string) *v1alpha1.CatalogSource {
catsrc := validGrpcCatalogSource("image", "")
Expand Down Expand Up @@ -284,6 +289,29 @@ func TestGrpcRegistryReconciler(t *testing.T) {
},
},
},
{
testName: "Grpc/ExistingRegistry/UpdateInvalidRegistryServiceStatus",
in: in{
cluster: cluster{
k8sObjs: objectsForCatalogSource(validGrpcCatalogSource("image", "")),
},
catsrc: grpcCatalogSourceWithStatus(v1alpha1.CatalogSourceStatus{
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
CreatedAt: now(),
Protocol: "grpc",
},
}),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
CreatedAt: now(),
Protocol: "grpc",
ServiceName: "img-catalog",
ServiceNamespace: testNamespace,
Port: "50051",
},
},
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
Expand Down