Skip to content

feat: receive kvmetrics from sglang scheduler #1465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

zixuanzhang226
Copy link

@zixuanzhang226 zixuanzhang226 commented Jun 10, 2025

Overview:

This PR implements the second step of integrating the Dynamo Planner with SGLang. It focuses on receiving KV-related metrics sent from the SGLang scheduler to the Dynamo worker via a ZMQ socket. Once received, these metrics are published using the metrics publisher.

Completed end-to-end testing and confirmed that KV metrics are successfully sent from the SGLang scheduler and received by the Dynamo worker.

Details:

Adds logic to receive KV metrics in the Dynamo worker through a dedicated ZMQ socket.

Publishes the received metrics via the existing metrics publisher.

Related Work:

First piece of the integration (sending metrics from the SGLang scheduler) in SGLang repo: sgl-project/sglang#6721

Related issue: #1196

Summary by CodeRabbit

  • New Features
    • Added real-time runtime metrics monitoring and publishing for improved observability.

Copy link

copy-pr-bot bot commented Jun 10, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

Copy link

👋 Hi zixuanzhang226! Thank you for contributing to ai-dynamo/dynamo.

Just a reminder: The NVIDIA Test Github Validation CI runs an essential subset of the testing framework to quickly catch errors.Your PR reviewers may elect to test the changes comprehensively before approving your changes.

🚀

@github-actions github-actions bot added the external-contribution Pull request is from an external contributor label Jun 10, 2025
Copy link
Contributor

coderabbitai bot commented Jun 10, 2025

"""

Walkthrough

This update adds asynchronous ZeroMQ-based metric reception and publishing to the SGLangWorker class. It introduces a metrics publisher, sets up a PULL socket for receiving metrics from the scheduler, and launches a background coroutine to process and publish these metrics continuously, with error handling and logging integrated.

Changes

File(s) Change Summary
examples/sglang/components/worker.py Added async ZeroMQ integration for metric reception; introduced WorkerMetricsPublisher; added background coroutine for receiving and publishing metrics; updated class with new attributes and methods.

Sequence Diagram(s)

sequenceDiagram
    participant Scheduler
    participant SGLangWorker
    participant WorkerMetricsPublisher

    Scheduler->>SGLangWorker: Send metrics via ZeroMQ (PULL socket)
    loop Continuous reception
        SGLangWorker->>SGLangWorker: _receive_and_publish_metrics_loop()
        SGLangWorker->>WorkerMetricsPublisher: Publish received metrics
    end
Loading

Poem

In the warren, stats now flow,
Through ZeroMQ, fast and slow.
Worker listens, metrics stream,
Publishing numbers, a data dream.
Async tasks hop all night,
Keeping metrics shining bright!
🐇📊
"""


📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0fb22bd and ea007af.

📒 Files selected for processing (1)
  • examples/sglang/components/worker.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • examples/sglang/components/worker.py
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and Test - vllm

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
examples/sglang/components/worker.py (1)

27-33: ⚠️ Potential issue

Duplicate asyncio import triggers Ruff F811

asyncio is imported twice (lines 27 & 32). Ruff flags this (F811) and the pre-merge pipeline is failing. Remove the second import.

-import asyncio
🧰 Tools
🪛 Ruff (0.11.9)

32-32: Redefinition of unused asyncio from line 27

Remove definition: asyncio

(F811)

🧹 Nitpick comments (1)
examples/sglang/components/worker.py (1)

91-107: Clean-ups inside _receive_and_publish_metrics_loop

  1. e is never used → Ruff F841.
  2. asyncio.CancelledError isn’t handled, so shutdown will be logged as an error.
  3. The multi-line publish() call is mis-indented; Black will re-indent but it hurts readability.
-            except Exception as e:
-                logger.exception("Failed to receive or publish metrics")
+            except asyncio.CancelledError:
+                # Graceful shutdown
+                break
+            except Exception:
+                logger.exception("Failed to receive or publish metrics")

Let Black take care of the call’s indentation.

🧰 Tools
🪛 Ruff (0.11.9)

105-105: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e8e728b and 7098a22.

📒 Files selected for processing (1)
  • examples/sglang/components/worker.py (3 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
examples/sglang/components/worker.py

32-32: Redefinition of unused asyncio from line 27

Remove definition: asyncio

(F811)


105-105: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)

🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1465/merge) by zixuanzhang226.
examples/sglang/components/worker.py

[error] 1-1: isort failed and modified the file to fix import sorting issues.


[error] 1-1: Black formatting check failed and reformatted the file to fix code style issues.


[error] 28-43: Ruff found and fixed 1 error related to import order and formatting in this file.

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and Test - vllm

Comment on lines +66 to +69
context = zmq.asyncio.Context()
self.receive_metrics_from_scheduler = get_zmq_socket(
context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
)
asyncio.create_task(self._receive_and_publish_metrics_loop())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Background task is never tracked or cancelled

asyncio.create_task() is invoked but the returned task handle is discarded.
Untracked tasks hide exceptions and can leak on shutdown.

-asyncio.create_task(self._receive_and_publish_metrics_loop())
+self._metrics_task = asyncio.create_task(
+    self._receive_and_publish_metrics_loop()
+)

Add a shutdown hook (__del__ or framework-specific) to cancel() the task and await it.


🏁 Script executed:

#!/bin/bash
# Verify untracked asyncio.create_task usage in worker.py
rg -n "asyncio.create_task" -C3 examples/sglang/components/worker.py

Length of output: 795


🏁 Script executed:

#!/bin/bash
# Locate class definition and constructor
rg -n "class SGLangWorker" -C2 examples/sglang/components/worker.py
rg -n "def __init__" -C3 examples/sglang/components/worker.py

# Check for destructor or shutdown/cleanup hooks
rg -n "__del__" -C2 examples/sglang/components/worker.py
rg -n "shutdown" -C2 examples/sglang/components/worker.py
rg -n "async def close" -C2 examples/sglang/components/worker.py
rg -n "def stop" -C2 examples/sglang/components/worker.py

Length of output: 759


Track and cancel background tasks on shutdown

Untracked calls to asyncio.create_task() (see lines 70 and 166 in examples/sglang/components/worker.py) will swallow exceptions and leak on process teardown. Capture each task handle and implement a cleanup hook to cancel and await them.

• File: examples/sglang/components/worker.py
– Line 70: metrics loop task
– Line 166: prefill generator task

Suggested diff:

--- a/examples/sglang/components/worker.py
+++ b/examples/sglang/components/worker.py
@@ -67,3 +67,6 @@
         context = zmq.asyncio.Context()
         self.receive_metrics_from_scheduler = get_zmq_socket(
             context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
         )
-        asyncio.create_task(self._receive_and_publish_metrics_loop())
+        self._metrics_task = asyncio.create_task(
+            self._receive_and_publish_metrics_loop()
+        )

@@ -164,3 +167,5 @@
             )
-            prefill_task = asyncio.create_task(self._prefill_generator(prefill))
+            self._prefill_task = asyncio.create_task(
+                self._prefill_generator(prefill)
+            )

+    async def close(self):
+        for t in (getattr(self, "_metrics_task", None),
+                  getattr(self, "_prefill_task", None)):
+            if t:
+                t.cancel()
+                with contextlib.suppress(asyncio.CancelledError):
+                    await t

Ensure your application/framework invokes await worker.close() (or similar) on shutdown to properly cancel background loops.

🤖 Prompt for AI Agents
In examples/sglang/components/worker.py around lines 66 to 70, the
asyncio.create_task call that starts the metrics loop is not tracked, causing
potential unhandled exceptions and resource leaks on shutdown. Fix this by
storing the returned task handle as an instance variable, then implement a
cleanup method (e.g., async close or __del__) that cancels and awaits this task.
Also ensure the application calls this cleanup method on shutdown to properly
stop background tasks.

@zixuanzhang226 zixuanzhang226 force-pushed the feat/receive-kvmetrics branch from 7098a22 to 0fb22bd Compare June 10, 2025 23:13
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
examples/sglang/components/worker.py (1)

65-70: Background task is still untracked – leaks & swallowed exceptions remain
The asyncio.create_task() call is unchanged from the previous revision, so the task handle is still discarded. This duplicates the earlier concern that un-awaited tasks can leak on shutdown and hide raised exceptions.

-        asyncio.create_task(self._receive_and_publish_metrics_loop())
+        self._metrics_task = asyncio.create_task(
+            self._receive_and_publish_metrics_loop()
+        )

Add a cleanup hook (e.g. async def close() or framework-specific shutdown callback) that cancels and awaits self._metrics_task (and the pre-existing _prefill_task).

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7098a22 and 0fb22bd.

📒 Files selected for processing (1)
  • examples/sglang/components/worker.py (3 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
examples/sglang/components/worker.py

104-104: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)

🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1465/merge) by zixuanzhang226.
examples/sglang/components/worker.py

[error] 1-1: Pre-commit hook 'black' failed: file was reformatted. Run 'black' to apply formatting changes.


[error] 92-107: Pre-commit hook 'ruff' failed: 1 error found and fixed in this file related to code style or linting issues.

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and Test - vllm

Comment on lines 90 to 108
async def _receive_and_publish_metrics_loop(self):
while True:
try:
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
self.metrics_publisher.publish(
kv_metrics.request_active_slots,
kv_metrics.request_total_slots,
kv_metrics.kv_active_blocks,
kv_metrics.kv_total_blocks,
kv_metrics.num_requests_waiting,
kv_metrics.gpu_cache_usage_perc,
kv_metrics.gpu_prefix_cache_hit_rate,
kv_metrics.data_parallel_rank)

except Exception as e:
logger.exception("Failed to receive or publish metrics")

def _get_bootstrap_info(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Fix linter failure & improve robustness of the metrics loop

  1. e is never used – Ruff (F841) fails the pre-commit hook.
  2. Broad exception handling combined with a tight loop risks log-spamming if the socket is closed or permanently broken.
  3. Formatting currently fails black.
@@
-            except Exception as e:
-                logger.exception("Failed to receive or publish metrics")
+            # On cancellation, break the loop gracefully; otherwise log & continue
+            except asyncio.CancelledError:
+                break
+            except Exception:
+                logger.exception("Failed to receive or publish metrics")

This:
• Removes the unused variable, clearing the Ruff error.
• Handles CancelledError so the loop terminates cleanly when the task is cancelled during shutdown.
• Keeps the stack trace via logger.exception.
After applying, run black to satisfy the formatting hook.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def _receive_and_publish_metrics_loop(self):
while True:
try:
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
self.metrics_publisher.publish(
kv_metrics.request_active_slots,
kv_metrics.request_total_slots,
kv_metrics.kv_active_blocks,
kv_metrics.kv_total_blocks,
kv_metrics.num_requests_waiting,
kv_metrics.gpu_cache_usage_perc,
kv_metrics.gpu_prefix_cache_hit_rate,
kv_metrics.data_parallel_rank)
except Exception as e:
logger.exception("Failed to receive or publish metrics")
def _get_bootstrap_info(self):
async def _receive_and_publish_metrics_loop(self):
while True:
try:
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
self.metrics_publisher.publish(
kv_metrics.request_active_slots,
kv_metrics.request_total_slots,
kv_metrics.kv_active_blocks,
kv_metrics.kv_total_blocks,
kv_metrics.num_requests_waiting,
kv_metrics.gpu_cache_usage_perc,
kv_metrics.gpu_prefix_cache_hit_rate,
kv_metrics.data_parallel_rank)
# On cancellation, break the loop gracefully; otherwise log & continue
except asyncio.CancelledError:
break
except Exception:
logger.exception("Failed to receive or publish metrics")
def _get_bootstrap_info(self):
...
🧰 Tools
🪛 Ruff (0.11.9)

104-104: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)

🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1465/merge) by zixuanzhang226.

[error] 92-107: Pre-commit hook 'ruff' failed: 1 error found and fixed in this file related to code style or linting issues.

🤖 Prompt for AI Agents
In examples/sglang/components/worker.py around lines 90 to 107, remove the
unused exception variable 'e' to fix the linter error, add handling for
asyncio.CancelledError to allow the loop to exit cleanly on task cancellation,
and ensure the logger.exception call remains to keep the stack trace. Finally,
reformat the code with black to fix formatting issues.

@zixuanzhang226 zixuanzhang226 force-pushed the feat/receive-kvmetrics branch 2 times, most recently from c1da15e to edda9c7 Compare June 10, 2025 23:25
@ishandhanani
Copy link
Contributor

ishandhanani commented Jun 11, 2025

This is awesome! Thanks for making this and the associated SGLang PR. I'll tinker with it tomorrow and we can get it in!!

@zixuanzhang226
Copy link
Author

This is awesome! Thanks for making this and the associated SGLang PR. I'll tinker with it tomorrow and we can get it in!!

Hello @ishandhanani, could you kindly review and approve it when you have a chance? Thank you so much!

@ishandhanani
Copy link
Contributor

Hi @zixuanzhang226 - this PR looks ok overall. I want to get @tedzhouhk eyes on it as well. We should also get the SGL PR in as well. I'll ping their team

@ishandhanani ishandhanani mentioned this pull request Jun 14, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
external-contribution Pull request is from an external contributor feat size/S
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants