Commit 0632089

[CI] Add queue size, running count metrics
This commit allows the container to report 3 additional metrics at every sampling event:
- a heartbeat
- the size of the workflow queue (filtered)
- the number of running workflows (filtered)

The heartbeat is a simple metric that lets us monitor the health of the metrics pipeline itself. Before this commit, a new metric was pushed only when a workflow completed, meaning we had to wait a few hours before noticing that the metrics container was unable to push metrics. In addition, this commit adds a sampling of the workflow queue size and running count. This should allow us to better understand the load and improve the autoscaling values we pick for the cluster.

Signed-off-by: Nathan Gauër <[email protected]>
1 parent 7060d2a commit 0632089
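For context, the new gauge samples are pushed in the same Influx-style line protocol as the existing job metrics (see the upload_metrics hunk below): one line per metric, of the shape "<name> value=<value> <timestamp_ns>". A minimal standalone sketch of that serialization; the helper name serialize_gauge is illustrative and not part of this commit:

import time
from dataclasses import dataclass

@dataclass
class GaugeMetric:
    name: str
    value: int
    time_ns: int

def serialize_gauge(metric: GaugeMetric) -> str:
    # Mirrors the f-string added in upload_metrics below.
    name = metric.name.lower().replace(" ", "_")
    return f"{name} value={metric.value} {metric.time_ns}"

# Prints e.g.: metrics_container_heartbeat value=1 1730000000000000000
print(serialize_gauge(GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())))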

File tree

1 file changed: +83 −14 lines changed
.ci/metrics/metrics.py

@@ -26,7 +26,67 @@ class JobMetrics:
     workflow_id: int
 
 
-def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
+@dataclass
+class GaugeMetric:
+    name: str
+    value: int
+    time_ns: int
+
+
+def get_sampled_workflow_metrics(github_repo: github.Repository):
+    """Gets global statistics about the Github workflow queue
+
+    Args:
+      github_repo: A github repo object to use to query the relevant information.
+
+    Returns:
+      Returns a list of GaugeMetric objects, containing the relevant metrics about
+      the workflow
+    """
+
+    # Other states are available (pending, waiting, etc), but the meaning
+    # is not documented (See #70540).
+    # "queued" seems to be the info we want.
+    queued_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="queued")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+    running_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="in_progress")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+
+    workflow_metrics = []
+    workflow_metrics.append(
+        GaugeMetric(
+            "premerge_queued_workflow_count",
+            queued_workflow_count,
+            time.time_ns(),
+        )
+    )
+    workflow_metrics.append(
+        GaugeMetric(
+            "premerge_running_workflow_count",
+            running_workflow_count,
+            time.time_ns(),
+        )
+    )
+    # Always send a heartbeat metric so we can monitor whether this container is still able to log to Grafana.
+    workflow_metrics.append(
+        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+    )
+    return workflow_metrics
+
+
+def get_per_workflow_metrics(
+    github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
     Returns a list of JobMetrics objects, containing the relevant metrics about
     the workflow.
     """
-    workflow_runs = iter(github_repo.get_workflow_runs())
-
     workflow_metrics = []
 
     workflows_to_include = set(workflows_to_track.keys())
 
-    while len(workflows_to_include) > 0:
-        workflow_run = next(workflow_runs)
+    for workflow_run in iter(github_repo.get_workflow_runs()):
+        if len(workflows_to_include) == 0:
+            break
+
         if workflow_run.status != "completed":
             continue
 
@@ -129,10 +189,16 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
     """
     metrics_batch = []
     for workflow_metric in workflow_metrics:
-        workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_")
-        metrics_batch.append(
-            f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
-        )
+        if isinstance(workflow_metric, GaugeMetric):
+            name = workflow_metric.name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
+            )
+        else:
+            name = workflow_metric.job_name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
+            )
 
     request_data = "\n".join(metrics_batch)
     response = requests.post(
@@ -164,16 +230,19 @@ def main():
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        current_metrics = get_metrics(github_repo, workflows_to_track)
+        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
+        current_metrics += get_sampled_workflow_metrics(github_repo)
         if len(current_metrics) == 0:
-            print("No metrics found to upload.", file=sys.stderr)
-            continue
+            print("No metrics found to upload.", file=sys.stdout)
 
         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
-        print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)
+        print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)
 
         for workflow_metric in reversed(current_metrics):
-            workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id
+            if isinstance(workflow_metric, JobMetrics):
+                workflows_to_track[
+                    workflow_metric.job_name
+                ] = workflow_metric.workflow_id
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)
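For reference, the queued/running counts above rely on PyGithub's Repository.get_workflow_runs(status=...) filter. A minimal sketch of the same query in isolation, assuming the llvm/llvm-project repository and a placeholder workflow name (the real list lives in WORKFLOWS_TO_TRACK in metrics.py); a valid GITHUB_TOKEN is required:

import os
import github

# Assumption: an illustrative tracked-workflow name, not the real list.
TRACKED = {"LLVM Premerge Checks"}

gh = github.Github(os.environ["GITHUB_TOKEN"])
repo = gh.get_repo("llvm/llvm-project")  # assumption: target repository

# "queued" and "in_progress" are the two run states sampled by this commit.
queued = sum(
    1 for run in repo.get_workflow_runs(status="queued") if run.name in TRACKED
)
running = sum(
    1 for run in repo.get_workflow_runs(status="in_progress") if run.name in TRACKED
)
print(f"queued={queued} running={running}")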