Skip to content

Give Sources control of invalidation for Snapshots they produce. #2651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type Operator struct {
bundleUnpackTimeout time.Duration
clientFactory clients.Factory
muInstallPlan sync.Mutex
resolverSourceProvider *resolver.RegistrySourceProvider
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -188,8 +189,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(config),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.sources, logger)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down Expand Up @@ -465,7 +467,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

switch state.State {
case connectivity.Ready:
o.resolver.Expire(resolvercache.SourceKey(state.Key))
o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key))
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)
Expand Down
55 changes: 29 additions & 26 deletions pkg/controller/registry/resolver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (p StaticSourceProvider) Sources(namespaces ...string) map[SourceKey]Source

type OperatorCacheProvider interface {
Namespaced(namespaces ...string) MultiCatalogOperatorFinder
Expire(catalog SourceKey)
}

type SourcePriorityProvider interface {
Expand Down Expand Up @@ -150,22 +149,11 @@ func (c *NamespacedOperatorCache) Error() error {
return errors.NewAggregate(errs)
}

func (c *Cache) Expire(catalog SourceKey) {
c.m.Lock()
defer c.m.Unlock()
s, ok := c.snapshots[catalog]
if !ok {
return
}
s.expiry = time.Unix(0, 0)
}

func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
const (
CachePopulateTimeout = time.Minute
)

now := time.Now()
sources := c.sp.Sources(namespaces...)

result := NamespacedOperatorCache{
Expand All @@ -182,7 +170,7 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
func() {
snapshot.m.RLock()
defer snapshot.m.RUnlock()
if snapshot.Valid(now) {
if snapshot.Valid() {
result.snapshots[key] = snapshot
} else {
misses = append(misses, key)
Expand All @@ -205,7 +193,7 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
// Take the opportunity to clear expired snapshots while holding the lock.
var expired []SourceKey
for key, snapshot := range c.snapshots {
if !snapshot.Valid(now) {
if !snapshot.Valid() {
snapshot.Cancel()
expired = append(expired, key)
}
Expand All @@ -217,7 +205,7 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
// Check for any snapshots that were populated while waiting to acquire the lock.
var found int
for i := range misses {
if hdr, ok := c.snapshots[misses[i]]; ok && hdr.Valid(now) {
if hdr, ok := c.snapshots[misses[i]]; ok && hdr.Valid() {
result.snapshots[misses[i]] = hdr
misses[found], misses[i] = misses[i], misses[found]
found++
Expand All @@ -230,17 +218,10 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {

hdr := snapshotHeader{
key: miss,
expiry: now.Add(c.ttl),
pop: cancel,
priority: c.sourcePriorityProvider.Priority(miss),
}

if miss.Virtual() {
// hack! always refresh virtual catalogs.
// todo: Sources should be responsible for determining when the Snapshots they produce become invalid
hdr.expiry = time.Time{}
}

hdr.m.Lock()
c.snapshots[miss] = &hdr
result.snapshots[miss] = &hdr
Expand All @@ -249,7 +230,13 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
defer hdr.m.Unlock()
c.sem <- struct{}{}
defer func() { <-c.sem }()
hdr.snapshot, hdr.err = source.Snapshot(ctx)
if snapshot, err := source.Snapshot(ctx); err != nil {
hdr.err = err
} else if snapshot != nil {
hdr.snapshot = snapshot
} else {
hdr.err = fmt.Errorf("source %q produced no snapshot and no error", hdr.key)
}
}(ctx, &hdr, sources[miss])
}

Expand Down Expand Up @@ -286,6 +273,15 @@ func (c *NamespacedOperatorCache) Find(p ...Predicate) []*Entry {

type Snapshot struct {
Entries []*Entry

// Unless closed, the Snapshot is valid.
Valid <-chan struct{}
}

func ValidOnce() <-chan struct{} {
c := make(chan struct{})
close(c)
return c
}

var _ Source = &Snapshot{}
Expand All @@ -298,7 +294,6 @@ type snapshotHeader struct {
snapshot *Snapshot

key SourceKey
expiry time.Time
m sync.RWMutex
pop context.CancelFunc
err error
Expand All @@ -309,10 +304,18 @@ func (hdr *snapshotHeader) Cancel() {
hdr.pop()
}

func (hdr *snapshotHeader) Valid(at time.Time) bool {
func (hdr *snapshotHeader) Valid() bool {
hdr.m.RLock()
defer hdr.m.RUnlock()
return hdr.snapshot != nil && hdr.err == nil && at.Before(hdr.expiry)
if hdr.snapshot == nil || hdr.err != nil {
return false
}
select {
case <-hdr.snapshot.Valid:
return false
default:
}
return true
}

type sortableSnapshots struct {
Expand Down
36 changes: 12 additions & 24 deletions pkg/controller/registry/resolver/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math/rand"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -68,12 +67,12 @@ func TestOperatorCacheExpiration(t *testing.T) {
key := SourceKey{Namespace: "dummynamespace", Name: "dummyname"}
ssp := make(StaticSourceProvider)
c := New(ssp)
c.ttl = 0 // instantly stale

ssp[key] = &Snapshot{
Entries: []*Entry{
{Name: "v1"},
},
Valid: ValidOnce(),
}
require.Len(t, c.Namespaced("dummynamespace").Catalog(key).Find(CSVNamePredicate("v1")), 1)

Expand Down Expand Up @@ -108,62 +107,51 @@ func TestOperatorCacheReuse(t *testing.T) {
func TestCatalogSnapshotValid(t *testing.T) {
type tc struct {
Name string
Expiry time.Time
Snapshot *Snapshot
Error error
At time.Time
Expected bool
}

for _, tt := range []tc{
{
Name: "after expiry",
Expiry: time.Unix(0, 1),
Snapshot: &Snapshot{},
Name: "invalidated",
Snapshot: &Snapshot{
Valid: ValidOnce(),
},
Error: nil,
At: time.Unix(0, 2),
Expected: false,
},
{
Name: "before expiry",
Expiry: time.Unix(0, 2),
Snapshot: &Snapshot{},
Name: "valid",
Snapshot: &Snapshot{}, // valid forever
Error: nil,
At: time.Unix(0, 1),
Expected: true,
},
{
Name: "nil snapshot",
Expiry: time.Unix(0, 2),
Name: "nil snapshot and non-nil error",
Snapshot: nil,
Error: errors.New(""),
At: time.Unix(0, 1),
Expected: false,
},
{
Name: "non-nil error",
Expiry: time.Unix(0, 2),
Name: "non-nil snapshot and non-nil error",
Snapshot: &Snapshot{},
Error: errors.New(""),
At: time.Unix(0, 1),
Expected: false,
},
{
Name: "at expiry",
Expiry: time.Unix(0, 1),
Snapshot: &Snapshot{},
Name: "nil snapshot and nil error",
Snapshot: nil,
Error: nil,
At: time.Unix(0, 1),
Expected: false,
},
} {
t.Run(tt.Name, func(t *testing.T) {
s := snapshotHeader{
expiry: tt.Expiry,
snapshot: tt.Snapshot,
err: tt.Error,
}
assert.Equal(t, tt.Expected, s.Valid(tt.At))
assert.Equal(t, tt.Expected, s.Valid())
})
}
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/controller/registry/resolver/instrumented_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
)

type InstrumentedResolver struct {
Expand Down Expand Up @@ -33,7 +32,3 @@ func (ir *InstrumentedResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step
}
return steps, lookups, subs, err
}

func (ir *InstrumentedResolver) Expire(key cache.SourceKey) {
ir.resolver.Expire(key)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/stretchr/testify/require"
)

Expand All @@ -22,16 +21,10 @@ func (r *fakeResolverWithError) ResolveSteps(namespace string) ([]*v1alpha1.Step
return nil, nil, nil, errors.New("Fake error")
}

func (r *fakeResolverWithError) Expire(key cache.SourceKey) {
}

func (r *fakeResolverWithoutError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
return nil, nil, nil, nil
}

func (r *fakeResolverWithoutError) Expire(key cache.SourceKey) {
}

func newFakeResolverWithError() *fakeResolverWithError {
return &fakeResolverWithError{}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/registry/resolver/source_csvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
s.logger.Printf("considered csvs without properties annotation during resolution: %v", names)
}

return &cache.Snapshot{Entries: entries}, nil
return &cache.Snapshot{
Entries: entries,
Valid: cache.ValidOnce(),
}, nil
}

func (s *csvSource) inferProperties(csv *v1alpha1.ClusterServiceVersion, subs []*v1alpha1.Subscription) ([]*api.Property, error) {
Expand Down
Loading