@@ -23,6 +23,7 @@ import (
23
23
"os/exec"
24
24
"os/signal"
25
25
"path/filepath"
26
+ "regexp"
26
27
"runtime"
27
28
"runtime/debug"
28
29
"strconv"
@@ -418,7 +419,7 @@ func Run(options ...RunOption) {
418
419
}
419
420
420
421
wg .Add (1 )
421
- go startAPIEndpoint (ctx , cfg , & wg , apiServices , tunneledPortsService , metricsReporter , apiEndpointOpts ... )
422
+ go startAPIEndpoint (ctx , cfg , & wg , apiServices , tunneledPortsService , metricsReporter , supervisorMetrics , topService , apiEndpointOpts ... )
422
423
423
424
wg .Add (1 )
424
425
go startSSHServer (ctx , cfg , & wg )
@@ -1187,7 +1188,28 @@ func isBlacklistedEnvvar(name string) bool {
1187
1188
return false
1188
1189
}
1189
1190
1190
- func startAPIEndpoint (ctx context.Context , cfg * Config , wg * sync.WaitGroup , services []RegisterableService , tunneled * ports.TunneledPortsService , metricsReporter * metrics.GrpcMetricsReporter , opts ... grpc.ServerOption ) {
1191
+ var websocketCloseErrorPattern = regexp .MustCompile (`websocket: close (\d+)` )
1192
+
1193
+ func extractCloseErrorCode (errStr string ) string {
1194
+ matches := websocketCloseErrorPattern .FindStringSubmatch (errStr )
1195
+ if len (matches ) < 2 {
1196
+ return "unknown"
1197
+ }
1198
+
1199
+ return matches [1 ]
1200
+ }
1201
+
1202
+ func startAPIEndpoint (
1203
+ ctx context.Context ,
1204
+ cfg * Config ,
1205
+ wg * sync.WaitGroup ,
1206
+ services []RegisterableService ,
1207
+ tunneled * ports.TunneledPortsService ,
1208
+ metricsReporter * metrics.GrpcMetricsReporter ,
1209
+ supervisorMetrics * metrics.SupervisorMetrics ,
1210
+ topService * TopService ,
1211
+ opts ... grpc.ServerOption ,
1212
+ ) {
1191
1213
defer wg .Done ()
1192
1214
defer log .Debug ("startAPIEndpoint shutdown" )
1193
1215
@@ -1308,6 +1330,17 @@ func startAPIEndpoint(ctx context.Context, cfg *Config, wg *sync.WaitGroup, serv
1308
1330
tunnelOverWebSocket (tunneled , conn )
1309
1331
}))
1310
1332
routes .Handle ("/_supervisor/tunnel/ssh" , http .HandlerFunc (func (rw http.ResponseWriter , r * http.Request ) {
1333
+ var err error
1334
+ supervisorMetrics .SSHTunnelOpenedTotal .WithLabelValues ().Inc ()
1335
+ defer func () {
1336
+ code := "unknown"
1337
+ if err != nil {
1338
+ code = extractCloseErrorCode (err .Error ())
1339
+ }
1340
+ supervisorMetrics .SSHTunnelClosedTotal .WithLabelValues (code ).Inc ()
1341
+ }()
1342
+ startTime := time .Now ()
1343
+ log := log .WithField ("userAgent" , r .Header .Get ("user-agent" )).WithField ("remoteAddr" , r .RemoteAddr )
1311
1344
wsConn , err := upgrader .Upgrade (rw , r , nil )
1312
1345
if err != nil {
1313
1346
log .WithError (err ).Error ("tunnel ssh: upgrade to the WebSocket protocol failed" )
@@ -1331,13 +1364,21 @@ func startAPIEndpoint(ctx context.Context, cfg *Config, wg *sync.WaitGroup, serv
1331
1364
1332
1365
go io .Copy (conn , conn2 )
1333
1366
_ , err = io .Copy (conn2 , conn )
1334
- if err != nil && ! websocket .IsUnexpectedCloseError (err , websocket .CloseGoingAway , websocket .CloseAbnormalClosure ) {
1335
- log .WithError (err ).Error ("tunnel ssh: error returned from io.copy" )
1367
+ if err != nil {
1368
+ var usedCpu , usedMemory int64
1369
+ data := topService .data
1370
+ if data != nil && data .Cpu != nil {
1371
+ usedCpu = data .Cpu .Used
1372
+ }
1373
+ if data != nil && data .Memory != nil {
1374
+ usedMemory = data .Memory .Used
1375
+ }
1376
+ log .WithField ("usedCpu" , usedCpu ).WithField ("usedMemory" , usedMemory ).WithError (err ).Error ("tunnel ssh: error returned from io.copy" )
1336
1377
}
1337
1378
1338
1379
conn .Close ()
1339
1380
conn2 .Close ()
1340
- log .Infof ("tunnel ssh: Disconnect from %s" , conn .RemoteAddr ())
1381
+ log .WithField ( "duration" , time . Since ( startTime ). Seconds ()). Infof ("tunnel ssh: Disconnect from %s" , conn .RemoteAddr ())
1341
1382
}))
1342
1383
if cfg .DebugEnable {
1343
1384
routes .Handle ("/_supervisor/debug/tunnels" , http .HandlerFunc (func (rw http.ResponseWriter , r * http.Request ) {
0 commit comments