-
Notifications
You must be signed in to change notification settings - Fork 419
feat: receive kvmetrics from sglang scheduler #1465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: receive kvmetrics from sglang scheduler #1465
Conversation
👋 Hi zixuanzhang226! Thank you for contributing to ai-dynamo/dynamo. Just a reminder: The 🚀 |
""" WalkthroughThis update adds asynchronous ZeroMQ-based metric reception and publishing to the Changes
Sequence Diagram(s)sequenceDiagram
participant Scheduler
participant SGLangWorker
participant WorkerMetricsPublisher
Scheduler->>SGLangWorker: Send metrics via ZeroMQ (PULL socket)
loop Continuous reception
SGLangWorker->>SGLangWorker: _receive_and_publish_metrics_loop()
SGLangWorker->>WorkerMetricsPublisher: Publish received metrics
end
Poem
📜 Recent review detailsConfiguration used: .coderabbit.yaml 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (1)
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
examples/sglang/components/worker.py (1)
27-33
:⚠️ Potential issueDuplicate
asyncio
import triggers Ruff F811
asyncio
is imported twice (lines 27 & 32). Ruff flags this (F811) and the pre-merge pipeline is failing. Remove the second import.-import asyncio
🧰 Tools
🪛 Ruff (0.11.9)
32-32: Redefinition of unused
asyncio
from line 27Remove definition:
asyncio
(F811)
🧹 Nitpick comments (1)
examples/sglang/components/worker.py (1)
91-107
: Clean-ups inside_receive_and_publish_metrics_loop
e
is never used → Ruff F841.asyncio.CancelledError
isn’t handled, so shutdown will be logged as an error.- The multi-line
publish()
call is mis-indented; Black will re-indent but it hurts readability.- except Exception as e: - logger.exception("Failed to receive or publish metrics") + except asyncio.CancelledError: + # Graceful shutdown + break + except Exception: + logger.exception("Failed to receive or publish metrics")Let Black take care of the call’s indentation.
🧰 Tools
🪛 Ruff (0.11.9)
105-105: Local variable
e
is assigned to but never usedRemove assignment to unused variable
e
(F841)
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
examples/sglang/components/worker.py
(3 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
examples/sglang/components/worker.py
32-32: Redefinition of unused asyncio
from line 27
Remove definition: asyncio
(F811)
105-105: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1465/merge) by zixuanzhang226.
examples/sglang/components/worker.py
[error] 1-1: isort failed and modified the file to fix import sorting issues.
[error] 1-1: Black formatting check failed and reformatted the file to fix code style issues.
[error] 28-43: Ruff found and fixed 1 error related to import order and formatting in this file.
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and Test - vllm
context = zmq.asyncio.Context() | ||
self.receive_metrics_from_scheduler = get_zmq_socket( | ||
context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True | ||
) | ||
asyncio.create_task(self._receive_and_publish_metrics_loop()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Background task is never tracked or cancelled
asyncio.create_task()
is invoked but the returned task handle is discarded.
Untracked tasks hide exceptions and can leak on shutdown.
-asyncio.create_task(self._receive_and_publish_metrics_loop())
+self._metrics_task = asyncio.create_task(
+ self._receive_and_publish_metrics_loop()
+)
Add a shutdown hook (__del__
or framework-specific) to cancel()
the task and await it.
🏁 Script executed:
#!/bin/bash
# Verify untracked asyncio.create_task usage in worker.py
rg -n "asyncio.create_task" -C3 examples/sglang/components/worker.py
Length of output: 795
🏁 Script executed:
#!/bin/bash
# Locate class definition and constructor
rg -n "class SGLangWorker" -C2 examples/sglang/components/worker.py
rg -n "def __init__" -C3 examples/sglang/components/worker.py
# Check for destructor or shutdown/cleanup hooks
rg -n "__del__" -C2 examples/sglang/components/worker.py
rg -n "shutdown" -C2 examples/sglang/components/worker.py
rg -n "async def close" -C2 examples/sglang/components/worker.py
rg -n "def stop" -C2 examples/sglang/components/worker.py
Length of output: 759
Track and cancel background tasks on shutdown
Untracked calls to asyncio.create_task()
(see lines 70 and 166 in examples/sglang/components/worker.py
) will swallow exceptions and leak on process teardown. Capture each task handle and implement a cleanup hook to cancel and await them.
• File: examples/sglang/components/worker.py
– Line 70: metrics loop task
– Line 166: prefill generator task
Suggested diff:
--- a/examples/sglang/components/worker.py
+++ b/examples/sglang/components/worker.py
@@ -67,3 +67,6 @@
context = zmq.asyncio.Context()
self.receive_metrics_from_scheduler = get_zmq_socket(
context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
)
- asyncio.create_task(self._receive_and_publish_metrics_loop())
+ self._metrics_task = asyncio.create_task(
+ self._receive_and_publish_metrics_loop()
+ )
@@ -164,3 +167,5 @@
)
- prefill_task = asyncio.create_task(self._prefill_generator(prefill))
+ self._prefill_task = asyncio.create_task(
+ self._prefill_generator(prefill)
+ )
+ async def close(self):
+ for t in (getattr(self, "_metrics_task", None),
+ getattr(self, "_prefill_task", None)):
+ if t:
+ t.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await t
Ensure your application/framework invokes await worker.close()
(or similar) on shutdown to properly cancel background loops.
🤖 Prompt for AI Agents
In examples/sglang/components/worker.py around lines 66 to 70, the
asyncio.create_task call that starts the metrics loop is not tracked, causing
potential unhandled exceptions and resource leaks on shutdown. Fix this by
storing the returned task handle as an instance variable, then implement a
cleanup method (e.g., async close or __del__) that cancels and awaits this task.
Also ensure the application calls this cleanup method on shutdown to properly
stop background tasks.
7098a22
to
0fb22bd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
examples/sglang/components/worker.py (1)
65-70
: Background task is still untracked – leaks & swallowed exceptions remain
Theasyncio.create_task()
call is unchanged from the previous revision, so the task handle is still discarded. This duplicates the earlier concern that un-awaited tasks can leak on shutdown and hide raised exceptions.- asyncio.create_task(self._receive_and_publish_metrics_loop()) + self._metrics_task = asyncio.create_task( + self._receive_and_publish_metrics_loop() + )Add a cleanup hook (e.g.
async def close()
or framework-specific shutdown callback) that cancels and awaitsself._metrics_task
(and the pre-existing_prefill_task
).
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
examples/sglang/components/worker.py
(3 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
examples/sglang/components/worker.py
104-104: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1465/merge) by zixuanzhang226.
examples/sglang/components/worker.py
[error] 1-1: Pre-commit hook 'black' failed: file was reformatted. Run 'black' to apply formatting changes.
[error] 92-107: Pre-commit hook 'ruff' failed: 1 error found and fixed in this file related to code style or linting issues.
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and Test - vllm
examples/sglang/components/worker.py
Outdated
async def _receive_and_publish_metrics_loop(self): | ||
while True: | ||
try: | ||
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() | ||
self.metrics_publisher.publish( | ||
kv_metrics.request_active_slots, | ||
kv_metrics.request_total_slots, | ||
kv_metrics.kv_active_blocks, | ||
kv_metrics.kv_total_blocks, | ||
kv_metrics.num_requests_waiting, | ||
kv_metrics.gpu_cache_usage_perc, | ||
kv_metrics.gpu_prefix_cache_hit_rate, | ||
kv_metrics.data_parallel_rank) | ||
|
||
except Exception as e: | ||
logger.exception("Failed to receive or publish metrics") | ||
|
||
def _get_bootstrap_info(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Fix linter failure & improve robustness of the metrics loop
e
is never used – Ruff (F841
) fails the pre-commit hook.- Broad exception handling combined with a tight loop risks log-spamming if the socket is closed or permanently broken.
- Formatting currently fails
black
.
@@
- except Exception as e:
- logger.exception("Failed to receive or publish metrics")
+ # On cancellation, break the loop gracefully; otherwise log & continue
+ except asyncio.CancelledError:
+ break
+ except Exception:
+ logger.exception("Failed to receive or publish metrics")
This:
• Removes the unused variable, clearing the Ruff error.
• Handles CancelledError
so the loop terminates cleanly when the task is cancelled during shutdown.
• Keeps the stack trace via logger.exception
.
After applying, run black
to satisfy the formatting hook.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async def _receive_and_publish_metrics_loop(self): | |
while True: | |
try: | |
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() | |
self.metrics_publisher.publish( | |
kv_metrics.request_active_slots, | |
kv_metrics.request_total_slots, | |
kv_metrics.kv_active_blocks, | |
kv_metrics.kv_total_blocks, | |
kv_metrics.num_requests_waiting, | |
kv_metrics.gpu_cache_usage_perc, | |
kv_metrics.gpu_prefix_cache_hit_rate, | |
kv_metrics.data_parallel_rank) | |
except Exception as e: | |
logger.exception("Failed to receive or publish metrics") | |
def _get_bootstrap_info(self): | |
async def _receive_and_publish_metrics_loop(self): | |
while True: | |
try: | |
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() | |
self.metrics_publisher.publish( | |
kv_metrics.request_active_slots, | |
kv_metrics.request_total_slots, | |
kv_metrics.kv_active_blocks, | |
kv_metrics.kv_total_blocks, | |
kv_metrics.num_requests_waiting, | |
kv_metrics.gpu_cache_usage_perc, | |
kv_metrics.gpu_prefix_cache_hit_rate, | |
kv_metrics.data_parallel_rank) | |
# On cancellation, break the loop gracefully; otherwise log & continue | |
except asyncio.CancelledError: | |
break | |
except Exception: | |
logger.exception("Failed to receive or publish metrics") | |
def _get_bootstrap_info(self): | |
... |
🧰 Tools
🪛 Ruff (0.11.9)
104-104: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1465/merge) by zixuanzhang226.
[error] 92-107: Pre-commit hook 'ruff' failed: 1 error found and fixed in this file related to code style or linting issues.
🤖 Prompt for AI Agents
In examples/sglang/components/worker.py around lines 90 to 107, remove the
unused exception variable 'e' to fix the linter error, add handling for
asyncio.CancelledError to allow the loop to exit cleanly on task cancellation,
and ensure the logger.exception call remains to keep the stack trace. Finally,
reformat the code with black to fix formatting issues.
c1da15e
to
edda9c7
Compare
edda9c7
to
ea007af
Compare
This is awesome! Thanks for making this and the associated SGLang PR. I'll tinker with it tomorrow and we can get it in!! |
Hello @ishandhanani, could you kindly review and approve it when you have a chance? Thank you so much! |
Hi @zixuanzhang226 - this PR looks ok overall. I want to get @tedzhouhk eyes on it as well. We should also get the SGL PR in as well. I'll ping their team |
Overview:
This PR implements the second step of integrating the Dynamo Planner with SGLang. It focuses on receiving KV-related metrics sent from the SGLang scheduler to the Dynamo worker via a ZMQ socket. Once received, these metrics are published using the metrics publisher.
Completed end-to-end testing and confirmed that KV metrics are successfully sent from the SGLang scheduler and received by the Dynamo worker.
Details:
Adds logic to receive KV metrics in the Dynamo worker through a dedicated ZMQ socket.
Publishes the received metrics via the existing metrics publisher.
Related Work:
First piece of the integration (sending metrics from the SGLang scheduler) in SGLang repo: sgl-project/sglang#6721
Related issue: #1196
Summary by CodeRabbit