Skip to content

Commit 4437e8b

Browse files
committed
primitive stampede support
1 parent f8ab4f4 commit 4437e8b

File tree

4 files changed

+64
-20
lines changed

4 files changed

+64
-20
lines changed

staging/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"slices"
78
"sort"
89
"sync"
910
"time"
@@ -147,7 +148,7 @@ func (c *NamespacedOperatorCache) Error() error {
147148

148149
func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
149150
const (
150-
CachePopulateTimeout = time.Minute
151+
cachePopulateTimeout = time.Minute
151152
)
152153

153154
sources := c.sp.Sources(namespaces...)
@@ -209,13 +210,27 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
209210
}
210211
misses = misses[found:]
211212

213+
// remove any with a "live" outstanding request
214+
misses = slices.DeleteFunc(misses, func(key SourceKey) bool {
215+
hdr, _ := c.snapshots[key]
216+
217+
// if we already have a request timestamp, we have an outstanding request, so prevent stacking
218+
// and just send new requests if the previous one has expired
219+
if hdr.RequestSentinelActive() {
220+
c.logger.Printf("Skipping new request for %s, already in progress", key)
221+
return true
222+
}
223+
return false
224+
})
225+
212226
for _, miss := range misses {
213-
ctx, cancel := context.WithTimeout(context.Background(), CachePopulateTimeout)
227+
ctx, cancel := context.WithTimeout(context.Background(), cachePopulateTimeout)
214228

215229
hdr := snapshotHeader{
216-
key: miss,
217-
pop: cancel,
218-
priority: c.sourcePriorityProvider.Priority(miss),
230+
key: miss,
231+
pop: cancel,
232+
priority: c.sourcePriorityProvider.Priority(miss),
233+
requestSentinel: time.Now().Add(cachePopulateTimeout), // set sentinel to prevent stacking requests
219234
}
220235

221236
hdr.m.Lock()
@@ -228,6 +243,7 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
228243
defer func() { <-c.sem }()
229244
if snapshot, err := source.Snapshot(ctx); err != nil {
230245
hdr.err = err
246+
// don't adjust request timestamp, so that we don't stampede sources
231247
} else if snapshot != nil {
232248
hdr.snapshot = snapshot
233249
} else {
@@ -294,6 +310,8 @@ type snapshotHeader struct {
294310
pop context.CancelFunc
295311
err error
296312
priority int
313+
314+
requestSentinel time.Time
297315
}
298316

299317
func (hdr *snapshotHeader) Cancel() {
@@ -314,6 +332,12 @@ func (hdr *snapshotHeader) Valid() bool {
314332
return true
315333
}
316334

335+
func (hdr *snapshotHeader) RequestSentinelActive() bool {
336+
hdr.m.RLock()
337+
defer hdr.m.RUnlock()
338+
return time.Now().Before(hdr.requestSentinel)
339+
}
340+
317341
type sortableSnapshots struct {
318342
snapshots []*snapshotHeader
319343
preferredNamespace string

staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"runtime/debug"
87
"sync"
98
"time"
109

@@ -76,14 +75,16 @@ type RegistrySourceProvider struct {
7675
invalidator *sourceInvalidator
7776
}
7877

78+
const defaultCacheLifetime time.Duration = 30 * time.Minute
79+
7980
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
8081
return &RegistrySourceProvider{
8182
rcp: rcp,
8283
logger: logger,
8384
catsrcLister: catsrcLister,
8485
invalidator: &sourceInvalidator{
8586
validChans: make(map[cache.SourceKey]chan struct{}),
86-
ttl: 30 * time.Minute,
87+
ttl: defaultCacheLifetime,
8788
},
8889
}
8990
}
@@ -153,9 +154,6 @@ func (s *registrySource) Snapshot(ctx context.Context) (*cache.Snapshot, error)
153154
// or embed the information into Bundle.
154155
packages := make(map[string]*api.Package)
155156

156-
s.logger.Printf("JEK >>>>> Listing packages for %#v", s.key)
157-
debug.PrintStack()
158-
159157
it, err := s.client.ListBundles(ctx)
160158
if err != nil {
161159
return nil, fmt.Errorf("failed to list bundles: %w", err)

vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go

Lines changed: 29 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go

Lines changed: 3 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)