30
30
import socket
31
31
32
32
import sglang as sgl
33
+ import zmq
33
34
from components .decode_worker import SGLangDecodeWorker
34
- from sglang .srt .utils import get_ip
35
+ from sglang .srt .utils import get_ip , get_zmq_socket
35
36
from utils .protocol import DisaggPreprocessedRequest , PreprocessedRequest
36
37
from utils .sglang import parse_sglang_args
37
38
38
- from dynamo .llm import ModelType , register_llm
39
+ from dynamo .llm import ModelType , WorkerMetricsPublisher , register_llm
39
40
from dynamo .sdk import async_on_start , depends , dynamo_context , endpoint , service
40
41
41
42
logger = logging .getLogger (__name__ )
@@ -55,11 +56,17 @@ def __init__(self):
55
56
class_name = self .__class__ .__name__
56
57
self .engine_args = parse_sglang_args (class_name , "" )
57
58
self .engine = sgl .Engine (server_args = self .engine_args )
59
+ self .metrics_publisher = WorkerMetricsPublisher ()
58
60
59
61
logger .info ("SGLangWorker initialized" )
60
62
61
63
@async_on_start
62
64
async def async_init (self ):
65
+ context = zmq .asyncio .Context ()
66
+ self .receive_metrics_from_scheduler = get_zmq_socket (
67
+ context , zmq .PULL , self .engine .port_args .metrics_ipc_name , True
68
+ )
69
+ asyncio .create_task (self ._receive_and_publish_metrics_loop ())
63
70
runtime = dynamo_context ["runtime" ]
64
71
logger .info ("Registering LLM for discovery" )
65
72
comp_ns , comp_name = SGLangWorker .dynamo_address () # type: ignore
@@ -80,6 +87,24 @@ async def async_init(self):
80
87
.client ()
81
88
)
82
89
90
async def _receive_and_publish_metrics_loop(self):
    """
    Forward metrics objects from the scheduler socket to the metrics publisher.

    Runs until the surrounding task is cancelled. Each iteration awaits one
    object from `self.receive_metrics_from_scheduler` and hands its fields to
    `self.metrics_publisher.publish`.

    Failures are logged and never terminate the loop:
      * a failed receive is followed by a short pause, so a persistently
        broken socket cannot turn this coroutine into a busy loop that
        floods the log;
      * a failed publish is logged and the loop moves straight on to the
        next message, so the receive queue keeps draining.

    NOTE(review): `recv_pyobj` unpickles whatever arrives on the socket; this
    presumes the peer is the trusted local scheduler process -- confirm.
    """
    # Pause after a failed receive before retrying (seconds).
    receive_retry_delay_s = 0.5

    while True:
        try:
            kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
        except asyncio.CancelledError:
            # Cancellation must always stop the loop, on every Python version.
            raise
        except Exception:
            logger.exception("Failed to receive or publish metrics")
            await asyncio.sleep(receive_retry_delay_s)
            continue

        try:
            self.metrics_publisher.publish(
                kv_metrics.request_active_slots,
                kv_metrics.request_total_slots,
                kv_metrics.kv_active_blocks,
                kv_metrics.kv_total_blocks,
                kv_metrics.num_requests_waiting,
                kv_metrics.gpu_cache_usage_perc,
                kv_metrics.gpu_prefix_cache_hit_rate,
                kv_metrics.data_parallel_rank,
            )
        except Exception:
            # Drop this sample but keep consuming so the queue does not back up.
            logger.exception("Failed to receive or publish metrics")
83
108
def _get_bootstrap_info (self ):
84
109
"""
85
110
Bootstrap info is stored in the worker's tokenizer manager. We use it to
0 commit comments