@@ -6,19 +6,17 @@ import (
6
6
"errors"
7
7
"fmt"
8
8
"net"
9
- "os"
10
- "sync"
11
-
12
9
"net/http"
13
10
endpoint "net/http/pprof"
11
+ "os"
14
12
"runtime/pprof"
13
+ "sync"
15
14
16
15
"github.com/sirupsen/logrus"
17
16
"github.com/spf13/cobra"
18
17
"google.golang.org/grpc"
19
18
"google.golang.org/grpc/reflection"
20
19
21
- "github.com/operator-framework/operator-registry/alpha/declcfg"
22
20
"github.com/operator-framework/operator-registry/pkg/api"
23
21
health "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1"
24
22
"github.com/operator-framework/operator-registry/pkg/lib/dns"
@@ -30,6 +28,8 @@ import (
30
28
31
29
type serve struct {
32
30
configDir string
31
+ cacheDir string
32
+ cacheOnly bool
33
33
34
34
port string
35
35
terminationLog string
@@ -75,12 +75,16 @@ will not be reflected in the served content.
75
75
cmd .Flags ().StringVarP (& s .terminationLog , "termination-log" , "t" , "/dev/termination-log" , "path to a container termination log file" )
76
76
cmd .Flags ().StringVarP (& s .port , "port" , "p" , "50051" , "port number to serve on" )
77
77
cmd .Flags ().StringVar (& s .pprofAddr , "pprof-addr" , "" , "address of startup profiling endpoint (addr:port format)" )
78
+ cmd .Flags ().StringVar (& s .cacheDir , "cache-dir" , "" , "if set, sync and persist server cache directory" )
79
+ cmd .Flags ().BoolVar (& s .cacheOnly , "cache-only" , false , "sync the serve cache and exit without serving" )
78
80
return cmd
79
81
}
80
82
81
83
func (s * serve ) run (ctx context.Context ) error {
82
84
p := newProfilerInterface (s .pprofAddr , s .logger )
83
- p .startEndpoint ()
85
+ if err := p .startEndpoint (); err != nil {
86
+ return fmt .Errorf ("could not start pprof endpoint: %v" , err )
87
+ }
84
88
if err := p .startCpuProfileCache (); err != nil {
85
89
return fmt .Errorf ("could not start CPU profile: %v" , err )
86
90
}
@@ -98,24 +102,18 @@ func (s *serve) run(ctx context.Context) error {
98
102
99
103
s .logger = s .logger .WithFields (logrus.Fields {"configs" : s .configDir , "port" : s .port })
100
104
101
- cfg , err := declcfg .LoadFS (os .DirFS (s .configDir ))
102
- if err != nil {
103
- return fmt .Errorf ("load declarative config directory: %v" , err )
104
- }
105
-
106
- m , err := declcfg .ConvertToModel (* cfg )
107
- if err != nil {
108
- return fmt .Errorf ("could not build index model from declarative config: %v" , err )
109
- }
110
- store , err := registry .NewQuerier (m )
105
+ store , err := registry .NewQuerierFromFS (os .DirFS (s .configDir ), s .cacheDir )
111
106
defer store .Close ()
112
107
if err != nil {
113
108
return err
114
109
}
110
+ if s .cacheOnly {
111
+ return nil
112
+ }
115
113
116
114
lis , err := net .Listen ("tcp" , ":" + s .port )
117
115
if err != nil {
118
- s . logger . Fatalf ("failed to listen: %s" , err )
116
+ return fmt . Errorf ("failed to listen: %s" , err )
119
117
}
120
118
121
119
grpcServer := grpc .NewServer ()
@@ -129,7 +127,9 @@ func (s *serve) run(ctx context.Context) error {
129
127
return grpcServer .Serve (lis )
130
128
}, func () {
131
129
grpcServer .GracefulStop ()
132
- p .stopEndpoint (p .logger .Context )
130
+ if err := p .stopEndpoint (ctx ); err != nil {
131
+ s .logger .Warnf ("error shutting down pprof server: %v" , err )
132
+ }
133
133
})
134
134
135
135
}
@@ -147,7 +147,8 @@ type profilerInterface struct {
147
147
cacheReady bool
148
148
cacheLock sync.RWMutex
149
149
150
- logger * logrus.Entry
150
+ logger * logrus.Entry
151
+ closeErr chan error
151
152
}
152
153
153
154
func newProfilerInterface (a string , log * logrus.Entry ) * profilerInterface {
@@ -162,10 +163,10 @@ func (p *profilerInterface) isEnabled() bool {
162
163
return p .addr != ""
163
164
}
164
165
165
- func (p * profilerInterface ) startEndpoint () {
166
+ func (p * profilerInterface ) startEndpoint () error {
166
167
// short-circuit if not enabled
167
168
if ! p .isEnabled () {
168
- return
169
+ return nil
169
170
}
170
171
171
172
mux := http .NewServeMux ()
@@ -181,14 +182,22 @@ func (p *profilerInterface) startEndpoint() {
181
182
Handler : mux ,
182
183
}
183
184
184
- // goroutine exits with main
185
- go func () {
185
+ lis , err := net .Listen ("tcp" , p .addr )
186
+ if err != nil {
187
+ return err
188
+ }
186
189
187
- p .logger .Info ("starting pprof endpoint" )
188
- if err := p .server .ListenAndServe (); err != nil && ! errors .Is (err , http .ErrServerClosed ) {
189
- p .logger .Fatal (err )
190
- }
190
+ p .closeErr = make (chan error )
191
+ go func () {
192
+ p .closeErr <- func () error {
193
+ p .logger .Info ("starting pprof endpoint" )
194
+ if err := p .server .Serve (lis ); err != nil && ! errors .Is (err , http .ErrServerClosed ) {
195
+ return err
196
+ }
197
+ return nil
198
+ }()
191
199
}()
200
+ return nil
192
201
}
193
202
194
203
func (p * profilerInterface ) startCpuProfileCache () error {
@@ -222,10 +231,14 @@ func (p *profilerInterface) httpHandler(w http.ResponseWriter, r *http.Request)
222
231
w .Write (p .cache .Bytes ())
223
232
}
224
233
225
- func (p * profilerInterface ) stopEndpoint (ctx context.Context ) {
234
+ func (p * profilerInterface ) stopEndpoint (ctx context.Context ) error {
235
+ if ! p .isEnabled () {
236
+ return nil
237
+ }
226
238
if err := p .server .Shutdown (ctx ); err != nil {
227
- p . logger . Fatal ( err )
239
+ return err
228
240
}
241
+ return <- p .closeErr
229
242
}
230
243
231
244
func (p * profilerInterface ) isCacheReady () bool {
0 commit comments