Skip to content

Commit 1be347c

Browse files
authored
Save metrics to cloud to preserve metrics history (#1940)
1 parent bf4ab40 commit 1be347c

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

pkg/operator/resources/job/batchapi/job.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,12 @@ func handleJobSubmissionError(jobKey spec.JobKey, jobErr error) {
220220
}
221221
}
222222

223+
// delete k8s job, queue and save batch metrics from prometheus to cloud
223224
func deleteJobRuntimeResources(jobKey spec.JobKey) error {
224225
err := errors.FirstError(
225226
deleteK8sJob(jobKey),
226227
deleteQueueByJobKeyIfExists(jobKey),
228+
saveMetricsToCloud(jobKey),
227229
)
228230

229231
if err != nil {

pkg/operator/resources/job/batchapi/job_status.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ func getJobStatusFromJobState(jobState *job.State, k8sJob *kbatch.Job, pods []kc
8888
}
8989
}
9090

91-
if jobState.Status.IsCompleted() && jobState.EndTime != nil {
92-
metrics, err := getBatchMetrics(jobKey, *jobState.EndTime)
91+
if _, ok := jobState.LastUpdatedMap[_completedMetricsFileKey]; ok && jobState.Status.IsCompleted() {
92+
metrics, err := readMetricsFromCloud(jobKey)
9393
if err != nil {
9494
return nil, err
9595
}

pkg/operator/resources/job/batchapi/metrics.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package batchapi
1919
import (
2020
"context"
2121
"fmt"
22+
"path"
2223
"time"
2324

2425
"github.com/cortexlabs/cortex/pkg/lib/errors"
@@ -32,6 +33,7 @@ import (
3233

3334
const (
3435
_metricsRequestTimeoutSeconds = 10
36+
_completedMetricsFileKey = "metrics.json"
3537
)
3638

3739
func getBatchMetrics(jobKey spec.JobKey, t time.Time) (metrics.BatchMetrics, error) {
@@ -143,3 +145,28 @@ func queryPrometheusVec(promAPIv1 promv1.API, query string, t time.Time) (model.
143145

144146
return values, nil
145147
}
148+
149+
func saveMetricsToCloud(jobKey spec.JobKey) error {
150+
t := time.Now()
151+
batchMetrics, err := getBatchMetrics(jobKey, t)
152+
if err != nil {
153+
return err
154+
}
155+
156+
s3Key := path.Join(jobKey.Prefix(config.ClusterName()), _completedMetricsFileKey)
157+
err = config.UploadJSONToBucket(batchMetrics, s3Key)
158+
if err != nil {
159+
return err
160+
}
161+
return nil
162+
}
163+
164+
func readMetricsFromCloud(jobKey spec.JobKey) (metrics.BatchMetrics, error) {
165+
s3Key := path.Join(jobKey.Prefix(config.ClusterName()), _completedMetricsFileKey)
166+
batchMetrics := metrics.BatchMetrics{}
167+
err := config.ReadJSONFromBucket(&batchMetrics, s3Key)
168+
if err != nil {
169+
return batchMetrics, err
170+
}
171+
return batchMetrics, nil
172+
}

0 commit comments

Comments
 (0)