Skip to content

Commit 7b5d764

Browse files
committed
Address Invalid Address in GRPC Catalogs
Problem: Within the catalogSource resource, the RegistryServiceStatus stores service information that is used to generate an address that OLM relies on in order to establish a connection with the associated pod. If the RegistryStatusService is not nil and is missing the namespace, name, and port information for its service, OLM is unable to recover until the catalogService's associated pod has an invalid image or spec. Solution: When reconciling a CatalogSource, OLM will now ensure that the RegistryServiceStatus of the catalogSource is valid and will update the catalogSource's status to reflect the change. Additionally, this address is stored within the status of the catalogSource within the status.GRPCConnectionState.Address field. If the address changes, OLM will update this field to reflect the new address as well. Signed-off-by: Alexander Greene <[email protected]>
1 parent eabd986 commit 7b5d764

File tree

4 files changed

+128
-12
lines changed

4 files changed

+128
-12
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,7 @@ func (o *Operator) syncConnection(logger *logrus.Entry, in *v1alpha1.CatalogSour
803803
// Set connection status and return.
804804
out.Status.GRPCConnectionState.LastConnectTime = now
805805
out.Status.GRPCConnectionState.LastObservedState = source.ConnectionState.String()
806+
out.Status.GRPCConnectionState.Address = source.Address
806807
}
807808

808809
return

pkg/controller/operators/catalog/operator_test.go

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned/fake"
5151
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
5252
olmerrors "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/errors"
53+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
5354
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
5455
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
5556
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
@@ -759,6 +760,12 @@ func TestExecutePlanDynamicResources(t *testing.T) {
759760
}
760761
}
761762

763+
func withStatus(catalogSource v1alpha1.CatalogSource, status v1alpha1.CatalogSourceStatus) *v1alpha1.CatalogSource {
764+
copy := catalogSource.DeepCopy()
765+
copy.Status = status
766+
return copy
767+
}
768+
762769
func TestSyncCatalogSources(t *testing.T) {
763770
clockFake := utilclock.NewFakeClock(time.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC))
764771
now := metav1.NewTime(clockFake.Now())
@@ -787,14 +794,15 @@ func TestSyncCatalogSources(t *testing.T) {
787794
},
788795
}
789796
tests := []struct {
790-
testName string
791-
namespace string
792-
catalogSource *v1alpha1.CatalogSource
793-
k8sObjs []runtime.Object
794-
configMap *corev1.ConfigMap
795-
expectedStatus *v1alpha1.CatalogSourceStatus
796-
expectedObjs []runtime.Object
797-
expectedError error
797+
testName string
798+
namespace string
799+
catalogSource *v1alpha1.CatalogSource
800+
k8sObjs []runtime.Object
801+
configMap *corev1.ConfigMap
802+
expectedStatus *v1alpha1.CatalogSourceStatus
803+
expectedObjs []runtime.Object
804+
expectedError error
805+
existingSources []sourceAddress
798806
}{
799807
{
800808
testName: "CatalogSourceWithInvalidSourceType",
@@ -1014,6 +1022,47 @@ func TestSyncCatalogSources(t *testing.T) {
10141022
},
10151023
expectedError: nil,
10161024
},
1025+
{
1026+
testName: "GRPCConnectionStateAddressIsUpdated",
1027+
namespace: "cool-namespace",
1028+
catalogSource: withStatus(*grpcCatalog, v1alpha1.CatalogSourceStatus{
1029+
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
1030+
Protocol: "grpc",
1031+
ServiceName: "cool-catalog",
1032+
ServiceNamespace: "cool-namespace",
1033+
Port: "50051",
1034+
CreatedAt: now,
1035+
},
1036+
GRPCConnectionState: &v1alpha1.GRPCConnectionState{
1037+
Address: "..svc:", // Needs to be updated to cool-catalog.cool-namespace.svc:50051
1038+
},
1039+
}),
1040+
k8sObjs: []runtime.Object{
1041+
pod(*grpcCatalog),
1042+
service(grpcCatalog.GetName(), grpcCatalog.GetNamespace()),
1043+
},
1044+
existingSources: []sourceAddress{
1045+
{
1046+
sourceKey: registry.CatalogKey{Name: "cool-catalog", Namespace: "cool-namespace"},
1047+
address: "cool-catalog.cool-namespace.svc:50051",
1048+
},
1049+
},
1050+
expectedStatus: &v1alpha1.CatalogSourceStatus{
1051+
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
1052+
Protocol: "grpc",
1053+
ServiceName: "cool-catalog",
1054+
ServiceNamespace: "cool-namespace",
1055+
Port: "50051",
1056+
CreatedAt: now,
1057+
},
1058+
GRPCConnectionState: &v1alpha1.GRPCConnectionState{
1059+
Address: "cool-catalog.cool-namespace.svc:50051",
1060+
LastObservedState: "",
1061+
LastConnectTime: now,
1062+
},
1063+
},
1064+
expectedError: nil,
1065+
},
10171066
}
10181067
for _, tt := range tests {
10191068
t.Run(tt.testName, func(t *testing.T) {
@@ -1024,7 +1073,7 @@ func TestSyncCatalogSources(t *testing.T) {
10241073
ctx, cancel := context.WithCancel(context.TODO())
10251074
defer cancel()
10261075

1027-
op, err := NewFakeOperator(ctx, tt.namespace, []string{tt.namespace}, withClock(clockFake), withClientObjs(clientObjs...), withK8sObjs(tt.k8sObjs...))
1076+
op, err := NewFakeOperator(ctx, tt.namespace, []string{tt.namespace}, withClock(clockFake), withClientObjs(clientObjs...), withK8sObjs(tt.k8sObjs...), withSources(tt.existingSources...))
10281077
require.NoError(t, err)
10291078

10301079
// Run sync
@@ -1041,6 +1090,13 @@ func TestSyncCatalogSources(t *testing.T) {
10411090
require.NotEmpty(t, updated)
10421091

10431092
if tt.expectedStatus != nil {
1093+
if tt.expectedStatus.GRPCConnectionState != nil {
1094+
updated.Status.GRPCConnectionState.LastConnectTime = now
1095+
// Ignore LastObservedState difference if an expected LastObservedState is no provided
1096+
if tt.expectedStatus.GRPCConnectionState.LastObservedState == "" {
1097+
updated.Status.GRPCConnectionState.LastObservedState = ""
1098+
}
1099+
}
10441100
require.NotEmpty(t, updated.Status)
10451101
require.Equal(t, *tt.expectedStatus, updated.Status)
10461102

@@ -1385,6 +1441,7 @@ type fakeOperatorConfig struct {
13851441
resolver resolver.StepResolver
13861442
recorder record.EventRecorder
13871443
reconciler reconciler.RegistryReconcilerFactory
1444+
sources []sourceAddress
13881445
}
13891446

13901447
// fakeOperatorOption applies an option to the given fake operator configuration.
@@ -1396,6 +1453,12 @@ func withResolver(res resolver.StepResolver) fakeOperatorOption {
13961453
}
13971454
}
13981455

1456+
func withSources(sources ...sourceAddress) fakeOperatorOption {
1457+
return func(config *fakeOperatorConfig) {
1458+
config.sources = sources
1459+
}
1460+
}
1461+
13991462
func withReconciler(rec reconciler.RegistryReconcilerFactory) fakeOperatorOption {
14001463
return func(config *fakeOperatorConfig) {
14011464
config.reconciler = rec
@@ -1432,6 +1495,11 @@ func withFakeClientOptions(options ...clientfake.Option) fakeOperatorOption {
14321495
}
14331496
}
14341497

1498+
type sourceAddress struct {
1499+
address string
1500+
sourceKey registry.CatalogKey
1501+
}
1502+
14351503
// NewFakeOperator creates a new operator using fake clients.
14361504
func NewFakeOperator(ctx context.Context, namespace string, namespaces []string, fakeOptions ...fakeOperatorOption) (*Operator, error) {
14371505
// Apply options to default config
@@ -1549,6 +1617,9 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
15491617

15501618
op.RunInformers(ctx)
15511619
op.sources.Start(ctx)
1620+
for _, source := range config.sources {
1621+
op.sources.Add(source.sourceKey, source.address)
1622+
}
15521623

15531624
if ok := cache.WaitForCacheSync(ctx.Done(), op.HasSynced); !ok {
15541625
return nil, fmt.Errorf("failed to wait for caches to sync")

pkg/controller/registry/reconciler/grpc.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca
192192
source := grpcCatalogSourceDecorator{catalogSource}
193193

194194
// if service status is nil, we force create every object to ensure they're created the first time
195-
overwrite := source.Status.RegistryServiceStatus == nil
195+
overwrite := source.Status.RegistryServiceStatus == nil || !isRegistryServiceStatusValid(&source)
196196

197197
//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
198198
sa, err := c.ensureSA(source)
@@ -217,17 +217,33 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca
217217

218218
if overwritePod {
219219
now := c.now()
220+
service := source.Service()
220221
catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{
221222
CreatedAt: now,
222223
Protocol: "grpc",
223-
ServiceName: source.Service().GetName(),
224+
ServiceName: service.GetName(),
224225
ServiceNamespace: source.GetNamespace(),
225-
Port: fmt.Sprintf("%d", source.Service().Spec.Ports[0].Port),
226+
Port: getPort(service),
226227
}
227228
}
228229
return nil
229230
}
230231

232+
func getPort(service *corev1.Service) string {
233+
return fmt.Sprintf("%d", service.Spec.Ports[0].Port)
234+
}
235+
236+
func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) bool {
237+
service := source.Service()
238+
if source.Status.RegistryServiceStatus.ServiceName != service.GetName() ||
239+
source.Status.RegistryServiceStatus.ServiceNamespace != service.GetNamespace() ||
240+
source.Status.RegistryServiceStatus.Port != getPort(service) ||
241+
source.Status.RegistryServiceStatus.Protocol != "grpc" {
242+
return false
243+
}
244+
return true
245+
}
246+
231247
func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, saName string, overwrite bool) error {
232248
// currentLivePods refers to the currently live instances of the catalog source
233249
currentLivePods := c.currentPods(source)

pkg/controller/registry/reconciler/grpc_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ func grpcCatalogSourceWithSecret(secretNames []string) *v1alpha1.CatalogSource {
4949
},
5050
}
5151
}
52+
func grpcCatalogSourceWithStatus(status v1alpha1.CatalogSourceStatus) *v1alpha1.CatalogSource {
53+
catsrc := validGrpcCatalogSource("image", "")
54+
catsrc.Status = status
55+
return catsrc
56+
}
5257

5358
func grpcCatalogSourceWithAnnotations(annotations map[string]string) *v1alpha1.CatalogSource {
5459
catsrc := validGrpcCatalogSource("image", "")
@@ -284,6 +289,29 @@ func TestGrpcRegistryReconciler(t *testing.T) {
284289
},
285290
},
286291
},
292+
{
293+
testName: "Grpc/ExistingRegistry/UpdateInvalidRegistryServiceStatus",
294+
in: in{
295+
cluster: cluster{
296+
k8sObjs: objectsForCatalogSource(validGrpcCatalogSource("image", "")),
297+
},
298+
catsrc: grpcCatalogSourceWithStatus(v1alpha1.CatalogSourceStatus{
299+
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
300+
CreatedAt: now(),
301+
Protocol: "grpc",
302+
},
303+
}),
304+
},
305+
out: out{
306+
status: &v1alpha1.RegistryServiceStatus{
307+
CreatedAt: now(),
308+
Protocol: "grpc",
309+
ServiceName: "img-catalog",
310+
ServiceNamespace: testNamespace,
311+
Port: "50051",
312+
},
313+
},
314+
},
287315
}
288316
for _, tt := range tests {
289317
t.Run(tt.testName, func(t *testing.T) {

0 commit comments

Comments
 (0)