@@ -4,12 +4,14 @@ package resolver
4
4
import (
5
5
"context"
6
6
"fmt"
7
+ "io"
7
8
8
9
"github.com/blang/semver"
9
10
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10
11
"k8s.io/apimachinery/pkg/util/errors"
11
12
12
13
"github.com/operator-framework/operator-registry/pkg/api"
14
+ registryapi "github.com/operator-framework/operator-registry/pkg/api"
13
15
"github.com/operator-framework/operator-registry/pkg/client"
14
16
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
15
17
)
@@ -24,7 +26,7 @@ type SourceRef struct {
24
26
}
25
27
26
28
type SourceQuerier interface {
27
- FindProvider (api opregistry.APIKey , initialSource CatalogKey ) (* api.Bundle , * CatalogKey , error )
29
+ FindProvider (api opregistry.APIKey , initialSource CatalogKey , pkgName string ) (* api.Bundle , * CatalogKey , error )
28
30
FindBundle (pkgName , channelName , bundleName string , initialSource CatalogKey ) (* api.Bundle , * CatalogKey , error )
29
31
FindLatestBundle (pkgName , channelName string , initialSource CatalogKey ) (* api.Bundle , * CatalogKey , error )
30
32
FindReplacement (currentVersion * semver.Version , bundleName , pkgName , channelName string , initialSource CatalogKey ) (* api.Bundle , * CatalogKey , error )
@@ -33,13 +35,46 @@ type SourceQuerier interface {
33
35
34
36
type NamespaceSourceQuerier struct {
35
37
sources map [CatalogKey ]client.Interface
38
+ clients map [CatalogKey ]* client.Client
36
39
}
37
40
38
41
var _ SourceQuerier = & NamespaceSourceQuerier {}
39
42
40
- func NewNamespaceSourceQuerier (sources map [CatalogKey ]client.Interface ) * NamespaceSourceQuerier {
43
+ type ChannelEntryStream interface {
44
+ Recv () (* api.ChannelEntry , error )
45
+ }
46
+
47
+ type ChannelEntryIterator struct {
48
+ stream ChannelEntryStream
49
+ error error
50
+ }
51
+
52
+ func NewChannelEntryIterator (stream ChannelEntryStream ) * ChannelEntryIterator {
53
+ return & ChannelEntryIterator {stream : stream }
54
+ }
55
+
56
+ func (ceit * ChannelEntryIterator ) Next () * registryapi.ChannelEntry {
57
+ if ceit .error != nil {
58
+ return nil
59
+ }
60
+ next , err := ceit .stream .Recv ()
61
+ if err == io .EOF {
62
+ return nil
63
+ }
64
+ if err != nil {
65
+ ceit .error = err
66
+ }
67
+ return next
68
+ }
69
+
70
+ func (ceit * ChannelEntryIterator ) Error () error {
71
+ return ceit .error
72
+ }
73
+
74
+ func NewNamespaceSourceQuerier (sources map [CatalogKey ]client.Interface , clients map [CatalogKey ]* client.Client ) * NamespaceSourceQuerier {
41
75
return & NamespaceSourceQuerier {
42
76
sources : sources ,
77
+ clients : clients ,
43
78
}
44
79
}
45
80
@@ -50,23 +85,23 @@ func (q *NamespaceSourceQuerier) Queryable() error {
50
85
return nil
51
86
}
52
87
53
- func (q * NamespaceSourceQuerier ) FindProvider (api opregistry.APIKey , initialSource CatalogKey ) (* api .Bundle , * CatalogKey , error ) {
88
+ func (q * NamespaceSourceQuerier ) FindProvider (api opregistry.APIKey , initialSource CatalogKey , pkgName string ) (* registryapi .Bundle , * CatalogKey , error ) {
54
89
if initialSource .Name != "" && initialSource .Namespace != "" {
55
- source , ok := q .sources [initialSource ]
90
+ client , ok := q .clients [initialSource ]
56
91
if ok {
57
- if bundle , err := source . GetBundleThatProvides (context .TODO (), api .Group , api .Version , api .Kind ); err == nil {
92
+ if bundle , err := FindBundleThatProvides (context .TODO (), client , api .Group , api .Version , api .Kind , pkgName ); err == nil {
58
93
return bundle , & initialSource , nil
59
94
}
60
- if bundle , err := source . GetBundleThatProvides (context .TODO (), api .Plural + "." + api .Group , api .Version , api .Kind ); err == nil {
95
+ if bundle , err := FindBundleThatProvides (context .TODO (), client , api .Plural + "." + api .Group , api .Version , api .Kind , pkgName ); err == nil {
61
96
return bundle , & initialSource , nil
62
97
}
63
98
}
64
99
}
65
- for key , source := range q .sources {
66
- if bundle , err := source . GetBundleThatProvides (context .TODO (), api .Group , api .Version , api .Kind ); err == nil {
100
+ for key , client := range q .clients {
101
+ if bundle , err := FindBundleThatProvides (context .TODO (), client , api .Group , api .Version , api .Kind , pkgName ); err == nil {
67
102
return bundle , & key , nil
68
103
}
69
- if bundle , err := source . GetBundleThatProvides (context .TODO (), api .Plural + "." + api .Group , api .Version , api .Kind ); err == nil {
104
+ if bundle , err := FindBundleThatProvides (context .TODO (), client , api .Plural + "." + api .Group , api .Version , api .Kind , pkgName ); err == nil {
70
105
return bundle , & key , nil
71
106
}
72
107
}
@@ -191,3 +226,51 @@ func (q *NamespaceSourceQuerier) findChannelHead(currentVersion *semver.Version,
191
226
}
192
227
return nil , nil
193
228
}
229
+
230
+ // GetLatestChannelEntriesThatProvide uses registry client to get a list of
231
+ // latest channel entries that provide the requested API (via an iterator)
232
+ func GetLatestChannelEntriesThatProvide (ctx context.Context , c * client.Client , group , version , kind string ) (* ChannelEntryIterator , error ) {
233
+ stream , err := c .Registry .GetLatestChannelEntriesThatProvide (ctx , & registryapi.GetLatestProvidersRequest {Group : group , Version : version , Kind : kind })
234
+ if err != nil {
235
+ return nil , err
236
+ }
237
+ return NewChannelEntryIterator (stream ), nil
238
+ }
239
+
240
+ // FilterChannelEntries filters out a channel entries that provide the requested
241
+ // API and come from the same package with original operator and returns the
242
+ // first entry on the list
243
+ func FilterChannelEntries (it * ChannelEntryIterator , pkgName string ) * opregistry.ChannelEntry {
244
+ var entry * opregistry.ChannelEntry
245
+ for e := it .Next (); e != nil ; e = it .Next () {
246
+ if e .PackageName != pkgName {
247
+ entry = & opregistry.ChannelEntry {
248
+ PackageName : e .PackageName ,
249
+ ChannelName : e .ChannelName ,
250
+ BundleName : e .BundleName ,
251
+ Replaces : e .Replaces ,
252
+ }
253
+ break
254
+ }
255
+ }
256
+ return entry
257
+ }
258
+
259
+ // FindBundleThatProvides returns a bundle that provides the request API and
260
+ // doesn't belong to the provided package
261
+ func FindBundleThatProvides (ctx context.Context , client * client.Client , group , version , kind , pkgName string ) (* api.Bundle , error ) {
262
+ it , err := GetLatestChannelEntriesThatProvide (ctx , client , group , version , kind )
263
+ if err != nil {
264
+ return nil , err
265
+ }
266
+
267
+ entry := FilterChannelEntries (it , pkgName )
268
+ if entry != nil {
269
+ return nil , fmt .Errorf ("Unable to find a channel entry which doesn't belong to package %s" , pkgName )
270
+ }
271
+ bundle , err := client .Registry .GetBundle (ctx , & registryapi.GetBundleRequest {PkgName : entry .PackageName , ChannelName : entry .ChannelName , CsvName : entry .BundleName })
272
+ if err != nil {
273
+ return nil , err
274
+ }
275
+ return bundle , nil
276
+ }
0 commit comments